BasePlatform: Move the event indexing methods into a separate class.

pull/21833/head
Damir Jelić 2019-11-13 12:25:16 +01:00
parent 80b28004e1
commit f453fea24a
4 changed files with 246 additions and 71 deletions

View File

@ -0,0 +1,208 @@
// @flow
/*
Copyright 2019 New Vector Ltd
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export interface MatrixEvent {
type: string;
sender: string;
content: {};
event_id: string;
origin_server_ts: number;
unsigned: ?{};
room_id: string;
}
export interface MatrixProfile {
avatar_url: string;
displayname: string;
}
export interface CrawlerCheckpoint {
roomId: string;
token: string;
fullCrawl: boolean;
direction: string;
}
export interface ResultContext {
events_before: [MatrixEvent];
events_after: [MatrixEvent];
profile_info: Map<string, MatrixProfile>;
}
export interface ResultsElement {
rank: number;
result: MatrixEvent;
context: ResultContext;
}
export interface SearchResult {
count: number;
results: [ResultsElement];
highlights: [string];
}
export interface SearchArgs {
search_term: string;
before_limit: number;
after_limit: number;
order_by_recency: boolean;
room_id: ?string;
}
export interface HistoricEvent {
event: MatrixEvent;
profile: MatrixProfile;
}
/**
* Base class for classes that provide platform-specific event indexing.
*
* Instances of this class are provided by the application.
*/
export default class BaseEventIndexManager {
/**
* Initialize the event index for the given user.
*
* @param {string} userId The unique identifier of the logged in user that
* owns the index.
*
* @return {Promise} A promise that will resolve when the event index is
* initialized.
*/
async initEventIndex(userId: string): Promise<> {
throw new Error("Unimplemented");
}
/**
* Queue up an event to be added to the index.
*
* @param {MatrixEvent} ev The event that should be added to the index.
* @param {MatrixProfile} profile The profile of the event sender at the
* time of the event receival.
*
* @return {Promise} A promise that will resolve when the was queued up for
* addition.
*/
async addEventToIndex(ev: MatrixEvent, profile: MatrixProfile): Promise<> {
throw new Error("Unimplemented");
}
/**
* Check if our event index is empty.
*
* @return {Promise<boolean>} A promise that will resolve to true if the
* event index is empty, false otherwise.
*/
indexIsEmpty(): Promise<boolean> {
throw new Error("Unimplemented");
}
/**
* Commit the previously queued up events to the index.
*
* @return {Promise} A promise that will resolve once the queued up events
* were added to the index.
*/
async commitLiveEvents(): Promise<> {
throw new Error("Unimplemented");
}
/**
* Search the event index using the given term for matching events.
*
* @param {SearchArgs} searchArgs The search configuration sets what should
* be searched for and what should be contained in the search result.
*
* @return {Promise<[SearchResult]>} A promise that will resolve to an array
* of search results once the search is done.
*/
async searchEventIndex(searchArgs: SearchArgs): Promise<SearchResult> {
throw new Error("Unimplemented");
}
/**
* Add events from the room history to the event index.
*
* This is used to add a batch of events to the index.
*
* @param {[HistoricEvent]} events The list of events and profiles that
* should be added to the event index.
* @param {[CrawlerCheckpoint]} checkpoint A new crawler checkpoint that
* should be stored in the index which should be used to continue crawling
* the room.
* @param {[CrawlerCheckpoint]} oldCheckpoint The checkpoint that was used
* to fetch the current batch of events. This checkpoint will be removed
* from the index.
*
* @return {Promise} A promise that will resolve to true if all the events
* were already added to the index, false otherwise.
*/
async addHistoricEvents(
events: [HistoricEvent],
checkpoint: CrawlerCheckpoint | null = null,
oldCheckpoint: CrawlerCheckpoint | null = null,
): Promise<bool> {
throw new Error("Unimplemented");
}
/**
* Add a new crawler checkpoint to the index.
*
* @param {CrawlerCheckpoint} checkpoint The checkpoint that should be added
* to the index.
*
* @return {Promise} A promise that will resolve once the checkpoint has
* been stored.
*/
async addCrawlerCheckpoint(checkpoint: CrawlerCheckpoint): Promise<> {
throw new Error("Unimplemented");
}
/**
* Add a new crawler checkpoint to the index.
*
* @param {CrawlerCheckpoint} checkpoint The checkpoint that should be
* removed from the index.
*
* @return {Promise} A promise that will resolve once the checkpoint has
* been removed.
*/
async removeCrawlerCheckpoint(checkpoint: CrawlerCheckpoint): Promise<> {
throw new Error("Unimplemented");
}
/**
* Load the stored checkpoints from the index.
*
* @return {Promise<[CrawlerCheckpoint]>} A promise that will resolve to an
* array of crawler checkpoints once they have been loaded from the index.
*/
async loadCheckpoints(): Promise<[CrawlerCheckpoint]> {
throw new Error("Unimplemented");
}
/**
* Delete our current event index.
*
* @return {Promise} A promise that will resolve once the event index has
* been deleted.
*/
async deleteEventIndex(): Promise<> {
throw new Error("Unimplemented");
}
}

View File

@ -19,6 +19,7 @@ limitations under the License.
*/
import dis from './dispatcher';
import BaseEventIndexManager from './BaseEventIndexManager';
/**
* Base class for classes that provide platform-specific functionality
@ -152,43 +153,7 @@ export default class BasePlatform {
throw new Error("Unimplemented");
}
supportsEventIndexing(): boolean {
return false;
}
async initEventIndex(userId: string): boolean {
throw new Error("Unimplemented");
}
async addEventToIndex(ev: {}, profile: {}): void {
throw new Error("Unimplemented");
}
indexIsEmpty(): Promise<boolean> {
throw new Error("Unimplemented");
}
async commitLiveEvents(): void {
throw new Error("Unimplemented");
}
async searchEventIndex(term: string): Promise<{}> {
throw new Error("Unimplemented");
}
async addHistoricEvents(events: [], checkpoint: {} = null, oldCheckpoint: {} = null): Promise<bool> {
throw new Error("Unimplemented");
}
async addCrawlerCheckpoint(checkpoint: {}): Promise<> {
throw new Error("Unimplemented");
}
async removeCrawlerCheckpoint(checkpoint: {}): Promise<> {
throw new Error("Unimplemented");
}
async deleteEventIndex(): Promise<> {
throw new Error("Unimplemented");
getEventIndexingManager(): BaseEventIndexManager | null {
return null;
}
}

View File

@ -46,9 +46,11 @@ class EventIndexPeg {
* otherwise.
*/
async init() {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return false;
const indexManager = PlatformPeg.get().getEventIndexingManager();
console.log("Initializing event index, got {}", indexManager);
if (indexManager === null) return false;
console.log("Seshat: Creatingnew EventIndex object", indexManager);
const index = new EventIndex();
const userId = MatrixClientPeg.get().getUserId();

View File

@ -31,19 +31,19 @@ export default class EventIndexer {
}
async init(userId) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return false;
platform.initEventIndex(userId);
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager === null) return false;
indexManager.initEventIndex(userId);
return true;
}
async onSync(state, prevState, data) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager === null) return;
if (prevState === null && state === "PREPARED") {
// Load our stored checkpoints, if any.
this.crawlerChekpoints = await platform.loadCheckpoints();
this.crawlerChekpoints = await indexManager.loadCheckpoints();
console.log("Seshat: Loaded checkpoints",
this.crawlerChekpoints);
return;
@ -85,8 +85,8 @@ export default class EventIndexer {
direction: "f",
};
await platform.addCrawlerCheckpoint(backCheckpoint);
await platform.addCrawlerCheckpoint(forwardCheckpoint);
await indexManager.addCrawlerCheckpoint(backCheckpoint);
await indexManager.addCrawlerCheckpoint(forwardCheckpoint);
this.crawlerChekpoints.push(backCheckpoint);
this.crawlerChekpoints.push(forwardCheckpoint);
}));
@ -95,7 +95,7 @@ export default class EventIndexer {
// If our indexer is empty we're most likely running Riot the
// first time with indexing support or running it with an
// initial sync. Add checkpoints to crawl our encrypted rooms.
const eventIndexWasEmpty = await platform.isEventIndexEmpty();
const eventIndexWasEmpty = await indexManager.isEventIndexEmpty();
if (eventIndexWasEmpty) await addInitialCheckpoints();
// Start our crawler.
@ -107,14 +107,14 @@ export default class EventIndexer {
// A sync was done, presumably we queued up some live events,
// commit them now.
console.log("Seshat: Committing events");
await platform.commitLiveEvents();
await indexManager.commitLiveEvents();
return;
}
}
async onRoomTimeline(ev, room, toStartOfTimeline, removed, data) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager === null) return;
// We only index encrypted rooms locally.
if (!MatrixClientPeg.get().isRoomEncrypted(room.roomId)) return;
@ -139,8 +139,8 @@ export default class EventIndexer {
}
async onEventDecrypted(ev, err) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager === null) return;
const eventId = ev.getId();
@ -151,8 +151,8 @@ export default class EventIndexer {
}
async addLiveEventToIndex(ev) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager === null) return;
if (["m.room.message", "m.room.name", "m.room.topic"]
.indexOf(ev.getType()) == -1) {
@ -165,7 +165,7 @@ export default class EventIndexer {
avatar_url: ev.sender.getMxcAvatarUrl(),
};
platform.addEventToIndex(e, profile);
indexManager.addEventToIndex(e, profile);
}
async crawlerFunc(handle) {
@ -180,7 +180,7 @@ export default class EventIndexer {
console.log("Seshat: Started crawler function");
const client = MatrixClientPeg.get();
const platform = PlatformPeg.get();
const indexManager = PlatformPeg.get().getEventIndexingManager();
handle.cancel = () => {
cancelled = true;
@ -223,14 +223,14 @@ export default class EventIndexer {
} catch (e) {
console.log("Seshat: Error crawling events:", e);
this.crawlerChekpoints.push(checkpoint);
continue
continue;
}
if (res.chunk.length === 0) {
console.log("Seshat: Done with the checkpoint", checkpoint);
// We got to the start/end of our timeline, lets just
// delete our checkpoint and go back to sleep.
await platform.removeCrawlerCheckpoint(checkpoint);
await indexManager.removeCrawlerCheckpoint(checkpoint);
continue;
}
@ -323,7 +323,7 @@ export default class EventIndexer {
);
try {
const eventsAlreadyAdded = await platform.addHistoricEvents(
const eventsAlreadyAdded = await indexManager.addHistoricEvents(
events, newCheckpoint, checkpoint);
// If all events were already indexed we assume that we catched
// up with our index and don't need to crawl the room further.
@ -332,7 +332,7 @@ export default class EventIndexer {
if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) {
console.log("Seshat: Checkpoint had already all events",
"added, stopping the crawl", checkpoint);
await platform.removeCrawlerCheckpoint(newCheckpoint);
await indexManager.removeCrawlerCheckpoint(newCheckpoint);
} else {
this.crawlerChekpoints.push(newCheckpoint);
}
@ -348,8 +348,8 @@ export default class EventIndexer {
}
async addCheckpointForLimitedRoom(room) {
const platform = PlatformPeg.get();
if (!platform.supportsEventIndexing()) return;
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager === null) return;
if (!MatrixClientPeg.get().isRoomEncrypted(room.roomId)) return;
const timeline = room.getLiveTimeline();
@ -372,19 +372,19 @@ export default class EventIndexer {
console.log("Seshat: Added checkpoint because of a limited timeline",
backwardsCheckpoint, forwardsCheckpoint);
await platform.addCrawlerCheckpoint(backwardsCheckpoint);
await platform.addCrawlerCheckpoint(forwardsCheckpoint);
await indexManager.addCrawlerCheckpoint(backwardsCheckpoint);
await indexManager.addCrawlerCheckpoint(forwardsCheckpoint);
this.crawlerChekpoints.push(backwardsCheckpoint);
this.crawlerChekpoints.push(forwardsCheckpoint);
}
async deleteEventIndex() {
const platform = PlatformPeg.get();
if (platform.supportsEventIndexing()) {
const indexManager = PlatformPeg.get().getEventIndexingManager();
if (indexManager !== null) {
console.log("Seshat: Deleting event index.");
this.crawlerRef.cancel();
await platform.deleteEventIndex();
await indexManager.deleteEventIndex();
}
}
@ -400,7 +400,7 @@ export default class EventIndexer {
}
async search(searchArgs) {
const platform = PlatformPeg.get();
return platform.searchEventIndex(searchArgs)
const indexManager = PlatformPeg.get().getEventIndexingManager();
return indexManager.searchEventIndex(searchArgs);
}
}