Clean up PR
parent
ab8011bb5d
commit
f20ba0264c
|
@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
|
|||
fi
|
||||
|
||||
# Run the tests!
|
||||
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory/parallel/Historical_messages_are_visible_when_already_joined_on_federated_server
|
||||
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
|
||||
|
|
|
@ -253,13 +253,13 @@ def format_event_for_client_v1(d):
|
|||
|
||||
def format_event_for_client_v2(d):
|
||||
drop_keys = (
|
||||
# "auth_events",
|
||||
# "prev_events",
|
||||
# "hashes",
|
||||
# "signatures",
|
||||
# "depth",
|
||||
# "origin",
|
||||
# "prev_state",
|
||||
"auth_events",
|
||||
"prev_events",
|
||||
"hashes",
|
||||
"signatures",
|
||||
"depth",
|
||||
"origin",
|
||||
"prev_state",
|
||||
)
|
||||
for key in drop_keys:
|
||||
d.pop(key, None)
|
||||
|
|
|
@ -1054,14 +1054,10 @@ class FederationHandler(BaseHandler):
|
|||
with (await self._room_backfill.queue(room_id)):
|
||||
return await self._maybe_backfill_inner(room_id, current_depth, limit)
|
||||
|
||||
# Todo
|
||||
async def _maybe_backfill_inner(
|
||||
self, room_id: str, current_depth: int, limit: int
|
||||
) -> bool:
|
||||
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
|
||||
logger.info(
|
||||
"_maybe_backfill_inner extremities(%d)=%s", len(extremities), extremities
|
||||
)
|
||||
|
||||
if not extremities:
|
||||
logger.debug("Not backfilling as no extremeties found.")
|
||||
|
@ -2127,18 +2123,8 @@ class FederationHandler(BaseHandler):
|
|||
limit = min(limit, 100)
|
||||
|
||||
events = await self.store.get_backfill_events(room_id, pdu_list, limit)
|
||||
logger.info(
|
||||
"on_backfill_request get_backfill_events events(%d)=%s",
|
||||
len(events),
|
||||
[f'{ev.content.get("body")}: {ev.type} ({ev.event_id})' for ev in events],
|
||||
)
|
||||
|
||||
events = await filter_events_for_server(self.storage, origin, events)
|
||||
logger.info(
|
||||
"on_backfill_request filter_events_for_server events(%d)=%s",
|
||||
len(events),
|
||||
events,
|
||||
)
|
||||
|
||||
return events
|
||||
|
||||
|
|
|
@ -1009,8 +1009,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
connected_insertion_event_query, (event_id, limit - len(event_results))
|
||||
)
|
||||
connected_insertion_event_id_results = list(txn)
|
||||
logger.info(
|
||||
"connected_insertion_event_query %s",
|
||||
logger.debug(
|
||||
"_get_backfill_events: connected_insertion_event_query %s",
|
||||
connected_insertion_event_id_results,
|
||||
)
|
||||
for row in connected_insertion_event_id_results:
|
||||
|
@ -1022,8 +1022,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
chunk_connection_query, (row[1], limit - len(event_results))
|
||||
)
|
||||
chunk_start_event_id_results = list(txn)
|
||||
logger.info(
|
||||
"chunk_start_event_id_results %s",
|
||||
logger.debug(
|
||||
"_get_backfill_events: chunk_start_event_id_results %s",
|
||||
chunk_start_event_id_results,
|
||||
)
|
||||
for row in chunk_start_event_id_results:
|
||||
|
@ -1032,7 +1032,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
|
||||
txn.execute(query, (event_id, False, limit - len(event_results)))
|
||||
prev_event_id_results = list(txn)
|
||||
logger.info("prev_event_ids %s", prev_event_id_results)
|
||||
logger.debug(
|
||||
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
|
||||
)
|
||||
|
||||
for row in prev_event_id_results:
|
||||
if row[1] not in event_results:
|
||||
|
|
|
@ -1505,7 +1505,6 @@ class PersistEventsStore:
|
|||
self._handle_event_relations(txn, event)
|
||||
|
||||
self._handle_insertion_event(txn, event)
|
||||
self._handle_marker_event(txn, event)
|
||||
self._handle_chunk_id(txn, event)
|
||||
|
||||
# Store the labels for this event.
|
||||
|
@ -1760,19 +1759,19 @@ class PersistEventsStore:
|
|||
if rel_type == RelationTypes.REPLACE:
|
||||
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
||||
|
||||
def _handle_insertion_event(self, txn, event):
|
||||
def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
|
||||
"""Handles inserting insertion extremeties during peristence of marker events
|
||||
|
||||
Args:
|
||||
txn
|
||||
event (EventBase)
|
||||
txn: The database transaction object
|
||||
event: The event to process
|
||||
"""
|
||||
|
||||
if event.type != EventTypes.MSC2716_INSERTION:
|
||||
# Not a insertion event
|
||||
return
|
||||
|
||||
logger.info("_handle_insertion_event %s", event)
|
||||
logger.debug("_handle_insertion_event %s", event)
|
||||
|
||||
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
|
||||
if next_chunk_id is None:
|
||||
|
@ -1802,64 +1801,13 @@ class PersistEventsStore:
|
|||
},
|
||||
)
|
||||
|
||||
def _handle_marker_event(self, txn, event):
|
||||
"""Handles inserting insertion extremeties during peristence of marker events
|
||||
|
||||
Args:
|
||||
txn
|
||||
event (EventBase)
|
||||
"""
|
||||
|
||||
if event.type != EventTypes.MSC2716_MARKER:
|
||||
# Not a marker event
|
||||
return
|
||||
|
||||
logger.info("_handle_marker_event %s", event)
|
||||
|
||||
# TODO: We should attempt to backfill the insertion event instead
|
||||
# of trying to pack all of the info in the marker event. Otherwise,
|
||||
# we need to pack in the insertion_prev_events and insertion_next_chunk_id.
|
||||
# GET /_matrix/federation/v1/event/{eventId}
|
||||
|
||||
insertion_event_id = event.content.get(
|
||||
EventContentFields.MSC2716_MARKER_INSERTION
|
||||
)
|
||||
|
||||
# We will trust that the application service sending the marker event is
|
||||
# also the one that knows about the insertion event
|
||||
# insertion_event_origin = get_domain_from_id(event.sender)
|
||||
# m_ev = await self.federation_client.get_event(
|
||||
# [insertion_event_origin],
|
||||
# insertion_event_id,
|
||||
# outlier=True,
|
||||
# timeout=10000,
|
||||
# )
|
||||
# _auth_and_persist_events
|
||||
# handle_new_client_event
|
||||
|
||||
# insertion_prev_event_ids = event.content.get(
|
||||
# EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
|
||||
# )
|
||||
# if not insertion_event_id or not insertion_prev_event_ids:
|
||||
# # Invalid marker event
|
||||
# return
|
||||
|
||||
# for prev_event_id in insertion_prev_event_ids:
|
||||
# self.db_pool.simple_insert_txn(
|
||||
# txn,
|
||||
# table="insertion_event_edges",
|
||||
# values={
|
||||
# "insertion_event_id": insertion_event_id,
|
||||
# "room_id": event.room_id,
|
||||
# "insertion_prev_event_id": prev_event_id,
|
||||
# },
|
||||
# )
|
||||
|
||||
def _handle_chunk_id(self, txn, event):
|
||||
def _handle_chunk_id(self, txn: LoggingTransaction, event: EventBase):
|
||||
"""Handles inserting the chunk connections between the event at the
|
||||
start of a chunk and an insertion event
|
||||
|
||||
Args: txn event (EventBase)
|
||||
Args:
|
||||
txn: The database transaction object
|
||||
event: The event to process
|
||||
"""
|
||||
|
||||
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
|
||||
|
@ -1867,7 +1815,7 @@ class PersistEventsStore:
|
|||
# No chunk connection to persist
|
||||
return
|
||||
|
||||
logger.info("_handle_chunk_id %s %s", chunk_id, event)
|
||||
logger.debug("_handle_chunk_id %s %s", chunk_id, event)
|
||||
|
||||
# Keep track of the insertion event and the chunk ID
|
||||
self.db_pool.simple_insert_txn(
|
||||
|
|
Loading…
Reference in New Issue