Process markers when we receive it over federation
parent
164e32b1d2
commit
435f074541
|
@ -42,6 +42,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from synapse import event_auth
|
from synapse import event_auth
|
||||||
from synapse.api.constants import (
|
from synapse.api.constants import (
|
||||||
|
EventContentFields,
|
||||||
EventTypes,
|
EventTypes,
|
||||||
Membership,
|
Membership,
|
||||||
RejectedReason,
|
RejectedReason,
|
||||||
|
@ -263,6 +264,7 @@ class FederationHandler(BaseHandler):
|
||||||
state = None
|
state = None
|
||||||
|
|
||||||
# Get missing pdus if necessary.
|
# Get missing pdus if necessary.
|
||||||
|
# We don't need to worry about outliers because TODO!
|
||||||
if not pdu.internal_metadata.is_outlier():
|
if not pdu.internal_metadata.is_outlier():
|
||||||
# We only backfill backwards to the min depth.
|
# We only backfill backwards to the min depth.
|
||||||
min_depth = await self.get_min_depth_for_context(pdu.room_id)
|
min_depth = await self.get_min_depth_for_context(pdu.room_id)
|
||||||
|
@ -889,6 +891,57 @@ class FederationHandler(BaseHandler):
|
||||||
"resync_device_due_to_pdu", self._resync_device, event.sender
|
"resync_device_due_to_pdu", self._resync_device, event.sender
|
||||||
)
|
)
|
||||||
|
|
||||||
|
await self._handle_marker_event(origin, event)
|
||||||
|
|
||||||
|
async def _handle_marker_event(self, origin: str, marker_event: EventBase):
|
||||||
|
"""Handles backfilling the insertion event when we receive a marker
|
||||||
|
event that points to one
|
||||||
|
|
||||||
|
Args:
|
||||||
|
origin: Origin of the event. Will be called to get the insertion event
|
||||||
|
event: The event to process
|
||||||
|
"""
|
||||||
|
|
||||||
|
if marker_event.type != EventTypes.MSC2716_MARKER:
|
||||||
|
# Not a marker event
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("_handle_marker_event: received %s", marker_event)
|
||||||
|
|
||||||
|
insertion_event_id = marker_event.content.get(
|
||||||
|
EventContentFields.MSC2716_MARKER_INSERTION
|
||||||
|
)
|
||||||
|
|
||||||
|
if insertion_event_id is None:
|
||||||
|
# Nothing to retrieve then (invalid marker)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"_handle_marker_event: backfilling insertion event %s", insertion_event_id
|
||||||
|
)
|
||||||
|
|
||||||
|
await self._get_events_and_persist(
|
||||||
|
origin,
|
||||||
|
marker_event.room_id,
|
||||||
|
[insertion_event_id],
|
||||||
|
)
|
||||||
|
|
||||||
|
insertion_event = await self.store.get_event(insertion_event_id, allow_none=True)
|
||||||
|
if insertion_event is None:
|
||||||
|
logger.warning(
|
||||||
|
"_handle_marker_event: server %s didn't return insertion event %s for marker %s",
|
||||||
|
origin,
|
||||||
|
insertion_event_id,
|
||||||
|
marker_event.event_id,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"_handle_marker_event: Succesfully backfilled insertion event %s from marker event %s",
|
||||||
|
insertion_event,
|
||||||
|
marker_event,
|
||||||
|
)
|
||||||
|
|
||||||
async def _resync_device(self, sender: str) -> None:
|
async def _resync_device(self, sender: str) -> None:
|
||||||
"""We have detected that the device list for the given user may be out
|
"""We have detected that the device list for the given user may be out
|
||||||
of sync, so we try and resync them.
|
of sync, so we try and resync them.
|
||||||
|
|
|
@ -1005,7 +1005,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
connected_insertion_event_query, (event_id, limit - len(event_results))
|
connected_insertion_event_query, (event_id, limit - len(event_results))
|
||||||
)
|
)
|
||||||
connected_insertion_event_id_results = txn.fetchall()
|
connected_insertion_event_id_results = txn.fetchall()
|
||||||
logger.debug(
|
logger.info(
|
||||||
"_get_backfill_events: connected_insertion_event_query %s",
|
"_get_backfill_events: connected_insertion_event_query %s",
|
||||||
connected_insertion_event_id_results,
|
connected_insertion_event_id_results,
|
||||||
)
|
)
|
||||||
|
@ -1020,7 +1020,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
(connected_insertion_event, limit - len(event_results)),
|
(connected_insertion_event, limit - len(event_results)),
|
||||||
)
|
)
|
||||||
chunk_start_event_id_results = txn.fetchall()
|
chunk_start_event_id_results = txn.fetchall()
|
||||||
logger.debug(
|
logger.info(
|
||||||
"_get_backfill_events: chunk_start_event_id_results %s",
|
"_get_backfill_events: chunk_start_event_id_results %s",
|
||||||
chunk_start_event_id_results,
|
chunk_start_event_id_results,
|
||||||
)
|
)
|
||||||
|
@ -1030,7 +1030,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
|
|
||||||
txn.execute(query, (event_id, False, limit - len(event_results)))
|
txn.execute(query, (event_id, False, limit - len(event_results)))
|
||||||
prev_event_id_results = txn.fetchall()
|
prev_event_id_results = txn.fetchall()
|
||||||
logger.debug(
|
logger.info(
|
||||||
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
|
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -1506,7 +1506,6 @@ class PersistEventsStore:
|
||||||
|
|
||||||
self._handle_insertion_event(txn, event)
|
self._handle_insertion_event(txn, event)
|
||||||
self._handle_chunk_id(txn, event)
|
self._handle_chunk_id(txn, event)
|
||||||
self._handle_marker_event(txn, event)
|
|
||||||
|
|
||||||
# Store the labels for this event.
|
# Store the labels for this event.
|
||||||
labels = event.content.get(EventContentFields.LABELS)
|
labels = event.content.get(EventContentFields.LABELS)
|
||||||
|
@ -1778,7 +1777,7 @@ class PersistEventsStore:
|
||||||
# Invalid insertion event without next chunk ID
|
# Invalid insertion event without next chunk ID
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(
|
logger.info(
|
||||||
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
|
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1819,7 +1818,7 @@ class PersistEventsStore:
|
||||||
# No chunk connection to persist
|
# No chunk connection to persist
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("_handle_chunk_id %s %s", chunk_id, event)
|
logger.info("_handle_chunk_id %s %s", chunk_id, event)
|
||||||
|
|
||||||
# Keep track of the insertion event and the chunk ID
|
# Keep track of the insertion event and the chunk ID
|
||||||
self.db_pool.simple_insert_txn(
|
self.db_pool.simple_insert_txn(
|
||||||
|
@ -1832,51 +1831,6 @@ class PersistEventsStore:
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
def _handle_marker_event(self, txn, event):
|
|
||||||
"""Handles backfilling the insertion event when we receive a marker
|
|
||||||
event that points to one
|
|
||||||
|
|
||||||
Args:
|
|
||||||
txn: The database transaction object
|
|
||||||
event: The event to process
|
|
||||||
"""
|
|
||||||
|
|
||||||
if event.type != EventTypes.MSC2716_MARKER:
|
|
||||||
# Not a marker event
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.info("_handle_marker_event %s", event)
|
|
||||||
|
|
||||||
# TODO: We should attempt to backfill the insertion event instead
|
|
||||||
# of trying to pack all of the info in the marker event. Otherwise,
|
|
||||||
# we need to pack in the insertion_prev_events and insertion_next_chunk_id.
|
|
||||||
# GET /_matrix/federation/v1/event/{eventId}
|
|
||||||
|
|
||||||
insertion_event_id = event.content.get(
|
|
||||||
EventContentFields.MSC2716_MARKER_INSERTION
|
|
||||||
)
|
|
||||||
|
|
||||||
async def backfill_insertion_event():
|
|
||||||
logger.info("marker -> backfill_insertion_event")
|
|
||||||
# We will trust that the application service sending the marker event is
|
|
||||||
# also the one that knows about the insertion event
|
|
||||||
insertion_event_origin = get_domain_from_id(event.sender)
|
|
||||||
insertion_event = await self.federation_client.get_event(
|
|
||||||
[insertion_event_origin],
|
|
||||||
insertion_event_id,
|
|
||||||
outlier=True,
|
|
||||||
timeout=10000,
|
|
||||||
)
|
|
||||||
logger.info("marker -> fetched insertion_event %s", insertion_event)
|
|
||||||
# _auth_and_persist_events
|
|
||||||
# handle_new_client_event
|
|
||||||
|
|
||||||
# We don't need to do any processing for a marker event coming from the same homeserver
|
|
||||||
if self.hs.is_mine_id(event.sender):
|
|
||||||
# TODO: "Note that simply calling a coroutine will not schedule it to be executed"
|
|
||||||
# https://docs.python.org/3/library/asyncio-task.html
|
|
||||||
backfill_insertion_event()
|
|
||||||
|
|
||||||
def _handle_redaction(self, txn, redacted_event_id):
|
def _handle_redaction(self, txn, redacted_event_id):
|
||||||
"""Handles receiving a redaction and checking whether we need to remove
|
"""Handles receiving a redaction and checking whether we need to remove
|
||||||
any redacted relations from the database.
|
any redacted relations from the database.
|
||||||
|
|
Loading…
Reference in New Issue