diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index aca32edc17..9d08c154da 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then fi # Run the tests! -go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests +go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Historical_messages_are_visible_when_already_joined_on_federated_server diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 081d79f1ac..ec70c5b925 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1506,6 +1506,7 @@ class PersistEventsStore: self._handle_insertion_event(txn, event) self._handle_chunk_id(txn, event) + self._handle_marker_event(txn, event) # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) @@ -1831,6 +1832,51 @@ class PersistEventsStore: }, ) + def _handle_marker_event(self, txn, event): + """Handles backfilling the insertion event when we receive a marker + event that points to one + + Args: + txn: The database transaction object + event: The event to process + """ + + if event.type != EventTypes.MSC2716_MARKER: + # Not a marker event + return + + logger.info("_handle_marker_event %s", event) + + # TODO: We should attempt to backfill the insertion event instead + # of trying to pack all of the info in the marker event. Otherwise, + # we need to pack in the insertion_prev_events and insertion_next_chunk_id. + # GET /_matrix/federation/v1/event/{eventId} + + insertion_event_id = event.content.get( + EventContentFields.MSC2716_MARKER_INSERTION + ) + + async def backfill_insertion_event(): + logger.info("marker -> backfill_insertion_event") + # We will trust that the application service sending the marker event is + # also the one that knows about the insertion event + insertion_event_origin = get_domain_from_id(event.sender) + insertion_event = await self.federation_client.get_event( + [insertion_event_origin], + insertion_event_id, + outlier=True, + timeout=10000, + ) + logger.info("marker -> fetched insertion_event %s", insertion_event) + # _auth_and_persist_events + # handle_new_client_event + + # We don't need to do any processing for a marker event coming from the same homeserver + if self.hs.is_mine_id(event.sender): + # TODO: "Note that simply calling a coroutine will not schedule it to be executed" + # https://docs.python.org/3/library/asyncio-task.html + backfill_insertion_event() + def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove any redacted relations from the database.