From 78a691d005b925b6211a3d090add34c6efb1c0f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jul 2018 16:00:38 +0100 Subject: [PATCH 1/4] Split out DB writes in federation handler This will allow us to easily add an internal replication API to proxy these reqeusts to master, so that we can move federation APIs to workers. --- synapse/handlers/federation.py | 165 ++++++++++++++++----------------- synapse/storage/events.py | 14 ++- 2 files changed, 94 insertions(+), 85 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 145c1a21d4..06700d5038 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -444,7 +444,7 @@ class FederationHandler(BaseHandler): yield self._handle_new_events(origin, event_infos) try: - context, event_stream_id, max_stream_id = yield self._handle_new_event( + context = yield self._handle_new_event( origin, event, state=state, @@ -469,17 +469,6 @@ class FederationHandler(BaseHandler): except StoreError: logger.exception("Failed to store room.") - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: # Only fire user_joined_room if the user has acutally @@ -501,7 +490,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield user_joined_room(self.distributor, user, event.room_id) + yield self.user_joined_room(user, event.room_id) @log_function @defer.inlineCallbacks @@ -942,7 +931,7 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] - yield self.store.clean_room_for_join(room_id) + yield self._clean_room_for_join(room_id) handled_events = set() @@ -981,15 +970,10 @@ class FederationHandler(BaseHandler): # FIXME pass - event_stream_id, max_stream_id = yield self._persist_auth_tree( + yield self._persist_auth_tree( origin, auth_chain, state, event ) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[joinee] - ) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -1084,7 +1068,7 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin - context, event_stream_id, max_stream_id = yield self._handle_new_event( + context = yield self._handle_new_event( origin, event ) @@ -1094,20 +1078,10 @@ class FederationHandler(BaseHandler): event.signatures, ) - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield user_joined_room(self.distributor, user, event.room_id) + yield self.user_joined_room(user, event.room_id) prev_state_ids = yield context.get_prev_state_ids(self.store) @@ -1176,17 +1150,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - ) - - target_user = UserID.from_string(event.state_key) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + yield self._persist_events([(event, context)]) defer.returnValue(event) @@ -1217,17 +1181,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - ) - - target_user = UserID.from_string(event.state_key) - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + yield self._persist_events([(event, context)]) defer.returnValue(event) @@ -1318,7 +1272,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context, event_stream_id, max_stream_id = yield self._handle_new_event( + yield self._handle_new_event( origin, event ) @@ -1328,16 +1282,6 @@ class FederationHandler(BaseHandler): event.signatures, ) - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) - defer.returnValue(None) @defer.inlineCallbacks @@ -1472,9 +1416,8 @@ class FederationHandler(BaseHandler): event, context ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, + yield self._persist_events( + [(event, context)], backfilled=backfilled, ) except: # noqa: E722, as we reraise the exception this is fine. @@ -1487,15 +1430,7 @@ class FederationHandler(BaseHandler): six.reraise(tp, value, tb) - if not backfilled: - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - logcontext.run_in_background( - self.pusher_pool.on_new_notifications, - event_stream_id, max_stream_id, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): @@ -1517,7 +1452,7 @@ class FederationHandler(BaseHandler): ], consumeErrors=True, )) - yield self.store.persist_events( + yield self._persist_events( [ (ev_info["event"], context) for ev_info, context in zip(event_infos, contexts) @@ -1605,7 +1540,7 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self.store.persist_events( + yield self._persist_events( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) @@ -1616,12 +1551,10 @@ class FederationHandler(BaseHandler): event, old_state=state ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, new_event_context, + yield self._persist_events( + [(event, new_event_context)], ) - defer.returnValue((event_stream_id, max_stream_id)) - @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): """ @@ -2347,3 +2280,69 @@ class FederationHandler(BaseHandler): ) if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") + + @defer.inlineCallbacks + def _persist_events(self, event_and_contexts, backfilled=False): + """Persists events and tells the notifier/pushers about them, if + necessary. + + Args: + event_and_contexts(list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether these events are a result of + backfilling or not + + Returns: + Deferred + """ + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) + + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) + + def _notify_persisted_event(self, event, max_stream_id): + """Checks to see if notifier/pushers should be notified about the + event or not. + + Args: + event (FrozenEvent) + max_stream_id (int): The max_stream_id returned by persist_events + """ + + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + event_stream_id = event.internal_metadata.stream_ordering + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, + ) + + def _clean_room_for_join(self, room_id): + return self.store.clean_room_for_join(room_id) + + def user_joined_room(self, user, room_id): + """Called when a new user has joined the room + """ + return user_joined_room(self.distributor, user, room_id) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 200f5ec95f..e3910ed282 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -231,12 +231,18 @@ class EventsStore(EventsWorkerStore): self._state_resolution_handler = hs.get_state_resolution_handler() + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) - backfilled: ? + backfilled (bool): Whether the results are retrieved from federation + via backfill or not. Used to determine if they're "new" events + which might update the current state etc. + + Returns: + Deferred[int]: he stream ordering of the latest persisted event """ partitioned = {} for event, ctx in events_and_contexts: @@ -253,10 +259,14 @@ class EventsStore(EventsWorkerStore): for room_id in partitioned: self._maybe_start_persisting(room_id) - return make_deferred_yieldable( + yield make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) + max_persisted_id = yield self._stream_id_gen.get_current_token() + + defer.returnValue(max_persisted_id) + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False): From ad7cd95a7842bc2eafb5bd69467978e60b4ac0d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Jul 2018 15:02:57 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/3621.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3621.misc diff --git a/changelog.d/3621.misc b/changelog.d/3621.misc new file mode 100644 index 0000000000..83e5fc8aa1 --- /dev/null +++ b/changelog.d/3621.misc @@ -0,0 +1 @@ +Refactor FederationHandler to move DB writes into separate functions From 4b256b92713624018d86f7966d8d2b02122b052c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Aug 2018 13:39:07 +0100 Subject: [PATCH 3/4] _persist_auth_tree no longer returns anything --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 617452be6c..2e3cbe2aab 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -400,7 +400,7 @@ class FederationHandler(BaseHandler): ) try: - event_stream_id, max_stream_id = yield self._persist_auth_tree( + yield self._persist_auth_tree( origin, auth_chain, state, event ) except AuthError as e: From a6d7b749150d1673117f19c6a134ade8b8c2071b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Aug 2018 13:39:14 +0100 Subject: [PATCH 4/4] update docs --- synapse/handlers/federation.py | 8 +++++--- synapse/storage/events.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2e3cbe2aab..21e1c48eef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1438,6 +1438,8 @@ class FederationHandler(BaseHandler): should not depend on one another, e.g. this should be used to persist a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. + + Notifies about the events where appropriate. """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ @@ -1464,7 +1466,8 @@ class FederationHandler(BaseHandler): def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. - Persists the event seperately. + Persists the event separately. Notifies about the persisted events + where appropriate. Will attempt to fetch missing auth events. @@ -1475,8 +1478,7 @@ class FederationHandler(BaseHandler): event (Event) Returns: - 2-tuple of (event_stream_id, max_stream_id) from the persist_event - call for `event` + Deferred """ events_to_context = {} for e in itertools.chain(auth_events, state): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bbf6e42195..ee9159c102 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -242,7 +242,7 @@ class EventsStore(EventsWorkerStore): which might update the current state etc. Returns: - Deferred[int]: he stream ordering of the latest persisted event + Deferred[int]: the stream ordering of the latest persisted event """ partitioned = {} for event, ctx in events_and_contexts: