Backfill remote event fetched by MSC3030 so we can paginate from it later (#13205)

Depends on https://github.com/matrix-org/synapse/pull/13320

Complement tests: https://github.com/matrix-org/complement/pull/406

We could use the same method to backfill for `/context` as well in the future, see https://github.com/matrix-org/synapse/issues/3848
pull/13372/head
Eric Eastwood 2022-07-22 16:00:11 -05:00 committed by GitHub
parent c7c84b81e3
commit 357561c1a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 15 deletions

View File

@ -0,0 +1 @@
Allow pagination from remote event after discovering it from MSC3030 `/timestamp_to_event`.

View File

@ -793,7 +793,7 @@ class FederationEventHandler:
if existing: if existing:
if not existing.internal_metadata.is_outlier(): if not existing.internal_metadata.is_outlier():
logger.info( logger.info(
"Ignoring received event %s which we have already seen", "_process_pulled_event: Ignoring received event %s which we have already seen",
event_id, event_id,
) )
return return
@ -1329,6 +1329,53 @@ class FederationEventHandler:
marker_event, marker_event,
) )
async def backfill_event_id(
self, destination: str, room_id: str, event_id: str
) -> EventBase:
"""Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it.
Args:
destination: The homeserver to pull the given event_id from.
room_id: The room where the event is from.
event_id: The event ID to backfill.
Raises:
FederationError if we are unable to find the event from the destination
"""
logger.info(
"backfill_event_id: event_id=%s from destination=%s", event_id, destination
)
room_version = await self._store.get_room_version(room_id)
event_from_response = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)
if not event_from_response:
raise FederationError(
"ERROR",
404,
"Unable to find event_id=%s from destination=%s to backfill."
% (event_id, destination),
affected=event_id,
)
# Persist the event we just fetched, including pulling all of the state
# and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event.
await self._process_pulled_events(
destination,
[event_from_response],
# Prevent notifications going to clients
backfilled=True,
)
return event_from_response
async def _get_events_and_persist( async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str] self, destination: str, room_id: str, event_ids: Collection[str]
) -> None: ) -> None:

View File

@ -1384,6 +1384,7 @@ class TimestampLookupHandler:
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self.federation_client = hs.get_federation_client() self.federation_client = hs.get_federation_client()
self.federation_event_handler = hs.get_federation_event_handler()
self._storage_controllers = hs.get_storage_controllers() self._storage_controllers = hs.get_storage_controllers()
async def get_event_for_timestamp( async def get_event_for_timestamp(
@ -1479,38 +1480,68 @@ class TimestampLookupHandler:
remote_response, remote_response,
) )
# TODO: Do we want to persist this as an extremity?
# TODO: I think ideally, we would try to backfill from
# this event and run this whole
# `get_event_for_timestamp` function again to make sure
# they didn't give us an event from their gappy history.
remote_event_id = remote_response.event_id remote_event_id = remote_response.event_id
origin_server_ts = remote_response.origin_server_ts remote_origin_server_ts = remote_response.origin_server_ts
# Backfill this event so we can get a pagination token for
# it with `/context` and paginate `/messages` from this
# point.
#
# TODO: The requested timestamp may lie in a part of the
# event graph that the remote server *also* didn't have,
# in which case they will have returned another event
# which may be nowhere near the requested timestamp. In
# the future, we may need to reconcile that gap and ask
# other homeservers, and/or extend `/timestamp_to_event`
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
remote_event = (
await self.federation_event_handler.backfill_event_id(
domain, room_id, remote_event_id
)
)
# XXX: When we see that the remote server is not trustworthy,
# maybe we should not ask them first in the future.
if remote_origin_server_ts != remote_event.origin_server_ts:
logger.info(
"get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
domain,
remote_event_id,
remote_origin_server_ts,
remote_event.origin_server_ts,
)
# Only return the remote event if it's closer than the local event # Only return the remote event if it's closer than the local event
if not local_event or ( if not local_event or (
abs(origin_server_ts - timestamp) abs(remote_event.origin_server_ts - timestamp)
< abs(local_event.origin_server_ts - timestamp) < abs(local_event.origin_server_ts - timestamp)
): ):
return remote_event_id, origin_server_ts logger.info(
"get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
remote_event_id,
remote_event.origin_server_ts,
timestamp,
local_event.event_id if local_event else None,
local_event.origin_server_ts if local_event else None,
)
return remote_event_id, remote_origin_server_ts
except (HttpResponseException, InvalidResponseError) as ex: except (HttpResponseException, InvalidResponseError) as ex:
# Let's not put a high priority on some other homeserver # Let's not put a high priority on some other homeserver
# failing to respond or giving a random response # failing to respond or giving a random response
logger.debug( logger.debug(
"Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
domain, domain,
type(ex).__name__, type(ex).__name__,
ex, ex,
ex.args, ex.args,
) )
except Exception as ex: except Exception:
# But we do want to see some exceptions in our code # But we do want to see some exceptions in our code
logger.warning( logger.warning(
"Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s", "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception",
domain, domain,
type(ex).__name__, exc_info=True,
ex,
ex.args,
) )
# To appease mypy, we have to add both of these conditions to check for # To appease mypy, we have to add both of these conditions to check for