Add base starting insertion point when no chunk ID is provided
parent
2d942ec0c1
commit
38bcf13e1e
|
@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
|
|||
fi
|
||||
|
||||
# Run the tests!
|
||||
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Historical_messages_are_visible_when_joining_on_federated_server
|
||||
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Backfilled_historical_events_resolve_with_proper_state_in_correct_order
|
||||
|
|
|
@ -253,13 +253,13 @@ def format_event_for_client_v1(d):
|
|||
|
||||
def format_event_for_client_v2(d):
|
||||
drop_keys = (
|
||||
"auth_events",
|
||||
"prev_events",
|
||||
"hashes",
|
||||
"signatures",
|
||||
"depth",
|
||||
"origin",
|
||||
"prev_state",
|
||||
# "auth_events",
|
||||
# "prev_events",
|
||||
# "hashes",
|
||||
# "signatures",
|
||||
# "depth",
|
||||
# "origin",
|
||||
# "prev_state",
|
||||
)
|
||||
for key in drop_keys:
|
||||
d.pop(key, None)
|
||||
|
|
|
@ -349,6 +349,30 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
|||
|
||||
return depth
|
||||
|
||||
def _create_insertion_event_dict(self, sender: str, origin_server_ts: int):
|
||||
"""
|
||||
Creates an event dict for an "insertion" event with the proper fields
|
||||
and a random chunk ID.
|
||||
Args:
|
||||
sender: The event author MXID
|
||||
origin_server_ts: Timestamp when the event was sent
|
||||
Returns:
|
||||
Tuple of event ID and stream ordering position
|
||||
"""
|
||||
|
||||
next_chunk_id = random_string(64)
|
||||
insertion_event = {
|
||||
"type": EventTypes.MSC2716_INSERTION,
|
||||
"sender": sender,
|
||||
"content": {
|
||||
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
|
||||
EventContentFields.MSC2716_HISTORICAL: True,
|
||||
},
|
||||
"origin_server_ts": origin_server_ts,
|
||||
}
|
||||
|
||||
return insertion_event
|
||||
|
||||
async def on_POST(self, request, room_id):
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=False)
|
||||
|
||||
|
@ -449,30 +473,40 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
|||
|
||||
events_to_create = body["events"]
|
||||
|
||||
# If provided, connect the chunk to the last insertion point
|
||||
# The chunk ID passed in comes from the chunk_id in the
|
||||
# "insertion" event from the previous chunk.
|
||||
# Figure out which chunk to connect to. If they passed in
|
||||
# chunk_id_from_query let's use it. The chunk ID passed in comes
|
||||
# from the chunk_id in the "insertion" event from the previous chunk.
|
||||
last_event_in_chunk = events_to_create[-1]
|
||||
chunk_id_to_connect_to = chunk_id_from_query
|
||||
if chunk_id_from_query:
|
||||
last_event_in_chunk = events_to_create[-1]
|
||||
last_event_in_chunk["content"][
|
||||
EventContentFields.MSC2716_CHUNK_ID
|
||||
] = chunk_id_from_query
|
||||
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
|
||||
pass
|
||||
# Otherwise, create an insertion event to be based off of and connect
|
||||
# to as a starting point.
|
||||
else:
|
||||
base_insertion_event = self._create_insertion_event_dict(
|
||||
sender=requester.user.to_string(),
|
||||
origin_server_ts=last_event_in_chunk["origin_server_ts"],
|
||||
)
|
||||
events_to_create.append(base_insertion_event)
|
||||
chunk_id_to_connect_to = base_insertion_event["content"][
|
||||
EventContentFields.MSC2716_NEXT_CHUNK_ID
|
||||
]
|
||||
|
||||
# Add an "insertion" event to the start of each chunk (next to the oldest
|
||||
# Connect this current chunk to the insertion event from the previous chunk
|
||||
last_event_in_chunk["content"][
|
||||
EventContentFields.MSC2716_CHUNK_ID
|
||||
] = chunk_id_to_connect_to
|
||||
|
||||
# Add an "insertion" event to the start of each chunk (next to the oldest-in-time
|
||||
# event in the chunk) so the next chunk can be connected to this one.
|
||||
next_chunk_id = random_string(64)
|
||||
insertion_event = {
|
||||
"type": EventTypes.MSC2716_INSERTION,
|
||||
"sender": requester.user.to_string(),
|
||||
"content": {
|
||||
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
|
||||
EventContentFields.MSC2716_HISTORICAL: True,
|
||||
},
|
||||
insertion_event = self._create_insertion_event_dict(
|
||||
sender=requester.user.to_string(),
|
||||
# Since the insertion event is put at the start of the chunk,
|
||||
# where the oldest event is, copy the origin_server_ts from
|
||||
# where the oldest-in-time event is, copy the origin_server_ts from
|
||||
# the first event we're inserting
|
||||
"origin_server_ts": events_to_create[0]["origin_server_ts"],
|
||||
}
|
||||
origin_server_ts=events_to_create[0]["origin_server_ts"],
|
||||
)
|
||||
# Prepend the insertion event to the start of the chunk
|
||||
events_to_create = [insertion_event] + events_to_create
|
||||
|
||||
|
@ -536,7 +570,9 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
|||
return 200, {
|
||||
"state_events": auth_event_ids,
|
||||
"events": event_ids,
|
||||
"next_chunk_id": next_chunk_id,
|
||||
"next_chunk_id": insertion_event["content"][
|
||||
EventContentFields.MSC2716_NEXT_CHUNK_ID
|
||||
],
|
||||
}
|
||||
|
||||
def on_GET(self, request, room_id):
|
||||
|
|
Loading…
Reference in New Issue