From 487f1bb49d5eb5840c7dd70d95ac53f2b24eba21 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 12:14:12 +0000 Subject: [PATCH 01/17] Use the filtered version of an event when responding to /context requests for that event Sometimes the filtering function can return a pruned version of an event (on top of either the event itself or an empty list), if it thinks the user should be able to see that there's an event there but not the content of that event. Therefore, the previous logic of 'if filtered is empty then we can use the event we retrieved from the database' is flawed, and we should use the event returned by the filtering function. --- synapse/handlers/room.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 22768e97ff..7f979e5812 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -907,7 +907,10 @@ class RoomContextHandler(object): results["events_before"] = yield filter_evts(results["events_before"]) results["events_after"] = yield filter_evts(results["events_after"]) - results["event"] = event + # filter_evts can return a pruned event in case the user is allowed to see that + # there's something there but not see the content, so use the event that's in + # `filtered` rather than the event we retrieved from the datastore. + results["event"] = filtered[0] if results["events_after"]: last_event_id = results["events_after"][-1].event_id From ac87ddb242d146cd840b232fa6e8165c9dd01ae6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 12:15:37 +0000 Subject: [PATCH 02/17] Update the documentation of the filtering function --- synapse/visibility.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/visibility.py b/synapse/visibility.py index dffe943b28..100dc47a8a 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -52,7 +52,8 @@ def filter_events_for_client( apply_retention_policies=True, ): """ - Check which events a user is allowed to see + Check which events a user is allowed to see. If the user can see the event but its + sender asked for their data to be erased, prune the content of the event. Args: storage From 8b9f5c21c35ff7a5491121ffc381bd8c97e879ce Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 12:19:35 +0000 Subject: [PATCH 03/17] Changelog --- changelog.d/6553.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6553.bugfix diff --git a/changelog.d/6553.bugfix b/changelog.d/6553.bugfix new file mode 100644 index 0000000000..e8f55e2a76 --- /dev/null +++ b/changelog.d/6553.bugfix @@ -0,0 +1 @@ +Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event the request is for. From 4c7b1bb6cccd726ae9a9f91b3309554a7fe6d262 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Dec 2019 16:22:00 +0000 Subject: [PATCH 04/17] Refactor get_events_from_store_or_dest to return a dict (#6501) There was a bunch of unnecessary conversion back and forth between dict and list going on here. We can simplify a bunch of the code. --- changelog.d/6501.misc | 1 + synapse/federation/federation_client.py | 44 +++++++++---------------- 2 files changed, 16 insertions(+), 29 deletions(-) create mode 100644 changelog.d/6501.misc diff --git a/changelog.d/6501.misc b/changelog.d/6501.misc new file mode 100644 index 0000000000..255f45a9c3 --- /dev/null +++ b/changelog.d/6501.misc @@ -0,0 +1 @@ +Refactor get_events_from_store_or_dest to return a dict. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 709449c9e3..73e1dda6a3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -18,8 +18,6 @@ import copy import itertools import logging -from six.moves import range - from prometheus_client import Counter from twisted.internet import defer @@ -41,7 +39,7 @@ from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.utils import log_function -from synapse.util import unwrapFirstError +from synapse.util import batch_iter, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -331,10 +329,12 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) - fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest( - destination, room_id, set(state_event_ids + auth_event_ids) + desired_events = set(state_event_ids + auth_event_ids) + event_map = yield self.get_events_from_store_or_dest( + destination, room_id, desired_events ) + failed_to_fetch = desired_events - event_map.keys() if failed_to_fetch: logger.warning( "Failed to fetch missing state/auth events for %s: %s", @@ -342,8 +342,6 @@ class FederationClient(FederationBase): failed_to_fetch, ) - event_map = {ev.event_id: ev for ev in fetched_events} - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] @@ -358,23 +356,18 @@ class FederationClient(FederationBase): Args: destination (str) room_id (str) - event_ids (list) + event_ids (Iterable[str]) Returns: - Deferred: A deferred resolving to a 2-tuple where the first is a list of - events and the second is a list of event ids that we failed to fetch. + Deferred[dict[str, EventBase]]: A deferred resolving to a map + from event_id to event """ - seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = list(seen_events.values()) + fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) - failed_to_fetch = set() - - missing_events = set(event_ids) - for k in seen_events: - missing_events.discard(k) + missing_events = set(event_ids) - fetched_events.keys() if not missing_events: - return signed_events, failed_to_fetch + return fetched_events logger.debug( "Fetching unknown state/auth events %s for room %s", @@ -384,11 +377,8 @@ class FederationClient(FederationBase): room_version = yield self.store.get_room_version(room_id) - batch_size = 20 - missing_events = list(missing_events) - for i in range(0, len(missing_events), batch_size): - batch = set(missing_events[i : i + batch_size]) - + # XXX 20 requests at once? really? + for batch in batch_iter(missing_events, 20): deferreds = [ run_in_background( self.get_pdu, @@ -404,13 +394,9 @@ class FederationClient(FederationBase): ) for success, result in res: if success and result: - signed_events.append(result) - batch.discard(result.event_id) + fetched_events[result.event_id] = result - # We removed all events we successfully fetched from `batch` - failed_to_fetch.update(batch) - - return signed_events, failed_to_fetch + return fetched_events @defer.inlineCallbacks @log_function From be294d6fde1b8b37b9d557e56973deb92790ddb8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Dec 2019 17:42:46 +0000 Subject: [PATCH 05/17] Move get_state methods into FederationHandler (#6503) This is a non-functional refactor as a precursor to some other work. --- changelog.d/6503.misc | 1 + synapse/federation/federation_client.py | 91 +++------------------ synapse/handlers/federation.py | 101 ++++++++++++++++++++++-- 3 files changed, 107 insertions(+), 86 deletions(-) create mode 100644 changelog.d/6503.misc diff --git a/changelog.d/6503.misc b/changelog.d/6503.misc new file mode 100644 index 0000000000..e4e9a5a3d4 --- /dev/null +++ b/changelog.d/6503.misc @@ -0,0 +1 @@ +Move get_state methods into FederationHandler. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 73e1dda6a3..d396e6564f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -37,9 +37,9 @@ from synapse.api.room_versions import ( ) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function -from synapse.util import batch_iter, unwrapFirstError +from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -308,19 +308,12 @@ class FederationClient(FederationBase): return signed_pdu @defer.inlineCallbacks - @log_function - def get_state_for_room(self, destination, room_id, event_id): - """Requests all of the room state at a given event from a remote homeserver. - - Args: - destination (str): The remote homeserver to query for the state. - room_id (str): The id of the room we're interested in. - event_id (str): The id of the event we want the state at. + def get_room_state_ids(self, destination: str, room_id: str, event_id: str): + """Calls the /state_ids endpoint to fetch the state at a particular point + in the room, and the auth events for the given event Returns: - Deferred[Tuple[List[EventBase], List[EventBase]]]: - A list of events in the state, and a list of events in the auth chain - for the given event. + Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) """ result = yield self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id @@ -329,74 +322,12 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) - desired_events = set(state_event_ids + auth_event_ids) - event_map = yield self.get_events_from_store_or_dest( - destination, room_id, desired_events - ) + if not isinstance(state_event_ids, list) or not isinstance( + auth_event_ids, list + ): + raise Exception("invalid response from /state_ids") - failed_to_fetch = desired_events - event_map.keys() - if failed_to_fetch: - logger.warning( - "Failed to fetch missing state/auth events for %s: %s", - room_id, - failed_to_fetch, - ) - - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] - auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] - - auth_chain.sort(key=lambda e: e.depth) - - return pdus, auth_chain - - @defer.inlineCallbacks - def get_events_from_store_or_dest(self, destination, room_id, event_ids): - """Fetch events from a remote destination, checking if we already have them. - - Args: - destination (str) - room_id (str) - event_ids (Iterable[str]) - - Returns: - Deferred[dict[str, EventBase]]: A deferred resolving to a map - from event_id to event - """ - fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) - - missing_events = set(event_ids) - fetched_events.keys() - - if not missing_events: - return fetched_events - - logger.debug( - "Fetching unknown state/auth events %s for room %s", - missing_events, - event_ids, - ) - - room_version = yield self.store.get_room_version(room_id) - - # XXX 20 requests at once? really? - for batch in batch_iter(missing_events, 20): - deferreds = [ - run_in_background( - self.get_pdu, - destinations=[destination], - event_id=e_id, - room_version=room_version, - ) - for e_id in batch - ] - - res = yield make_deferred_yieldable( - defer.DeferredList(deferreds, consumeErrors=True) - ) - for success, result in res: - if success and result: - fetched_events[result.event_id] = result - - return fetched_events + return state_event_ids, auth_event_ids @defer.inlineCallbacks @log_function diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bc26921768..c0dcf9abf8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -64,7 +64,7 @@ from synapse.replication.http.federation import ( from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id -from synapse.util import unwrapFirstError +from synapse.util import batch_iter, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination @@ -379,11 +379,9 @@ class FederationHandler(BaseHandler): ( remote_state, got_auth_chain, - ) = yield self.federation_client.get_state_for_room( - origin, room_id, p - ) + ) = yield self._get_state_for_room(origin, room_id, p) - # we want the state *after* p; get_state_for_room returns the + # we want the state *after* p; _get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( [origin], p, room_version, outlier=True @@ -583,6 +581,97 @@ class FederationHandler(BaseHandler): else: raise + @defer.inlineCallbacks + @log_function + def _get_state_for_room(self, destination, room_id, event_id): + """Requests all of the room state at a given event from a remote homeserver. + + Args: + destination (str): The remote homeserver to query for the state. + room_id (str): The id of the room we're interested in. + event_id (str): The id of the event we want the state at. + + Returns: + Deferred[Tuple[List[EventBase], List[EventBase]]]: + A list of events in the state, and a list of events in the auth chain + for the given event. + """ + ( + state_event_ids, + auth_event_ids, + ) = yield self.federation_client.get_room_state_ids( + destination, room_id, event_id=event_id + ) + + desired_events = set(state_event_ids + auth_event_ids) + event_map = yield self._get_events_from_store_or_dest( + destination, room_id, desired_events + ) + + failed_to_fetch = desired_events - event_map.keys() + if failed_to_fetch: + logger.warning( + "Failed to fetch missing state/auth events for %s: %s", + room_id, + failed_to_fetch, + ) + + pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] + auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] + + auth_chain.sort(key=lambda e: e.depth) + + return pdus, auth_chain + + @defer.inlineCallbacks + def _get_events_from_store_or_dest(self, destination, room_id, event_ids): + """Fetch events from a remote destination, checking if we already have them. + + Args: + destination (str) + room_id (str) + event_ids (Iterable[str]) + + Returns: + Deferred[dict[str, EventBase]]: A deferred resolving to a map + from event_id to event + """ + fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) + + missing_events = set(event_ids) - fetched_events.keys() + + if not missing_events: + return fetched_events + + logger.debug( + "Fetching unknown state/auth events %s for room %s", + missing_events, + event_ids, + ) + + room_version = yield self.store.get_room_version(room_id) + + # XXX 20 requests at once? really? + for batch in batch_iter(missing_events, 20): + deferreds = [ + run_in_background( + self.federation_client.get_pdu, + destinations=[destination], + event_id=e_id, + room_version=room_version, + ) + for e_id in batch + ] + + res = yield make_deferred_yieldable( + defer.DeferredList(deferreds, consumeErrors=True) + ) + for success, result in res: + if success and result: + fetched_events[result.event_id] = result + + return fetched_events + @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it @@ -723,7 +812,7 @@ class FederationHandler(BaseHandler): state_events = {} events_to_state = {} for e_id in edges: - state, auth = yield self.federation_client.get_state_for_room( + state, auth = yield self._get_state_for_room( destination=dest, room_id=room_id, event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) From 20d5ba16e626aa4217492c83dda9fabd36bd5d2b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 11 Dec 2019 16:37:51 +0000 Subject: [PATCH 06/17] Add `include_event_in_state` to _get_state_for_room (#6521) Make it return the state *after* the requested event, rather than the one before it. This is a bit easier and requires fewer calls to get_events_from_store_or_dest. --- changelog.d/6521.misc | 1 + synapse/handlers/federation.py | 39 ++++++++++++++++++---------------- 2 files changed, 22 insertions(+), 18 deletions(-) create mode 100644 changelog.d/6521.misc diff --git a/changelog.d/6521.misc b/changelog.d/6521.misc new file mode 100644 index 0000000000..d9a44389b9 --- /dev/null +++ b/changelog.d/6521.misc @@ -0,0 +1 @@ +Refactor some code in the event authentication path for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c0dcf9abf8..31c9132ae9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -379,22 +379,10 @@ class FederationHandler(BaseHandler): ( remote_state, got_auth_chain, - ) = yield self._get_state_for_room(origin, room_id, p) - - # we want the state *after* p; _get_state_for_room returns the - # state *before* p. - remote_event = yield self.federation_client.get_pdu( - [origin], p, room_version, outlier=True + ) = yield self._get_state_for_room( + origin, room_id, p, include_event_in_state=True ) - if remote_event is None: - raise Exception( - "Unable to get missing prev_event %s" % (p,) - ) - - if remote_event.is_state(): - remote_state.append(remote_event) - # XXX hrm I'm not convinced that duplicate events will compare # for equality, so I'm not sure this does what the author # hoped. @@ -583,13 +571,15 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _get_state_for_room(self, destination, room_id, event_id): + def _get_state_for_room(self, destination, room_id, event_id, include_event_in_state): """Requests all of the room state at a given event from a remote homeserver. Args: destination (str): The remote homeserver to query for the state. room_id (str): The id of the room we're interested in. event_id (str): The id of the event we want the state at. + include_event_in_state: if true, the event itself will be included in the + returned state event list. Returns: Deferred[Tuple[List[EventBase], List[EventBase]]]: @@ -604,6 +594,10 @@ class FederationHandler(BaseHandler): ) desired_events = set(state_event_ids + auth_event_ids) + + if include_event_in_state: + desired_events.add(event_id) + event_map = yield self._get_events_from_store_or_dest( destination, room_id, desired_events ) @@ -616,12 +610,21 @@ class FederationHandler(BaseHandler): failed_to_fetch, ) - pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] - auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] + remote_state = [ + event_map[e_id] for e_id in state_event_ids if e_id in event_map + ] + if include_event_in_state: + remote_event = event_map.get(event_id) + if not remote_event: + raise Exception("Unable to get missing prev_event %s" % (event_id,)) + if remote_event.is_state(): + remote_state.append(remote_event) + + auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] auth_chain.sort(key=lambda e: e.depth) - return pdus, auth_chain + return remote_state, auth_chain @defer.inlineCallbacks def _get_events_from_store_or_dest(self, destination, room_id, event_ids): From 35bbe4ca794d7b7b1c5b008211a377f54deecb5d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 12 Dec 2019 12:57:45 +0000 Subject: [PATCH 07/17] Check the room_id of events when fetching room state/auth (#6524) When we request the state/auth_events to populate a backwards extremity (on backfill or in the case of missing events in a transaction push), we should check that the returned events are in the right room rather than blindly using them in the room state or auth chain. Given that _get_events_from_store_or_dest takes a room_id, it seems clear that it should be sanity-checking the room_id of the requested events, so let's do it there. --- changelog.d/6524.misc | 2 + synapse/handlers/federation.py | 82 +++++++++++++++++++++++----------- 2 files changed, 58 insertions(+), 26 deletions(-) create mode 100644 changelog.d/6524.misc diff --git a/changelog.d/6524.misc b/changelog.d/6524.misc new file mode 100644 index 0000000000..f885597426 --- /dev/null +++ b/changelog.d/6524.misc @@ -0,0 +1,2 @@ +Improve sanity-checking when receiving events over federation. + diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 31c9132ae9..ebeffbb768 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -571,7 +571,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _get_state_for_room(self, destination, room_id, event_id, include_event_in_state): + def _get_state_for_room( + self, destination, room_id, event_id, include_event_in_state + ): """Requests all of the room state at a given event from a remote homeserver. Args: @@ -635,6 +637,10 @@ class FederationHandler(BaseHandler): room_id (str) event_ids (Iterable[str]) + If we fail to fetch any of the events, a warning will be logged, and the event + will be omitted from the result. Likewise, any events which turn out not to + be in the given room. + Returns: Deferred[dict[str, EventBase]]: A deferred resolving to a map from event_id to event @@ -643,35 +649,59 @@ class FederationHandler(BaseHandler): missing_events = set(event_ids) - fetched_events.keys() - if not missing_events: - return fetched_events + if missing_events: + logger.debug( + "Fetching unknown state/auth events %s for room %s", + missing_events, + room_id, + ) - logger.debug( - "Fetching unknown state/auth events %s for room %s", - missing_events, - event_ids, + room_version = yield self.store.get_room_version(room_id) + + # XXX 20 requests at once? really? + for batch in batch_iter(missing_events, 20): + deferreds = [ + run_in_background( + self.federation_client.get_pdu, + destinations=[destination], + event_id=e_id, + room_version=room_version, + ) + for e_id in batch + ] + + res = yield make_deferred_yieldable( + defer.DeferredList(deferreds, consumeErrors=True) + ) + + for success, result in res: + if success and result: + fetched_events[result.event_id] = result + + # check for events which were in the wrong room. + # + # this can happen if a remote server claims that the state or + # auth_events at an event in room A are actually events in room B + + bad_events = list( + (event_id, event.room_id) + for event_id, event in fetched_events.items() + if event.room_id != room_id ) - room_version = yield self.store.get_room_version(room_id) - - # XXX 20 requests at once? really? - for batch in batch_iter(missing_events, 20): - deferreds = [ - run_in_background( - self.federation_client.get_pdu, - destinations=[destination], - event_id=e_id, - room_version=room_version, - ) - for e_id in batch - ] - - res = yield make_deferred_yieldable( - defer.DeferredList(deferreds, consumeErrors=True) + for bad_event_id, bad_room_id in bad_events: + # This is a bogus situation, but since we may only discover it a long time + # after it happened, we try our best to carry on, by just omitting the + # bad events from the returned auth/state set. + logger.warning( + "Remote server %s claims event %s in room %s is an auth/state " + "event in room %s", + destination, + bad_event_id, + bad_room_id, + room_id, ) - for success, result in res: - if success and result: - fetched_events[result.event_id] = result + del fetched_events[bad_event_id] return fetched_events From 6577f2d8877b89f7198f7fb03cf57f10a75728ca Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 13 Dec 2019 11:44:41 +0000 Subject: [PATCH 08/17] Sanity-check room ids in event auth (#6530) When we do an event auth operation, check that all of the events involved are in the right room. --- changelog.d/6530.misc | 2 ++ synapse/event_auth.py | 12 ++++++++++++ 2 files changed, 14 insertions(+) create mode 100644 changelog.d/6530.misc diff --git a/changelog.d/6530.misc b/changelog.d/6530.misc new file mode 100644 index 0000000000..f885597426 --- /dev/null +++ b/changelog.d/6530.misc @@ -0,0 +1,2 @@ +Improve sanity-checking when receiving events over federation. + diff --git a/synapse/event_auth.py b/synapse/event_auth.py index ec3243b27b..d184b0273b 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -48,6 +48,18 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru if not hasattr(event, "room_id"): raise AuthError(500, "Event has no room_id: %s" % event) + room_id = event.room_id + + # I'm not really expecting to get auth events in the wrong room, but let's + # sanity-check it + for auth_event in auth_events.values(): + if auth_event.room_id != room_id: + raise Exception( + "During auth for event %s in room %s, found event %s in the state " + "which is in room %s" + % (event.event_id, room_id, auth_event.event_id, auth_event.room_id) + ) + if do_sig_check: sender_domain = get_domain_from_id(event.sender) From 83895316d4e18d4a52c43524942d98f864bac6f9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 13 Dec 2019 12:55:32 +0000 Subject: [PATCH 09/17] sanity-checking for events used in state res (#6531) When we perform state resolution, check that all of the events involved are in the right room. --- changelog.d/6531.misc | 1 + synapse/handlers/federation.py | 1 + synapse/state/__init__.py | 32 +++++++---- synapse/state/v1.py | 34 +++++++++-- synapse/state/v2.py | 100 ++++++++++++++++++++++++--------- tests/state/test_v2.py | 3 + 6 files changed, 128 insertions(+), 43 deletions(-) create mode 100644 changelog.d/6531.misc diff --git a/changelog.d/6531.misc b/changelog.d/6531.misc new file mode 100644 index 0000000000..598efb79fc --- /dev/null +++ b/changelog.d/6531.misc @@ -0,0 +1 @@ +Improve sanity-checking when receiving events over federation. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ebeffbb768..fd3f5ced55 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -397,6 +397,7 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x state_map = yield resolve_events_with_store( + room_id, room_version, state_maps, event_map, diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 139beef8ed..0e75e94c6f 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -16,7 +16,7 @@ import logging from collections import namedtuple -from typing import Iterable, Optional +from typing import Dict, Iterable, List, Optional, Tuple from six import iteritems, itervalues @@ -416,6 +416,7 @@ class StateHandler(object): with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + event.room_id, room_version, state_set_ids, event_map=state_map, @@ -461,7 +462,7 @@ class StateResolutionHandler(object): not be called for a single state group Args: - room_id (str): room we are resolving for (used for logging) + room_id (str): room we are resolving for (used for logging and sanity checks) room_version (str): version of the room state_groups_ids (dict[int, dict[(str, str), str]]): map from state group id to the state in that state group @@ -517,6 +518,7 @@ class StateResolutionHandler(object): logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + room_id, room_version, list(itervalues(state_groups_ids)), event_map=event_map, @@ -588,36 +590,44 @@ def _make_state_cache_entry(new_state, state_groups_ids): ) -def resolve_events_with_store(room_version, state_sets, event_map, state_res_store): +def resolve_events_with_store( + room_id: str, + room_version: str, + state_sets: List[Dict[Tuple[str, str], str]], + event_map: Optional[Dict[str, EventBase]], + state_res_store: "StateResolutionStore", +): """ Args: - room_version(str): Version of the room + room_id: the room we are working in - state_sets(list): List of dicts of (type, state_key) -> event_id, + room_version: Version of the room + + state_sets: List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - event_map(dict[str,FrozenEvent]|None): + event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be used as a starting point fof finding the state we need; any missing events will be requested via state_map_factory. - If None, all events will be fetched via state_map_factory. + If None, all events will be fetched via state_res_store. - state_res_store (StateResolutionStore) + state_res_store: a place to fetch events from - Returns + Returns: Deferred[dict[(str, str), str]]: a map from (type, state_key) to event_id. """ v = KNOWN_ROOM_VERSIONS[room_version] if v.state_res == StateResolutionVersions.V1: return v1.resolve_events_with_store( - state_sets, event_map, state_res_store.get_events + room_id, state_sets, event_map, state_res_store.get_events ) else: return v2.resolve_events_with_store( - room_version, state_sets, event_map, state_res_store + room_id, room_version, state_sets, event_map, state_res_store ) diff --git a/synapse/state/v1.py b/synapse/state/v1.py index a2f92d9ff9..b2f9865f39 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -15,6 +15,7 @@ import hashlib import logging +from typing import Callable, Dict, List, Optional, Tuple from six import iteritems, iterkeys, itervalues @@ -24,6 +25,7 @@ from synapse import event_auth from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.room_versions import RoomVersions +from synapse.events import EventBase logger = logging.getLogger(__name__) @@ -32,13 +34,20 @@ POWER_KEY = (EventTypes.PowerLevels, "") @defer.inlineCallbacks -def resolve_events_with_store(state_sets, event_map, state_map_factory): +def resolve_events_with_store( + room_id: str, + state_sets: List[Dict[Tuple[str, str], str]], + event_map: Optional[Dict[str, EventBase]], + state_map_factory: Callable, +): """ Args: - state_sets(list): List of dicts of (type, state_key) -> event_id, + room_id: the room we are working in + + state_sets: List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - event_map(dict[str,FrozenEvent]|None): + event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be used as a starting point fof finding the state we need; any missing @@ -46,11 +55,11 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory): If None, all events will be fetched via state_map_factory. - state_map_factory(func): will be called + state_map_factory: will be called with a list of event_ids that are needed, and should return with a Deferred of dict of event_id to event. - Returns + Returns: Deferred[dict[(str, str), str]]: a map from (type, state_key) to event_id. """ @@ -76,6 +85,14 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory): if event_map is not None: state_map.update(event_map) + # everything in the state map should be in the right room + for event in state_map.values(): + if event.room_id != room_id: + raise Exception( + "Attempting to state-resolve for room %s with event %s which is in %s" + % (room_id, event.event_id, event.room_id,) + ) + # get the ids of the auth events which allow us to authenticate the # conflicted state, picking only from the unconflicting state. # @@ -95,6 +112,13 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory): ) state_map_new = yield state_map_factory(new_needed_events) + for event in state_map_new.values(): + if event.room_id != room_id: + raise Exception( + "Attempting to state-resolve for room %s with event %s which is in %s" + % (room_id, event.event_id, event.room_id,) + ) + state_map.update(state_map_new) return _resolve_with_state( diff --git a/synapse/state/v2.py b/synapse/state/v2.py index b327c86f40..cb77ed5b78 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -16,29 +16,40 @@ import heapq import itertools import logging +from typing import Dict, List, Optional, Tuple from six import iteritems, itervalues from twisted.internet import defer +import synapse.state from synapse import event_auth from synapse.api.constants import EventTypes from synapse.api.errors import AuthError +from synapse.events import EventBase logger = logging.getLogger(__name__) @defer.inlineCallbacks -def resolve_events_with_store(room_version, state_sets, event_map, state_res_store): +def resolve_events_with_store( + room_id: str, + room_version: str, + state_sets: List[Dict[Tuple[str, str], str]], + event_map: Optional[Dict[str, EventBase]], + state_res_store: "synapse.state.StateResolutionStore", +): """Resolves the state using the v2 state resolution algorithm Args: - room_version (str): The room version + room_id: the room we are working in - state_sets(list): List of dicts of (type, state_key) -> event_id, + room_version: The room version + + state_sets: List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - event_map(dict[str,FrozenEvent]|None): + event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be used as a starting point fof finding the state we need; any missing @@ -46,9 +57,9 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto If None, all events will be fetched via state_res_store. - state_res_store (StateResolutionStore) + state_res_store: - Returns + Returns: Deferred[dict[(str, str), str]]: a map from (type, state_key) to event_id. """ @@ -84,6 +95,14 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto ) event_map.update(events) + # everything in the event map should be in the right room + for event in event_map.values(): + if event.room_id != room_id: + raise Exception( + "Attempting to state-resolve for room %s with event %s which is in %s" + % (room_id, event.event_id, event.room_id,) + ) + full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map) logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) @@ -94,13 +113,14 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto ) sorted_power_events = yield _reverse_topological_power_sort( - power_events, event_map, state_res_store, full_conflicted_set + room_id, power_events, event_map, state_res_store, full_conflicted_set ) logger.debug("sorted %d power events", len(sorted_power_events)) # Now sequentially auth each one resolved_state = yield _iterative_auth_checks( + room_id, room_version, sorted_power_events, unconflicted_state, @@ -121,13 +141,18 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto pl = resolved_state.get((EventTypes.PowerLevels, ""), None) leftover_events = yield _mainline_sort( - leftover_events, pl, event_map, state_res_store + room_id, leftover_events, pl, event_map, state_res_store ) logger.debug("resolving remaining events") resolved_state = yield _iterative_auth_checks( - room_version, leftover_events, resolved_state, event_map, state_res_store + room_id, + room_version, + leftover_events, + resolved_state, + event_map, + state_res_store, ) logger.debug("resolved") @@ -141,11 +166,12 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto @defer.inlineCallbacks -def _get_power_level_for_sender(event_id, event_map, state_res_store): +def _get_power_level_for_sender(room_id, event_id, event_map, state_res_store): """Return the power level of the sender of the given event according to their auth events. Args: + room_id (str) event_id (str) event_map (dict[str,FrozenEvent]) state_res_store (StateResolutionStore) @@ -153,11 +179,11 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store): Returns: Deferred[int] """ - event = yield _get_event(event_id, event_map, state_res_store) + event = yield _get_event(room_id, event_id, event_map, state_res_store) pl = None for aid in event.auth_event_ids(): - aev = yield _get_event(aid, event_map, state_res_store) + aev = yield _get_event(room_id, aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): pl = aev break @@ -165,7 +191,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store): if pl is None: # Couldn't find power level. Check if they're the creator of the room for aid in event.auth_event_ids(): - aev = yield _get_event(aid, event_map, state_res_store) + aev = yield _get_event(room_id, aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.Create, ""): if aev.content.get("creator") == event.sender: return 100 @@ -279,7 +305,7 @@ def _is_power_event(event): @defer.inlineCallbacks def _add_event_and_auth_chain_to_graph( - graph, event_id, event_map, state_res_store, auth_diff + graph, room_id, event_id, event_map, state_res_store, auth_diff ): """Helper function for _reverse_topological_power_sort that add the event and its auth chain (that is in the auth diff) to the graph @@ -287,6 +313,7 @@ def _add_event_and_auth_chain_to_graph( Args: graph (dict[str, set[str]]): A map from event ID to the events auth event IDs + room_id (str): the room we are working in event_id (str): Event to add to the graph event_map (dict[str,FrozenEvent]) state_res_store (StateResolutionStore) @@ -298,7 +325,7 @@ def _add_event_and_auth_chain_to_graph( eid = state.pop() graph.setdefault(eid, set()) - event = yield _get_event(eid, event_map, state_res_store) + event = yield _get_event(room_id, eid, event_map, state_res_store) for aid in event.auth_event_ids(): if aid in auth_diff: if aid not in graph: @@ -308,11 +335,14 @@ def _add_event_and_auth_chain_to_graph( @defer.inlineCallbacks -def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff): +def _reverse_topological_power_sort( + room_id, event_ids, event_map, state_res_store, auth_diff +): """Returns a list of the event_ids sorted by reverse topological ordering, and then by power level and origin_server_ts Args: + room_id (str): the room we are working in event_ids (list[str]): The events to sort event_map (dict[str,FrozenEvent]) state_res_store (StateResolutionStore) @@ -325,12 +355,14 @@ def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_ graph = {} for event_id in event_ids: yield _add_event_and_auth_chain_to_graph( - graph, event_id, event_map, state_res_store, auth_diff + graph, room_id, event_id, event_map, state_res_store, auth_diff ) event_to_pl = {} for event_id in graph: - pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store) + pl = yield _get_power_level_for_sender( + room_id, event_id, event_map, state_res_store + ) event_to_pl[event_id] = pl def _get_power_order(event_id): @@ -348,12 +380,13 @@ def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_ @defer.inlineCallbacks def _iterative_auth_checks( - room_version, event_ids, base_state, event_map, state_res_store + room_id, room_version, event_ids, base_state, event_map, state_res_store ): """Sequentially apply auth checks to each event in given list, updating the state as it goes along. Args: + room_id (str) room_version (str) event_ids (list[str]): Ordered list of events to apply auth checks to base_state (dict[tuple[str, str], str]): The set of state to start with @@ -370,7 +403,7 @@ def _iterative_auth_checks( auth_events = {} for aid in event.auth_event_ids(): - ev = yield _get_event(aid, event_map, state_res_store) + ev = yield _get_event(room_id, aid, event_map, state_res_store) if ev.rejected_reason is None: auth_events[(ev.type, ev.state_key)] = ev @@ -378,7 +411,7 @@ def _iterative_auth_checks( for key in event_auth.auth_types_for_event(event): if key in resolved_state: ev_id = resolved_state[key] - ev = yield _get_event(ev_id, event_map, state_res_store) + ev = yield _get_event(room_id, ev_id, event_map, state_res_store) if ev.rejected_reason is None: auth_events[key] = event_map[ev_id] @@ -400,11 +433,14 @@ def _iterative_auth_checks( @defer.inlineCallbacks -def _mainline_sort(event_ids, resolved_power_event_id, event_map, state_res_store): +def _mainline_sort( + room_id, event_ids, resolved_power_event_id, event_map, state_res_store +): """Returns a sorted list of event_ids sorted by mainline ordering based on the given event resolved_power_event_id Args: + room_id (str): room we're working in event_ids (list[str]): Events to sort resolved_power_event_id (str): The final resolved power level event ID event_map (dict[str,FrozenEvent]) @@ -417,11 +453,11 @@ def _mainline_sort(event_ids, resolved_power_event_id, event_map, state_res_stor pl = resolved_power_event_id while pl: mainline.append(pl) - pl_ev = yield _get_event(pl, event_map, state_res_store) + pl_ev = yield _get_event(room_id, pl, event_map, state_res_store) auth_events = pl_ev.auth_event_ids() pl = None for aid in auth_events: - ev = yield _get_event(aid, event_map, state_res_store) + ev = yield _get_event(room_id, aid, event_map, state_res_store) if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""): pl = aid break @@ -457,6 +493,8 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor Deferred[int] """ + room_id = event.room_id + # We do an iterative search, replacing `event with the power level in its # auth events (if any) while event: @@ -468,7 +506,7 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor event = None for aid in auth_events: - aev = yield _get_event(aid, event_map, state_res_store) + aev = yield _get_event(room_id, aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): event = aev break @@ -478,11 +516,12 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor @defer.inlineCallbacks -def _get_event(event_id, event_map, state_res_store): +def _get_event(room_id, event_id, event_map, state_res_store): """Helper function to look up event in event_map, falling back to looking it up in the store Args: + room_id (str) event_id (str) event_map (dict[str,FrozenEvent]) state_res_store (StateResolutionStore) @@ -493,7 +532,14 @@ def _get_event(event_id, event_map, state_res_store): if event_id not in event_map: events = yield state_res_store.get_events([event_id], allow_rejected=True) event_map.update(events) - return event_map[event_id] + event = event_map[event_id] + assert event is not None + if event.room_id != room_id: + raise Exception( + "In state res for room %s, event %s is in %s" + % (room_id, event_id, event.room_id) + ) + return event def lexicographical_topological_sort(graph, key): diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 8d3845c870..0f341d3ac3 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -58,6 +58,7 @@ class FakeEvent(object): self.type = type self.state_key = state_key self.content = content + self.room_id = ROOM_ID def to_event(self, auth_events, prev_events): """Given the auth_events and prev_events, convert to a Frozen Event @@ -418,6 +419,7 @@ class StateTestCase(unittest.TestCase): state_before = dict(state_at_event[prev_events[0]]) else: state_d = resolve_events_with_store( + ROOM_ID, RoomVersions.V2.identifier, [state_at_event[n] for n in prev_events], event_map=event_map, @@ -565,6 +567,7 @@ class SimpleParamStateTestCase(unittest.TestCase): # Test that we correctly handle passing `None` as the event_map state_d = resolve_events_with_store( + ROOM_ID, RoomVersions.V2.identifier, [self.state_at_bob, self.state_at_charlie], event_map=None, From ff773ff7243fbbe88fabff952d6faded0241c64e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 16 Dec 2019 12:26:28 +0000 Subject: [PATCH 10/17] Persist auth/state events at backwards extremities when we fetch them (#6526) The main point here is to make sure that the state returned by _get_state_in_room has been authed before we try to use it as state in the room. --- changelog.d/6526.bugfix | 1 + synapse/handlers/federation.py | 243 +++++++++++---------------------- synapse/util/async_helpers.py | 4 +- 3 files changed, 83 insertions(+), 165 deletions(-) create mode 100644 changelog.d/6526.bugfix diff --git a/changelog.d/6526.bugfix b/changelog.d/6526.bugfix new file mode 100644 index 0000000000..53214b0748 --- /dev/null +++ b/changelog.d/6526.bugfix @@ -0,0 +1 @@ +Fix a bug which could cause the federation server to incorrectly return errors when handling certain obscure event graphs. \ No newline at end of file diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fd3f5ced55..f4ac0bfbc8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -64,8 +64,7 @@ from synapse.replication.http.federation import ( from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id -from synapse.util import batch_iter, unwrapFirstError -from synapse.util.async_helpers import Linearizer +from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -240,7 +239,6 @@ class FederationHandler(BaseHandler): return None state = None - auth_chain = [] # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): @@ -346,7 +344,6 @@ class FederationHandler(BaseHandler): # Calculate the state after each of the previous events, and # resolve them to find the correct state at the current event. - auth_chains = set() event_map = {event_id: pdu} try: # Get the state of the events we know about @@ -370,24 +367,14 @@ class FederationHandler(BaseHandler): p, ) - room_version = yield self.store.get_room_version(room_id) - with nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped # by the get_pdu_cache in federation_client. - ( - remote_state, - got_auth_chain, - ) = yield self._get_state_for_room( + (remote_state, _,) = yield self._get_state_for_room( origin, room_id, p, include_event_in_state=True ) - # XXX hrm I'm not convinced that duplicate events will compare - # for equality, so I'm not sure this does what the author - # hoped. - auth_chains.update(got_auth_chain) - remote_state_map = { (x.type, x.state_key): x.event_id for x in remote_state } @@ -396,6 +383,7 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x + room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_store( room_id, room_version, @@ -417,7 +405,6 @@ class FederationHandler(BaseHandler): event_map.update(evs) state = [event_map[e] for e in six.itervalues(state_map)] - auth_chain = list(auth_chains) except Exception: logger.warning( "[%s %s] Error attempting to resolve state at missing " @@ -433,9 +420,7 @@ class FederationHandler(BaseHandler): affected=event_id, ) - yield self._process_received_pdu( - origin, pdu, state=state, auth_chain=auth_chain - ) + yield self._process_received_pdu(origin, pdu, state=state) @defer.inlineCallbacks def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): @@ -638,6 +623,8 @@ class FederationHandler(BaseHandler): room_id (str) event_ids (Iterable[str]) + Persists any events we don't already have as outliers. + If we fail to fetch any of the events, a warning will be logged, and the event will be omitted from the result. Likewise, any events which turn out not to be in the given room. @@ -657,27 +644,15 @@ class FederationHandler(BaseHandler): room_id, ) - room_version = yield self.store.get_room_version(room_id) + yield self._get_events_and_persist( + destination=destination, room_id=room_id, events=missing_events + ) - # XXX 20 requests at once? really? - for batch in batch_iter(missing_events, 20): - deferreds = [ - run_in_background( - self.federation_client.get_pdu, - destinations=[destination], - event_id=e_id, - room_version=room_version, - ) - for e_id in batch - ] - - res = yield make_deferred_yieldable( - defer.DeferredList(deferreds, consumeErrors=True) - ) - - for success, result in res: - if success and result: - fetched_events[result.event_id] = result + # we need to make sure we re-load from the database to get the rejected + # state correct. + fetched_events.update( + (yield self.store.get_events(missing_events, allow_rejected=True)) + ) # check for events which were in the wrong room. # @@ -707,50 +682,24 @@ class FederationHandler(BaseHandler): return fetched_events @defer.inlineCallbacks - def _process_received_pdu(self, origin, event, state, auth_chain): + def _process_received_pdu(self, origin, event, state): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. + + Args: + origin: server sending the event + + event: event to be persisted + + state: Normally None, but if we are handling a gap in the graph + (ie, we are missing one or more prev_events), the resolved state at the + event """ room_id = event.room_id event_id = event.event_id logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) - event_ids = set() - if state: - event_ids |= {e.event_id for e in state} - if auth_chain: - event_ids |= {e.event_id for e in auth_chain} - - seen_ids = yield self.store.have_seen_events(event_ids) - - if state and auth_chain is not None: - # If we have any state or auth_chain given to us by the replication - # layer, then we should handle them (if we haven't before.) - - event_infos = [] - - for e in itertools.chain(auth_chain, state): - if e.event_id in seen_ids: - continue - e.internal_metadata.outlier = True - auth_ids = e.auth_event_ids() - auth = { - (e.type, e.state_key): e - for e in auth_chain - if e.event_id in auth_ids or e.type == EventTypes.Create - } - event_infos.append(_NewEventInfo(event=e, auth_events=auth)) - seen_ids.add(e.event_id) - - logger.info( - "[%s %s] persisting newly-received auth/state events %s", - room_id, - event_id, - [e.event.event_id for e in event_infos], - ) - yield self._handle_new_events(origin, event_infos) - try: context = yield self._handle_new_event(origin, event, state=state) except AuthError as e: @@ -806,8 +755,6 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") - room_version = yield self.store.get_room_version(room_id) - events = yield self.federation_client.backfill( dest, room_id, limit=limit, extremities=extremities ) @@ -836,6 +783,9 @@ class FederationHandler(BaseHandler): event_ids = set(e.event_id for e in events) + # build a list of events whose prev_events weren't in the batch. + # (XXX: this will include events whose prev_events we already have; that doesn't + # sound right?) edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids] logger.info("backfill: Got %d events with %d edges", len(events), len(edges)) @@ -864,95 +814,11 @@ class FederationHandler(BaseHandler): auth_events.update( {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} ) - missing_auth = required_auth - set(auth_events) - failed_to_fetch = set() - # Try and fetch any missing auth events from both DB and remote servers. - # We repeatedly do this until we stop finding new auth events. - while missing_auth - failed_to_fetch: - logger.info("Missing auth for backfill: %r", missing_auth) - ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) - auth_events.update(ret_events) - - required_auth.update( - a_id for event in ret_events.values() for a_id in event.auth_event_ids() - ) - missing_auth = required_auth - set(auth_events) - - if missing_auth - failed_to_fetch: - logger.info( - "Fetching missing auth for backfill: %r", - missing_auth - failed_to_fetch, - ) - - results = yield make_deferred_yieldable( - defer.gatherResults( - [ - run_in_background( - self.federation_client.get_pdu, - [dest], - event_id, - room_version=room_version, - outlier=True, - timeout=10000, - ) - for event_id in missing_auth - failed_to_fetch - ], - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) - auth_events.update({a.event_id: a for a in results if a}) - required_auth.update( - a_id - for event in results - if event - for a_id in event.auth_event_ids() - ) - missing_auth = required_auth - set(auth_events) - - failed_to_fetch = missing_auth - set(auth_events) - - seen_events = yield self.store.have_seen_events( - set(auth_events.keys()) | set(state_events.keys()) - ) - - # We now have a chunk of events plus associated state and auth chain to - # persist. We do the persistence in two steps: - # 1. Auth events and state get persisted as outliers, plus the - # backward extremities get persisted (as non-outliers). - # 2. The rest of the events in the chunk get persisted one by one, as - # each one depends on the previous event for its state. - # - # The important thing is that events in the chunk get persisted as - # non-outliers, including when those events are also in the state or - # auth chain. Caution must therefore be taken to ensure that they are - # not accidentally marked as outliers. - - # Step 1a: persist auth events that *don't* appear in the chunk ev_infos = [] - for a in auth_events.values(): - # We only want to persist auth events as outliers that we haven't - # seen and aren't about to persist as part of the backfilled chunk. - if a.event_id in seen_events or a.event_id in event_map: - continue - a.internal_metadata.outlier = True - ev_infos.append( - _NewEventInfo( - event=a, - auth_events={ - ( - auth_events[a_id].type, - auth_events[a_id].state_key, - ): auth_events[a_id] - for a_id in a.auth_event_ids() - if a_id in auth_events - }, - ) - ) - - # Step 1b: persist the events in the chunk we fetched state for (i.e. - # the backwards extremities) as non-outliers. + # Step 1: persist the events in the chunk we fetched state for (i.e. + # the backwards extremities), with custom auth events and state for e_id in events_to_state: # For paranoia we ensure that these events are marked as # non-outliers @@ -1194,6 +1060,57 @@ class FederationHandler(BaseHandler): return False + @defer.inlineCallbacks + def _get_events_and_persist( + self, destination: str, room_id: str, events: Iterable[str] + ): + """Fetch the given events from a server, and persist them as outliers. + + Logs a warning if we can't find the given event. + """ + + room_version = yield self.store.get_room_version(room_id) + + event_infos = [] + + async def get_event(event_id: str): + with nested_logging_context(event_id): + try: + event = await self.federation_client.get_pdu( + [destination], event_id, room_version, outlier=True, + ) + if event is None: + logger.warning( + "Server %s didn't return event %s", destination, event_id, + ) + return + + # recursively fetch the auth events for this event + auth_events = await self._get_events_from_store_or_dest( + destination, room_id, event.auth_event_ids() + ) + auth = {} + for auth_event_id in event.auth_event_ids(): + ae = auth_events.get(auth_event_id) + if ae: + auth[(ae.type, ae.state_key)] = ae + + event_infos.append(_NewEventInfo(event, None, auth)) + + except Exception as e: + logger.warning( + "Error fetching missing state/auth event %s: %s %s", + event_id, + type(e), + e, + ) + + yield concurrently_execute(get_event, events, 5) + + yield self._handle_new_events( + destination, event_infos, + ) + def _sanity_check_event(self, ev): """ Do some early sanity checks of a received event diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 5c4de2e69f..04b6abdc24 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -140,8 +140,8 @@ def concurrently_execute(func, args, limit): Args: func (func): Function to execute, should return a deferred or coroutine. - args (list): List of arguments to pass to func, each invocation of func - gets a signle argument. + args (Iterable): List of arguments to pass to func, each invocation of func + gets a single argument. limit (int): Maximum number of conccurent executions. Returns: From bbb75ff6eeda25e2f0eebd0a6639efd48b4dbb3c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 16 Dec 2019 13:14:37 +0000 Subject: [PATCH 11/17] Exclude rejected state events when calculating state at backwards extrems (#6527) This fixes a weird bug where, if you were determined enough, you could end up with a rejected event forming part of the state at a backwards-extremity. Authing that backwards extrem would then lead to us trying to pull the rejected event from the db (with allow_rejected=False), which would fail with a 404. --- changelog.d/6527.bugfix | 1 + synapse/handlers/federation.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/6527.bugfix diff --git a/changelog.d/6527.bugfix b/changelog.d/6527.bugfix new file mode 100644 index 0000000000..53214b0748 --- /dev/null +++ b/changelog.d/6527.bugfix @@ -0,0 +1 @@ +Fix a bug which could cause the federation server to incorrectly return errors when handling certain obscure event graphs. \ No newline at end of file diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f4ac0bfbc8..abe02907b9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -606,7 +606,7 @@ class FederationHandler(BaseHandler): remote_event = event_map.get(event_id) if not remote_event: raise Exception("Unable to get missing prev_event %s" % (event_id,)) - if remote_event.is_state(): + if remote_event.is_state() and remote_event.rejected_reason is None: remote_state.append(remote_event) auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] From 596dd9914dad1933ded1426bdec1e2b1e6874e39 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 14:53:21 +0000 Subject: [PATCH 12/17] Add test case --- tests/rest/client/v1/test_rooms.py | 133 +++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 1ca7fa742f..9cb505f316 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -29,6 +29,7 @@ import synapse.rest.admin from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.handlers.pagination import PurgeStatus from synapse.rest.client.v1 import login, profile, room +from synapse.rest.client.v2_alpha import account from synapse.util.stringutils import random_string from tests import unittest @@ -1597,3 +1598,135 @@ class LabelsTestCase(unittest.HomeserverTestCase): ) return event_id + + +class ContextTestCase(unittest.HomeserverTestCase): + + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + account.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + self.hs = self.setup_test_homeserver() + + return self.hs + + def prepare(self, reactor, clock, homeserver): + self.user_id = self.register_user("user", "password") + self.tok = self.login("user", "password") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) + + self.other_user_id = self.register_user("user2", "password") + self.other_tok = self.login("user2", "password") + + self.helper.invite(self.room_id, self.user_id, self.other_user_id, tok=self.tok) + self.helper.join(self.room_id, self.other_user_id, tok=self.other_tok) + + def test_erased_sender(self): + """Test that an erasure request results in the requester's events being hidden + from any new member of the room. + """ + + # Send a bunch of events in the room. + + self.helper.send(self.room_id, "message 1", tok=self.tok) + self.helper.send(self.room_id, "message 2", tok=self.tok) + event_id = self.helper.send(self.room_id, "message 3", tok=self.tok)["event_id"] + self.helper.send(self.room_id, "message 4", tok=self.tok) + self.helper.send(self.room_id, "message 5", tok=self.tok) + + # Check that we can still see the messages before the erasure request. + + request, channel = self.make_request( + "GET", + '/rooms/%s/context/%s?filter={"types":["m.room.message"]}' + % (self.room_id, event_id), + access_token=self.tok, + ) + self.render(request) + self.assertEqual(channel.code, 200, channel.result) + + events_before = channel.json_body["events_before"] + + self.assertEqual(len(events_before), 2, events_before) + self.assertEqual( + events_before[0].get("content", {}).get("body"), + "message 2", + events_before[0], + ) + self.assertEqual( + events_before[1].get("content", {}).get("body"), + "message 1", + events_before[1], + ) + + self.assertEqual( + channel.json_body["event"].get("content", {}).get("body"), + "message 3", + channel.json_body["event"], + ) + + events_after = channel.json_body["events_after"] + + self.assertEqual(len(events_after), 2, events_after) + self.assertEqual( + events_after[0].get("content", {}).get("body"), + "message 4", + events_after[0], + ) + self.assertEqual( + events_after[1].get("content", {}).get("body"), + "message 5", + events_after[1], + ) + + # Deactivate the first account and erase the user's data. + + deactivate_account_handler = self.hs.get_deactivate_account_handler() + self.get_success( + deactivate_account_handler.deactivate_account(self.user_id, erase_data=True) + ) + + # Invite another user in the room. This is needed because messages will be + # pruned only if the user wasn't a member of the room when the messages were + # sent. + + invited_user_id = self.register_user("user3", "password") + invited_tok = self.login("user3", "password") + + self.helper.invite( + self.room_id, self.other_user_id, invited_user_id, tok=self.other_tok + ) + self.helper.join(self.room_id, invited_user_id, tok=invited_tok) + + # Check that a user that joined the room after the erasure request can't see + # the messages anymore. + + request, channel = self.make_request( + 'GET', + '/rooms/%s/context/%s?filter={"types":["m.room.message"]}' + % (self.room_id, event_id), + access_token=invited_tok, + ) + self.render(request) + self.assertEqual(channel.code, 200, channel.result) + + events_before = channel.json_body["events_before"] + + self.assertEqual(len(events_before), 2, events_before) + self.assertDictEqual(events_before[0].get("content"), {}, events_before[0]) + self.assertDictEqual(events_before[1].get("content"), {}, events_before[1]) + + self.assertDictEqual( + channel.json_body["event"].get("content"), {}, channel.json_body["event"] + ) + + events_after = channel.json_body["events_after"] + + self.assertEqual(len(events_after), 2, events_after) + self.assertDictEqual(events_after[0].get("content"), {}, events_after[0]) + self.assertEqual(events_after[1].get("content"), {}, events_after[1]) + From a29420f9f449da72d1d38bcab4cedc182e9f2ba0 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 14:55:50 +0000 Subject: [PATCH 13/17] Lint --- tests/rest/client/v1/test_rooms.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 9cb505f316..dca0fef97a 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1706,7 +1706,7 @@ class ContextTestCase(unittest.HomeserverTestCase): # the messages anymore. request, channel = self.make_request( - 'GET', + "GET", '/rooms/%s/context/%s?filter={"types":["m.room.message"]}' % (self.room_id, event_id), access_token=invited_tok, @@ -1729,4 +1729,3 @@ class ContextTestCase(unittest.HomeserverTestCase): self.assertEqual(len(events_after), 2, events_after) self.assertDictEqual(events_after[0].get("content"), {}, events_after[0]) self.assertEqual(events_after[1].get("content"), {}, events_after[1]) - From 284e690aa0e37c0d4d7516fc2f02b2b2fede4601 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 14:56:05 +0000 Subject: [PATCH 14/17] Update changelog.d/6553.bugfix Co-Authored-By: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/6553.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/6553.bugfix b/changelog.d/6553.bugfix index e8f55e2a76..4fe576b873 100644 --- a/changelog.d/6553.bugfix +++ b/changelog.d/6553.bugfix @@ -1 +1 @@ -Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event the request is for. +Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event. From a82006954912ed96b0d47db43db44e76e5b052d6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 16 Dec 2019 16:00:18 +0000 Subject: [PATCH 15/17] Incorporate review --- synapse/handlers/room.py | 2 +- tests/rest/client/v1/test_rooms.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7f979e5812..60b8bbc7a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -941,7 +941,7 @@ class RoomContextHandler(object): if event_filter: state_events = event_filter.filter(state_events) - results["state"] = state_events + results["state"] = yield filter_evts(state_events) # We use a dummy token here as we only care about the room portion of # the token, which we replace. diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index dca0fef97a..e3af280ba6 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1609,11 +1609,6 @@ class ContextTestCase(unittest.HomeserverTestCase): account.register_servlets, ] - def make_homeserver(self, reactor, clock): - self.hs = self.setup_test_homeserver() - - return self.hs - def prepare(self, reactor, clock, homeserver): self.user_id = self.register_user("user", "password") self.tok = self.login("user", "password") From 5ca2cfadc36359c1203ea38c2d1953ce0e7ced2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Dec 2019 16:59:32 +0000 Subject: [PATCH 16/17] Add auth events as per spec. (#6556) Previously we tried to be clever and filter out some unnecessary event IDs to keep the auth chain small, but that had some annoying interactions with state res v2 so we stop doing that for now. --- changelog.d/6556.bugfix | 1 + synapse/api/auth.py | 103 ++++++++++++++-------------------------- 2 files changed, 36 insertions(+), 68 deletions(-) create mode 100644 changelog.d/6556.bugfix diff --git a/changelog.d/6556.bugfix b/changelog.d/6556.bugfix new file mode 100644 index 0000000000..e75639f5b4 --- /dev/null +++ b/changelog.d/6556.bugfix @@ -0,0 +1 @@ +Fix a cause of state resets in room versions 2 onwards. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5d0b7d2801..9fd52a8c77 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Dict, Tuple from six import itervalues @@ -25,13 +26,7 @@ from twisted.internet import defer import synapse.logging.opentracing as opentracing import synapse.types from synapse import event_auth -from synapse.api.constants import ( - EventTypes, - JoinRules, - LimitBlockingTypes, - Membership, - UserTypes, -) +from synapse.api.constants import EventTypes, LimitBlockingTypes, Membership, UserTypes from synapse.api.errors import ( AuthError, Codes, @@ -513,71 +508,43 @@ class Auth(object): """ return self.store.is_server_admin(user) - @defer.inlineCallbacks - def compute_auth_events(self, event, current_state_ids, for_verification=False): + def compute_auth_events( + self, + event, + current_state_ids: Dict[Tuple[str, str], str], + for_verification: bool = False, + ): + """Given an event and current state return the list of event IDs used + to auth an event. + + If `for_verification` is False then only return auth events that + should be added to the event's `auth_events`. + + Returns: + defer.Deferred(list[str]): List of event IDs. + """ + if event.type == EventTypes.Create: - return [] + return defer.succeed([]) + + # Currently we ignore the `for_verification` flag even though there are + # some situations where we can drop particular auth events when adding + # to the event's `auth_events` (e.g. joins pointing to previous joins + # when room is publically joinable). Dropping event IDs has the + # advantage that the auth chain for the room grows slower, but we use + # the auth chain in state resolution v2 to order events, which means + # care must be taken if dropping events to ensure that it doesn't + # introduce undesirable "state reset" behaviour. + # + # All of which sounds a bit tricky so we don't bother for now. auth_ids = [] + for etype, state_key in event_auth.auth_types_for_event(event): + auth_ev_id = current_state_ids.get((etype, state_key)) + if auth_ev_id: + auth_ids.append(auth_ev_id) - key = (EventTypes.PowerLevels, "") - power_level_event_id = current_state_ids.get(key) - - if power_level_event_id: - auth_ids.append(power_level_event_id) - - key = (EventTypes.JoinRules, "") - join_rule_event_id = current_state_ids.get(key) - - key = (EventTypes.Member, event.sender) - member_event_id = current_state_ids.get(key) - - key = (EventTypes.Create, "") - create_event_id = current_state_ids.get(key) - if create_event_id: - auth_ids.append(create_event_id) - - if join_rule_event_id: - join_rule_event = yield self.store.get_event(join_rule_event_id) - join_rule = join_rule_event.content.get("join_rule") - is_public = join_rule == JoinRules.PUBLIC if join_rule else False - else: - is_public = False - - if event.type == EventTypes.Member: - e_type = event.content["membership"] - if e_type in [Membership.JOIN, Membership.INVITE]: - if join_rule_event_id: - auth_ids.append(join_rule_event_id) - - if e_type == Membership.JOIN: - if member_event_id and not is_public: - auth_ids.append(member_event_id) - else: - if member_event_id: - auth_ids.append(member_event_id) - - if for_verification: - key = (EventTypes.Member, event.state_key) - existing_event_id = current_state_ids.get(key) - if existing_event_id: - auth_ids.append(existing_event_id) - - if e_type == Membership.INVITE: - if "third_party_invite" in event.content: - key = ( - EventTypes.ThirdPartyInvite, - event.content["third_party_invite"]["signed"]["token"], - ) - third_party_invite_id = current_state_ids.get(key) - if third_party_invite_id: - auth_ids.append(third_party_invite_id) - elif member_event_id: - member_event = yield self.store.get_event(member_event_id) - if member_event.content["membership"] == Membership.JOIN: - auth_ids.append(member_event.event_id) - - return auth_ids + return defer.succeed(auth_ids) @defer.inlineCallbacks def check_can_change_room_list(self, room_id, user): From 50294225300602ec91712963736a523738195a01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Dec 2019 15:06:08 +0000 Subject: [PATCH 17/17] Fix bug where we added duplicate event IDs as auth_events (#6560) --- changelog.d/6560.bugfix | 1 + synapse/event_auth.py | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) create mode 100644 changelog.d/6560.bugfix diff --git a/changelog.d/6560.bugfix b/changelog.d/6560.bugfix new file mode 100644 index 0000000000..e75639f5b4 --- /dev/null +++ b/changelog.d/6560.bugfix @@ -0,0 +1 @@ +Fix a cause of state resets in room versions 2 onwards. diff --git a/synapse/event_auth.py b/synapse/event_auth.py index d184b0273b..350ed9351f 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Set, Tuple from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes @@ -637,7 +638,7 @@ def get_public_keys(invite_event): return public_keys -def auth_types_for_event(event): +def auth_types_for_event(event) -> Set[Tuple[str]]: """Given an event, return a list of (EventType, StateKey) that may be needed to auth the event. The returned list may be a superset of what would actually be required depending on the full state of the room. @@ -646,20 +647,20 @@ def auth_types_for_event(event): actually auth the event. """ if event.type == EventTypes.Create: - return [] + return set() - auth_types = [ + auth_types = { (EventTypes.PowerLevels, ""), (EventTypes.Member, event.sender), (EventTypes.Create, ""), - ] + } if event.type == EventTypes.Member: membership = event.content["membership"] if membership in [Membership.JOIN, Membership.INVITE]: - auth_types.append((EventTypes.JoinRules, "")) + auth_types.add((EventTypes.JoinRules, "")) - auth_types.append((EventTypes.Member, event.state_key)) + auth_types.add((EventTypes.Member, event.state_key)) if membership == Membership.INVITE: if "third_party_invite" in event.content: @@ -667,6 +668,6 @@ def auth_types_for_event(event): EventTypes.ThirdPartyInvite, event.content["third_party_invite"]["signed"]["token"], ) - auth_types.append(key) + auth_types.add(key) return auth_types