From 435f074541a15d18b751b6a3cbf537f89d2eb19e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 16 Jul 2021 17:24:18 -0500 Subject: [PATCH] Process markers when we receive it over federation --- synapse/handlers/federation.py | 53 +++++++++++++++++++ .../databases/main/event_federation.py | 6 +-- synapse/storage/databases/main/events.py | 50 +---------------- 3 files changed, 58 insertions(+), 51 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 991ec9919a..41ecd8cebc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -42,6 +42,7 @@ from twisted.internet import defer from synapse import event_auth from synapse.api.constants import ( + EventContentFields, EventTypes, Membership, RejectedReason, @@ -263,6 +264,7 @@ class FederationHandler(BaseHandler): state = None # Get missing pdus if necessary. + # We don't need to worry about outliers because TODO! if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. min_depth = await self.get_min_depth_for_context(pdu.room_id) @@ -889,6 +891,57 @@ class FederationHandler(BaseHandler): "resync_device_due_to_pdu", self._resync_device, event.sender ) + await self._handle_marker_event(origin, event) + + async def _handle_marker_event(self, origin: str, marker_event: EventBase): + """Handles backfilling the insertion event when we receive a marker + event that points to one + + Args: + origin: Origin of the event. Will be called to get the insertion event + event: The event to process + """ + + if marker_event.type != EventTypes.MSC2716_MARKER: + # Not a marker event + return + + logger.info("_handle_marker_event: received %s", marker_event) + + insertion_event_id = marker_event.content.get( + EventContentFields.MSC2716_MARKER_INSERTION + ) + + if insertion_event_id is None: + # Nothing to retrieve then (invalid marker) + return + + logger.info( + "_handle_marker_event: backfilling insertion event %s", insertion_event_id + ) + + await self._get_events_and_persist( + origin, + marker_event.room_id, + [insertion_event_id], + ) + + insertion_event = await self.store.get_event(insertion_event_id, allow_none=True) + if insertion_event is None: + logger.warning( + "_handle_marker_event: server %s didn't return insertion event %s for marker %s", + origin, + insertion_event_id, + marker_event.event_id, + ) + return + + logger.info( + "_handle_marker_event: Succesfully backfilled insertion event %s from marker event %s", + insertion_event, + marker_event, + ) + async def _resync_device(self, sender: str) -> None: """We have detected that the device list for the given user may be out of sync, so we try and resync them. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 37b99c7f81..a2460fb7ca 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1005,7 +1005,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas connected_insertion_event_query, (event_id, limit - len(event_results)) ) connected_insertion_event_id_results = txn.fetchall() - logger.debug( + logger.info( "_get_backfill_events: connected_insertion_event_query %s", connected_insertion_event_id_results, ) @@ -1020,7 +1020,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas (connected_insertion_event, limit - len(event_results)), ) chunk_start_event_id_results = txn.fetchall() - logger.debug( + logger.info( "_get_backfill_events: chunk_start_event_id_results %s", chunk_start_event_id_results, ) @@ -1030,7 +1030,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas txn.execute(query, (event_id, False, limit - len(event_results))) prev_event_id_results = txn.fetchall() - logger.debug( + logger.info( "_get_backfill_events: prev_event_ids %s", prev_event_id_results ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ec70c5b925..201301440f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1506,7 +1506,6 @@ class PersistEventsStore: self._handle_insertion_event(txn, event) self._handle_chunk_id(txn, event) - self._handle_marker_event(txn, event) # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) @@ -1778,7 +1777,7 @@ class PersistEventsStore: # Invalid insertion event without next chunk ID return - logger.debug( + logger.info( "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event ) @@ -1819,7 +1818,7 @@ class PersistEventsStore: # No chunk connection to persist return - logger.debug("_handle_chunk_id %s %s", chunk_id, event) + logger.info("_handle_chunk_id %s %s", chunk_id, event) # Keep track of the insertion event and the chunk ID self.db_pool.simple_insert_txn( @@ -1832,51 +1831,6 @@ class PersistEventsStore: }, ) - def _handle_marker_event(self, txn, event): - """Handles backfilling the insertion event when we receive a marker - event that points to one - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_MARKER: - # Not a marker event - return - - logger.info("_handle_marker_event %s", event) - - # TODO: We should attempt to backfill the insertion event instead - # of trying to pack all of the info in the marker event. Otherwise, - # we need to pack in the insertion_prev_events and insertion_next_chunk_id. - # GET /_matrix/federation/v1/event/{eventId} - - insertion_event_id = event.content.get( - EventContentFields.MSC2716_MARKER_INSERTION - ) - - async def backfill_insertion_event(): - logger.info("marker -> backfill_insertion_event") - # We will trust that the application service sending the marker event is - # also the one that knows about the insertion event - insertion_event_origin = get_domain_from_id(event.sender) - insertion_event = await self.federation_client.get_event( - [insertion_event_origin], - insertion_event_id, - outlier=True, - timeout=10000, - ) - logger.info("marker -> fetched insertion_event %s", insertion_event) - # _auth_and_persist_events - # handle_new_client_event - - # We don't need to do any processing for a marker event coming from the same homeserver - if self.hs.is_mine_id(event.sender): - # TODO: "Note that simply calling a coroutine will not schedule it to be executed" - # https://docs.python.org/3/library/asyncio-task.html - backfill_insertion_event() - def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove any redacted relations from the database.