From d26094e92cace20525552e5a0c8b21ff9ce53f11 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 8 Jul 2021 20:25:59 -0500 Subject: [PATCH] Add base starting insertion event when no chunk ID is provided (MSC2716) (#10250) * Add base starting insertion point when no chunk ID is provided This is so we can have the marker event point to this initial insertion event and be able to traverse the events in the first chunk. --- changelog.d/10250.bugfix | 1 + synapse/handlers/message.py | 8 +++ synapse/rest/client/v1/room.py | 114 ++++++++++++++++++++++++++------- 3 files changed, 99 insertions(+), 24 deletions(-) create mode 100644 changelog.d/10250.bugfix diff --git a/changelog.d/10250.bugfix b/changelog.d/10250.bugfix new file mode 100644 index 0000000000..a8107dafb2 --- /dev/null +++ b/changelog.d/10250.bugfix @@ -0,0 +1 @@ +Add base starting insertion event when no chunk ID is specified in the historical batch send API. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b960e18c4c..e06655f3d4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -518,6 +518,9 @@ class EventCreationHandler: outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. + historical: Indicates whether the message is being inserted + back in time around some existing events. This is used to skip + a few checks and mark the event as backfilled. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -772,6 +775,7 @@ class EventCreationHandler: txn_id: Optional[str] = None, ignore_shadow_ban: bool = False, outlier: bool = False, + historical: bool = False, depth: Optional[int] = None, ) -> Tuple[EventBase, int]: """ @@ -799,6 +803,9 @@ class EventCreationHandler: outlier: Indicates whether the event is an `outlier`, i.e. if it's from an arbitrary point and floating in the DAG as opposed to being inline with the current DAG. + historical: Indicates whether the message is being inserted + back in time around some existing events. This is used to skip + a few checks and mark the event as backfilled. depth: Override the depth used to order the event in the DAG. Should normally be set to None, which will cause the depth to be calculated based on the prev_events. @@ -847,6 +854,7 @@ class EventCreationHandler: prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, outlier=outlier, + historical=historical, depth=depth, ) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 92ebe838fd..9c58e3689e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -349,6 +349,35 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): return depth + def _create_insertion_event_dict( + self, sender: str, room_id: str, origin_server_ts: int + ): + """Creates an event dict for an "insertion" event with the proper fields + and a random chunk ID. + + Args: + sender: The event author MXID + room_id: The room ID that the event belongs to + origin_server_ts: Timestamp when the event was sent + + Returns: + Tuple of event ID and stream ordering position + """ + + next_chunk_id = random_string(8) + insertion_event = { + "type": EventTypes.MSC2716_INSERTION, + "sender": sender, + "room_id": room_id, + "content": { + EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, + EventContentFields.MSC2716_HISTORICAL: True, + }, + "origin_server_ts": origin_server_ts, + } + + return insertion_event + async def on_POST(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=False) @@ -449,37 +478,68 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): events_to_create = body["events"] - # If provided, connect the chunk to the last insertion point - # The chunk ID passed in comes from the chunk_id in the - # "insertion" event from the previous chunk. - if chunk_id_from_query: - last_event_in_chunk = events_to_create[-1] - last_event_in_chunk["content"][ - EventContentFields.MSC2716_CHUNK_ID - ] = chunk_id_from_query + prev_event_ids = prev_events_from_query + inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query) - # Add an "insertion" event to the start of each chunk (next to the oldest + # Figure out which chunk to connect to. If they passed in + # chunk_id_from_query let's use it. The chunk ID passed in comes + # from the chunk_id in the "insertion" event from the previous chunk. + last_event_in_chunk = events_to_create[-1] + chunk_id_to_connect_to = chunk_id_from_query + base_insertion_event = None + if chunk_id_from_query: + # TODO: Verify the chunk_id_from_query corresponds to an insertion event + pass + # Otherwise, create an insertion event to act as a starting point. + # + # We don't always have an insertion event to start hanging more history + # off of (ideally there would be one in the main DAG, but that's not the + # case if we're wanting to add history to e.g. existing rooms without + # an insertion event), in which case we just create a new insertion event + # that can then get pointed to by a "marker" event later. + else: + base_insertion_event_dict = self._create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + origin_server_ts=last_event_in_chunk["origin_server_ts"], + ) + base_insertion_event_dict["prev_events"] = prev_event_ids.copy() + + ( + base_insertion_event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, + base_insertion_event_dict, + prev_event_ids=base_insertion_event_dict.get("prev_events"), + auth_event_ids=auth_event_ids, + historical=True, + depth=inherited_depth, + ) + + chunk_id_to_connect_to = base_insertion_event["content"][ + EventContentFields.MSC2716_NEXT_CHUNK_ID + ] + + # Connect this current chunk to the insertion event from the previous chunk + last_event_in_chunk["content"][ + EventContentFields.MSC2716_CHUNK_ID + ] = chunk_id_to_connect_to + + # Add an "insertion" event to the start of each chunk (next to the oldest-in-time # event in the chunk) so the next chunk can be connected to this one. - next_chunk_id = random_string(64) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": requester.user.to_string(), - "content": { - EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, + insertion_event = self._create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, # Since the insertion event is put at the start of the chunk, - # where the oldest event is, copy the origin_server_ts from + # where the oldest-in-time event is, copy the origin_server_ts from # the first event we're inserting - "origin_server_ts": events_to_create[0]["origin_server_ts"], - } + origin_server_ts=events_to_create[0]["origin_server_ts"], + ) # Prepend the insertion event to the start of the chunk events_to_create = [insertion_event] + events_to_create - inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query) - event_ids = [] - prev_event_ids = prev_events_from_query events_to_persist = [] for ev in events_to_create: assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) @@ -533,10 +593,16 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): context=context, ) + # Add the base_insertion_event to the bottom of the list we return + if base_insertion_event is not None: + event_ids.append(base_insertion_event.event_id) + return 200, { "state_events": auth_event_ids, "events": event_ids, - "next_chunk_id": next_chunk_id, + "next_chunk_id": insertion_event["content"][ + EventContentFields.MSC2716_NEXT_CHUNK_ID + ], } def on_GET(self, request, room_id):