From c05e43bf318ed76906091dd0c3ce3bd0bac7f9cd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 12 Jul 2021 20:22:38 -0500 Subject: [PATCH] Add inserted historical messages to /backfill response --- .../databases/main/event_federation.py | 75 ++++++++++++++-- synapse/storage/databases/main/events.py | 85 ++++++++++++++----- .../delta/61/01insertion_event_lookups.sql | 24 ++++-- 3 files changed, 151 insertions(+), 33 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 8b88a01e17..1b65f44139 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -673,11 +673,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) def get_oldest_events_with_depth_in_room_txn(self, txn, room_id): - sqlAsdf = "SELECT * FROM insertion_event_extremeties as i" + sqlAsdf = "SELECT * FROM insertion_event_edges as i" txn.execute(sqlAsdf) logger.info("wfeafewawafeawg %s", dict(txn)) - sqlAsdf = "SELECT * FROM insertion_event_extremeties as i WHERE i.room_id = ?" + sqlAsdf = "SELECT * FROM insertion_event_edges as i WHERE i.room_id = ?" txn.execute(sqlAsdf, (room_id,)) logger.info("awfeawefw %s", dict(txn)) @@ -688,7 +688,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # " INNER JOIN event_backward_extremities as b" # " ON g.prev_event_id = b.event_id" # TODO - " INNER JOIN insertion_event_extremeties as i" + " INNER JOIN insertion_event_edges as i" " ON e.event_id = i.insertion_prev_event_id" " WHERE i.room_id = ?" 
" GROUP BY i.insertion_event_id" @@ -703,7 +703,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas " INNER JOIN event_backward_extremities as b" " ON g.prev_event_id = b.event_id" # TODO - # " INNER JOIN insertion_event_extremeties as i" + # " INNER JOIN insertion_event_edges as i" # " ON g.event_id = i.insertion_prev_event_id" " WHERE b.room_id = ? AND g.is_state is ?" " GROUP BY b.event_id" @@ -961,16 +961,50 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # We want to make sure that we do a breadth-first, "depth" ordered # search. - # TODO + # Look for the prev_event_id connected to the given event_id query = ( "SELECT depth, prev_event_id FROM event_edges" + # Get the depth of the prev_event_id from the events table " INNER JOIN events" " ON prev_event_id = events.event_id" + # Find an event which matches the given event_id " WHERE event_edges.event_id = ?" " AND event_edges.is_state = ?" " LIMIT ?" ) + # Look for the "insertion" events connected to the given event_id + # TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't + connected_insertion_event_query = ( + "SELECT e.depth, i.insertion_event_id FROM insertion_event_edges AS i" + # Get the depth of the insertion event from the events table + " INNER JOIN events AS e" + " ON e.event_id = i.insertion_event_id" + # Find an insertion event which points via prev_events to the given event_id + " WHERE i.insertion_prev_event_id = ?" + " LIMIT ?" + ) + + # Find any chunk connections of a given insertion event + # TODO: Do we need to worry about selecting only from the given room_id? 
The other query above doesn't + chunk_connection_query = ( + "SELECT e.depth, c.event_id FROM insertion_events AS i" + # Find the chunk that connects to the given insertion event + " INNER JOIN chunk_edges AS c" + " ON i.next_chunk_id = c.chunk_id" + # Get the depth of the chunk start event from the events table + " INNER JOIN events AS e" + " ON e.event_id = c.event_id" + # Find an insertion event which matches the given event_id + " WHERE i.insertion_event_id = ?" + " LIMIT ?" + ) + + # In a PriorityQueue, the lowest valued entries are retrieved first. + # We're using depth as the priority in the queue. + # Depth is lowest at the oldest-in-time message and highest at the + # newest-in-time message. We add events to the queue with a negative depth so that + # we process the newest-in-time messages first going backwards in time. queue = PriorityQueue() for event_id in event_list: @@ -996,9 +1030,36 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_results.add(event_id) - txn.execute(query, (event_id, False, limit - len(event_results))) + txn.execute( + connected_insertion_event_query, (event_id, limit - len(event_results)) + ) + connected_insertion_event_id_results = list(txn) + logger.info( + "connected_insertion_event_query %s", + connected_insertion_event_id_results, + ) + for row in connected_insertion_event_id_results: + if row[1] not in event_results: + queue.put((-row[0], row[1])) - for row in txn: + # Find any chunk connections for the given insertion event + txn.execute( + chunk_connection_query, (row[1], limit - len(event_results)) + ) + chunk_start_event_id_results = list(txn) + logger.info( + "chunk_start_event_id_results %s", + chunk_start_event_id_results, + ) + for row in chunk_start_event_id_results: + if row[1] not in event_results: + queue.put((-row[0], row[1])) + + txn.execute(query, (event_id, False, limit - len(event_results))) + prev_event_id_results = list(txn) + logger.info("prev_event_ids %s", 
prev_event_id_results) + + for row in prev_event_id_results: if row[1] not in event_results: queue.put((-row[0], row[1])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 228dce91b0..6f9e91dafa 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1506,6 +1506,7 @@ class PersistEventsStore: self._handle_insertion_event(txn, event) self._handle_marker_event(txn, event) + self._handle_chunk_id(txn, event) # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) @@ -1773,10 +1774,27 @@ class PersistEventsStore: logger.info("_handle_insertion_event %s", event) + next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID) + if next_chunk_id is None: + # Invalid insertion event without next chunk ID + return + + # Keep track of the insertion event and the chunk ID + self.db_pool.simple_insert_txn( + txn, + table="insertion_events", + values={ + "insertion_event_id": event.event_id, + "room_id": event.room_id, + "next_chunk_id": next_chunk_id, + }, + ) + + # Insert an edge for every prev_event connection for prev_event_id in event.prev_events: self.db_pool.simple_insert_txn( txn, - table="insertion_event_extremeties", + table="insertion_event_edges", values={ "insertion_event_id": event.event_id, "room_id": event.room_id, @@ -1798,26 +1816,55 @@ class PersistEventsStore: logger.info("_handle_marker_event %s", event) - insertion_event_id = event.content.get( - EventContentFields.MSC2716_MARKER_INSERTION - ) - insertion_prev_event_ids = event.content.get( - EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS - ) - if not insertion_event_id or not insertion_prev_event_ids: - # Invalid marker event + # TODO: We should attempt to backfill the insertion event instead + # of trying to pack all of the info in the marker event. Otherwise, + # we need to pack in the insertion_prev_events and insertion_next_chunk_id. 
+ + # insertion_event_id = event.content.get( + # EventContentFields.MSC2716_MARKER_INSERTION + # ) + # insertion_prev_event_ids = event.content.get( + # EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS + # ) + # if not insertion_event_id or not insertion_prev_event_ids: + # # Invalid marker event + # return + + # for prev_event_id in insertion_prev_event_ids: + # self.db_pool.simple_insert_txn( + # txn, + # table="insertion_event_edges", + # values={ + # "insertion_event_id": insertion_event_id, + # "room_id": event.room_id, + # "insertion_prev_event_id": prev_event_id, + # }, + # ) + + def _handle_chunk_id(self, txn, event): + """Handles inserting the chunk connections between the event at the + start of a chunk and an insertion event + + Args: txn event (EventBase) + """ + + chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID) + if chunk_id is None: + # No chunk connection to persist return - for prev_event_id in insertion_prev_event_ids: - self.db_pool.simple_insert_txn( - txn, - table="insertion_event_extremeties", - values={ - "insertion_event_id": insertion_event_id, - "room_id": event.room_id, - "insertion_prev_event_id": prev_event_id, - }, - ) + logger.info("_handle_chunk_id %s %s", chunk_id, event) + + # Keep track of the insertion event and the chunk ID + self.db_pool.simple_insert_txn( + txn, + table="chunk_edges", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "chunk_id": chunk_id, + }, + ) def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove diff --git a/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql index 58b71f1bc4..88cb22ec70 100644 --- a/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql +++ b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql @@ -18,23 +18,33 @@ -- and we hit an event which matches 
`insertion_prev_event_id`, it should backfill -- the "insertion" event and start navigating from there. +CREATE TABLE IF NOT EXISTS insertion_events( + insertion_event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + next_chunk_id TEXT NOT NULL, + UNIQUE (insertion_event_id, room_id, next_chunk_id) +); -CREATE TABLE IF NOT EXISTS insertion_event_extremeties( +CREATE INDEX IF NOT EXISTS insertion_events_insertion_room_id ON insertion_events(room_id); +CREATE INDEX IF NOT EXISTS insertion_events_insertion_event_id ON insertion_events(insertion_event_id); +CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id); + +CREATE TABLE IF NOT EXISTS insertion_event_edges( insertion_event_id TEXT NOT NULL, room_id TEXT NOT NULL, insertion_prev_event_id TEXT NOT NULL, - UNIQUE (insertion_event_id, room_id, room_id, insertion_prev_event_id) + UNIQUE (insertion_event_id, room_id, insertion_prev_event_id) ); -CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_room_id ON insertion_event_extremeties(room_id); -CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_event_id ON insertion_event_extremeties(insertion_event_id); -CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_prev_event_id ON insertion_event_extremeties(insertion_prev_event_id); +CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id); +CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_event_id ON insertion_event_edges(insertion_event_id); +CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id); -CREATE TABLE IF NOT EXISTS chunk_connections( +CREATE TABLE IF NOT EXISTS chunk_edges( event_id TEXT NOT NULL, room_id TEXT NOT NULL, chunk_id TEXT NOT NULL, UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS chunk_connections_insertion_chunk_id ON chunk_connections(chunk_id); +CREATE INDEX IF NOT EXISTS chunk_edges_chunk_id ON 
chunk_edges(chunk_id);