From 38bcf13e1e49b2a79aedf0e1025b0b42c249e0b7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 25 Jun 2021 01:04:38 -0500 Subject: [PATCH] Add base starting insertion point when no chunk ID is provided --- scripts-dev/complement.sh | 2 +- synapse/events/utils.py | 14 +++---- synapse/rest/client/v1/room.py | 76 +++++++++++++++++++++++++--------- 3 files changed, 64 insertions(+), 28 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 1e3bf357f7..0b00e2f8ce 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then fi # Run the tests! -go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Historical_messages_are_visible_when_joining_on_federated_server +go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Backfilled_historical_events_resolve_with_proper_state_in_correct_order diff --git a/synapse/events/utils.py b/synapse/events/utils.py index ec96999e4e..9c115758e9 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -253,13 +253,13 @@ def format_event_for_client_v1(d): def format_event_for_client_v2(d): drop_keys = ( - "auth_events", - "prev_events", - "hashes", - "signatures", - "depth", - "origin", - "prev_state", + # "auth_events", + # "prev_events", + # "hashes", + # "signatures", + # "depth", + # "origin", + # "prev_state", ) for key in drop_keys: d.pop(key, None) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 92ebe838fd..25af9cd429 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -349,6 +349,30 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): return depth + def _create_insertion_event_dict(self, sender: str, origin_server_ts: int): + """ + Creates an event dict for an "insertion" event with the proper fields + and a random chunk ID. + Args: + sender: The event author MXID + origin_server_ts: Timestamp when the event was sent + Returns: + Tuple of event ID and stream ordering position + """ + + next_chunk_id = random_string(64) + insertion_event = { + "type": EventTypes.MSC2716_INSERTION, + "sender": sender, + "content": { + EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, + EventContentFields.MSC2716_HISTORICAL: True, + }, + "origin_server_ts": origin_server_ts, + } + + return insertion_event + async def on_POST(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=False) @@ -449,30 +473,40 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): events_to_create = body["events"] - # If provided, connect the chunk to the last insertion point - # The chunk ID passed in comes from the chunk_id in the - # "insertion" event from the previous chunk. + # Figure out which chunk to connect to. If they passed in + # chunk_id_from_query let's use it. The chunk ID passed in comes + # from the chunk_id in the "insertion" event from the previous chunk. + last_event_in_chunk = events_to_create[-1] + chunk_id_to_connect_to = chunk_id_from_query if chunk_id_from_query: - last_event_in_chunk = events_to_create[-1] - last_event_in_chunk["content"][ - EventContentFields.MSC2716_CHUNK_ID - ] = chunk_id_from_query + # TODO: Verify the chunk_id_from_query corresponds to an insertion event + pass + # Otherwise, create an insertion event to be based off of and connect + # to as a starting point. + else: + base_insertion_event = self._create_insertion_event_dict( + sender=requester.user.to_string(), + origin_server_ts=last_event_in_chunk["origin_server_ts"], + ) + events_to_create.append(base_insertion_event) + chunk_id_to_connect_to = base_insertion_event["content"][ + EventContentFields.MSC2716_NEXT_CHUNK_ID + ] - # Add an "insertion" event to the start of each chunk (next to the oldest + # Connect this current chunk to the insertion event from the previous chunk + last_event_in_chunk["content"][ + EventContentFields.MSC2716_CHUNK_ID + ] = chunk_id_to_connect_to + + # Add an "insertion" event to the start of each chunk (next to the oldest-in-time # event in the chunk) so the next chunk can be connected to this one. - next_chunk_id = random_string(64) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": requester.user.to_string(), - "content": { - EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, + insertion_event = self._create_insertion_event_dict( + sender=requester.user.to_string(), # Since the insertion event is put at the start of the chunk, - # where the oldest event is, copy the origin_server_ts from + # where the oldest-in-time event is, copy the origin_server_ts from # the first event we're inserting - "origin_server_ts": events_to_create[0]["origin_server_ts"], - } + origin_server_ts=events_to_create[0]["origin_server_ts"], + ) # Prepend the insertion event to the start of the chunk events_to_create = [insertion_event] + events_to_create @@ -536,7 +570,9 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): return 200, { "state_events": auth_event_ids, "events": event_ids, - "next_chunk_id": next_chunk_id, + "next_chunk_id": insertion_event["content"][ + EventContentFields.MSC2716_NEXT_CHUNK_ID + ], } def on_GET(self, request, room_id):