Use a background task to update databases to use the full text search
							parent
							
								
									36c58b18a3
								
							
						
					
					
						commit
						90b503216c
					
				|  | @ -22,7 +22,7 @@ import ujson | |||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| POSTGRES_SQL = """ | ||||
| POSTGRES_TABLE = """ | ||||
| CREATE TABLE IF NOT EXISTS event_search ( | ||||
|     event_id TEXT, | ||||
|     room_id TEXT, | ||||
|  | @ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search ( | |||
|     vector tsvector | ||||
| ); | ||||
| 
 | ||||
| INSERT INTO event_search SELECT | ||||
|     event_id, room_id, json::json->>'sender', 'content.body', | ||||
|     to_tsvector('english', json::json->'content'->>'body') | ||||
|     FROM events NATURAL JOIN event_json WHERE type = 'm.room.message'; | ||||
| 
 | ||||
| INSERT INTO event_search SELECT | ||||
|     event_id, room_id, json::json->>'sender', 'content.name', | ||||
|     to_tsvector('english', json::json->'content'->>'name') | ||||
|     FROM events NATURAL JOIN event_json WHERE type = 'm.room.name'; | ||||
| 
 | ||||
| INSERT INTO event_search SELECT | ||||
|     event_id, room_id, json::json->>'sender', 'content.topic', | ||||
|     to_tsvector('english', json::json->'content'->>'topic') | ||||
|     FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic'; | ||||
| 
 | ||||
| 
 | ||||
| CREATE INDEX event_search_fts_idx ON event_search USING gin(vector); | ||||
| CREATE INDEX event_search_ev_idx ON event_search(event_id); | ||||
| CREATE INDEX event_search_ev_ridx ON event_search(room_id); | ||||
|  | @ -61,67 +45,31 @@ SQLITE_TABLE = ( | |||
| 
 | ||||
| def run_upgrade(cur, database_engine, *args, **kwargs): | ||||
|     if isinstance(database_engine, PostgresEngine): | ||||
|         run_postgres_upgrade(cur) | ||||
|         for statement in get_statements(POSTGRES_TABLE.splitlines()): | ||||
|             cur.execute(statement) | ||||
|         return | ||||
| 
 | ||||
|     if isinstance(database_engine, Sqlite3Engine): | ||||
|         run_sqlite_upgrade(cur) | ||||
|         cur.execute(SQLITE_TABLE) | ||||
|         return | ||||
| 
 | ||||
|     cur.execute("SELECT MIN(stream_ordering) FROM events") | ||||
|     rows = cur.fetchall() | ||||
|     min_stream_id = rows[0][0] | ||||
| 
 | ||||
| def run_postgres_upgrade(cur): | ||||
|     for statement in get_statements(POSTGRES_SQL.splitlines()): | ||||
|         cur.execute(statement) | ||||
|     cur.execute("SELECT MAX(stream_ordering) FROM events") | ||||
|     rows = cur.fetchall() | ||||
|     max_stream_id = rows[0][0] | ||||
| 
 | ||||
|     if min_stream_id is not None and max_stream_id is not None: | ||||
|         progress = { | ||||
|             "target_min_stream_id_inclusive": min_stream_id, | ||||
|             "max_stream_id_exclusive": max_stream_id + 1, | ||||
|             "rows_inserted": 0, | ||||
|         } | ||||
|         progress_json = ujson.dumps(progress) | ||||
| 
 | ||||
| def run_sqlite_upgrade(cur): | ||||
|         cur.execute(SQLITE_TABLE) | ||||
| 
 | ||||
|         rowid = -1 | ||||
|         while True: | ||||
|             cur.execute( | ||||
|                 "SELECT rowid, json FROM event_json" | ||||
|                 " WHERE rowid > ?" | ||||
|                 " ORDER BY rowid ASC LIMIT 100", | ||||
|                 (rowid,) | ||||
|             ) | ||||
| 
 | ||||
|             res = cur.fetchall() | ||||
| 
 | ||||
|             if not res: | ||||
|                 break | ||||
| 
 | ||||
|             events = [ | ||||
|                 ujson.loads(js) | ||||
|                 for _, js in res | ||||
|             ] | ||||
| 
 | ||||
|             rowid = max(rid for rid, _ in res) | ||||
| 
 | ||||
|             rows = [] | ||||
|             for ev in events: | ||||
|                 content = ev.get("content", {}) | ||||
|                 body = content.get("body", None) | ||||
|                 name = content.get("name", None) | ||||
|                 topic = content.get("topic", None) | ||||
|                 sender = ev.get("sender", None) | ||||
|                 if ev["type"] == "m.room.message" and body: | ||||
|                     rows.append(( | ||||
|                         ev["event_id"], ev["room_id"], sender, "content.body", body | ||||
|                     )) | ||||
|                 if ev["type"] == "m.room.name" and name: | ||||
|                     rows.append(( | ||||
|                         ev["event_id"], ev["room_id"], sender, "content.name", name | ||||
|                     )) | ||||
|                 if ev["type"] == "m.room.topic" and topic: | ||||
|                     rows.append(( | ||||
|                         ev["event_id"], ev["room_id"], sender, "content.topic", topic | ||||
|                     )) | ||||
| 
 | ||||
|             if rows: | ||||
|                 logger.info(rows) | ||||
|                 cur.executemany( | ||||
|                     "INSERT INTO event_search (event_id, room_id, sender, key, value)" | ||||
|                     " VALUES (?,?,?,?,?)", | ||||
|                     rows | ||||
|                 ) | ||||
|         cur.execute( | ||||
|             "INSERT into background_updates (update_name, progress_json)" | ||||
|             " VALUES (?, ?)", ("event_search", progress_json) | ||||
|         ) | ||||
|  |  | |||
|  | @ -37,8 +37,8 @@ class SearchStore(BackgroundUpdateStore): | |||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _background_reindex_search(self, progress, batch_size): | ||||
|         target_min_stream_id = progress["target_min_stream_id"] | ||||
|         max_stream_id = progress["max_stream_id"] | ||||
|         target_min_stream_id = progress["target_min_stream_id_inclusive"] | ||||
|         max_stream_id = progress["max_stream_id_exclusive"] | ||||
|         rows_inserted = progress.get("rows_inserted", 0) | ||||
| 
 | ||||
|         INSERT_CLUMP_SIZE = 1000 | ||||
|  | @ -105,8 +105,8 @@ class SearchStore(BackgroundUpdateStore): | |||
|                 txn.execute_many(sql, clump) | ||||
| 
 | ||||
|             progress = { | ||||
|                 "target_max_stream_id": target_min_stream_id, | ||||
|                 "max_stream_id": min_stream_id, | ||||
|                 "target_min_stream_id_inclusive": target_min_stream_id, | ||||
|                 "max_stream_id_exclusive": min_stream_id, | ||||
|                 "rows_inserted": rows_inserted + len(event_search_rows) | ||||
|             } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Mark Haines
						Mark Haines