From 4678055173636f9940e77f1af35b888f99506030 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Sep 2015 11:07:22 +0100 Subject: [PATCH 1/9] Refactor do_invite_join --- synapse/handlers/federation.py | 84 ++++++++++++++++++++++------------ synapse/state.py | 3 -- 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4ff20599d6..30b9982e25 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -649,35 +649,10 @@ class FederationHandler(BaseHandler): # FIXME pass - ev_infos = [] - for e in itertools.chain(state, auth_chain): - if e.event_id == event.event_id: - continue + self._check_auth_tree(auth_chain, event) - e.internal_metadata.outlier = True - auth_ids = [e_id for e_id, _ in e.auth_events] - ev_infos.append({ - "event": e, - "auth_events": { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - }) - - yield self._handle_new_events(origin, ev_infos, outliers=True) - - auth_ids = [e_id for e_id, _ in event.auth_events] - auth_events = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - - _, event_stream_id, max_stream_id = yield self._handle_new_event( - origin, - new_event, - state=state, - current_state=state, - auth_events=auth_events, + event_stream_id, max_stream_id = yield self._persist_auth_tree( + auth_chain, state, event ) with PreserveLoggingContext(): @@ -1026,6 +1001,59 @@ class FederationHandler(BaseHandler): is_new_state=(not outliers and not backfilled), ) + def _check_auth_tree(self, auth_events, event): + event_map = { + e.event_id: e + for e in auth_events + } + + create_event = None + for e in auth_events: + if (e.type, e.state_key) == (EventTypes.Create, ""): + create_event = e + break + + for e in auth_events + [event]: + a = { + (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] + for e_id, _ in e.auth_events + } + if create_event: + a[(EventTypes.Create, "")] = create_event + + self.auth.check(e, auth_events=a) + + @defer.inlineCallbacks + def _persist_auth_tree(self, auth_events, state, event): + events_to_context = {} + for e in auth_events: + ctx = yield self.state_handler.compute_event_context( + e, outlier=True, + ) + events_to_context[e.event_id] = ctx + e.internal_metadata.outlier = True + + yield self.store.persist_events( + [ + (e, events_to_context[e.event_id]) + for e in auth_events + ], + is_new_state=False, + ) + + new_event_context = yield self.state_handler.compute_event_context( + event, old_state=state, outlier=False, + ) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, new_event_context, + backfilled=False, + is_new_state=True, + current_state=state, + ) + + defer.returnValue((event_stream_id, max_stream_id)) + @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, backfilled=False, current_state=None, auth_events=None): diff --git a/synapse/state.py b/synapse/state.py index 1fe4d066bd..ed36f844cb 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor from synapse.util.caches.expiringcache import ExpiringCache from synapse.api.constants import EventTypes from synapse.api.errors import AuthError @@ -119,8 +118,6 @@ class StateHandler(object): Returns: an EventContext """ - yield run_on_reactor() - context = EventContext() if outlier: From a3e332af1930c103b8b2ece9d50edd94193761e4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Sep 2015 13:41:07 +0100 Subject: [PATCH 2/9] Don't bail out of joining if we encounter a rejected event --- scripts-dev/check_auth.py | 2 +- synapse/handlers/federation.py | 33 ++++++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/scripts-dev/check_auth.py b/scripts-dev/check_auth.py index 4fa8792a5f..b362aad722 100644 --- a/scripts-dev/check_auth.py +++ b/scripts-dev/check_auth.py @@ -38,7 +38,7 @@ def check_auth(auth, auth_chain, events): print "Failed:", e.event_id, e.type, e.state_key print "Auth_events:", auth_events print ex - print json.dumps(e.get_dict(), sort_keys=True, indent=4) + # print json.dumps(e.get_dict(), sort_keys=True, indent=4) # raise print "Success:", e.event_id, e.type, e.state_key diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 30b9982e25..0f11fa390f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -649,8 +649,6 @@ class FederationHandler(BaseHandler): # FIXME pass - self._check_auth_tree(auth_chain, event) - event_stream_id, max_stream_id = yield self._persist_auth_tree( auth_chain, state, event ) @@ -1001,7 +999,16 @@ class FederationHandler(BaseHandler): is_new_state=(not outliers and not backfilled), ) - def _check_auth_tree(self, auth_events, event): + @defer.inlineCallbacks + def _persist_auth_tree(self, auth_events, state, event): + events_to_context = {} + for e in auth_events: + ctx = yield self.state_handler.compute_event_context( + e, outlier=True, + ) + events_to_context[e.event_id] = ctx + e.internal_metadata.outlier = True + event_map = { e.event_id: e for e in auth_events @@ -1021,17 +1028,17 @@ class FederationHandler(BaseHandler): if create_event: a[(EventTypes.Create, "")] = create_event - self.auth.check(e, auth_events=a) + try: + self.auth.check(e, auth_events=a) + except AuthError: + logger.warn( + "Rejecting %s because %s", + event.event_id, e.msg + ) - @defer.inlineCallbacks - def _persist_auth_tree(self, auth_events, state, event): - events_to_context = {} - for e in auth_events: - ctx = yield self.state_handler.compute_event_context( - e, outlier=True, - ) - events_to_context[e.event_id] = ctx - e.internal_metadata.outlier = True + if e == event: + raise + events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR yield self.store.persist_events( [ From 744e7d2790380b20260c0740fc68f7f49d07136b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Sep 2015 14:26:15 +0100 Subject: [PATCH 3/9] Also handle state --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0f11fa390f..b148af5390 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1002,7 +1002,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _persist_auth_tree(self, auth_events, state, event): events_to_context = {} - for e in auth_events: + for e in itertools.chain(auth_events, state): ctx = yield self.state_handler.compute_event_context( e, outlier=True, ) @@ -1020,7 +1020,7 @@ class FederationHandler(BaseHandler): create_event = e break - for e in auth_events + [event]: + for e in itertools.chain(auth_events, state, [event]): a = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] for e_id, _ in e.auth_events @@ -1033,7 +1033,7 @@ class FederationHandler(BaseHandler): except AuthError: logger.warn( "Rejecting %s because %s", - event.event_id, e.msg + e.event_id, e.msg ) if e == event: From 3a01901d6c59f540540e00835f0716dfa7f03846 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Sep 2015 14:28:57 +0100 Subject: [PATCH 4/9] Capture err --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b148af5390..fd8a86ea99 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1030,10 +1030,10 @@ class FederationHandler(BaseHandler): try: self.auth.check(e, auth_events=a) - except AuthError: + except AuthError as err: logger.warn( "Rejecting %s because %s", - e.event_id, e.msg + e.event_id, err.msg ) if e == event: From 54e688277ad86b95345c3e8d1306c7e08b0ed484 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Sep 2015 14:32:31 +0100 Subject: [PATCH 5/9] Also persist state --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fd8a86ea99..85fdf94d06 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1043,7 +1043,7 @@ class FederationHandler(BaseHandler): yield self.store.persist_events( [ (e, events_to_context[e.event_id]) - for e in auth_events + for e in itertools.chain(auth_events, state) ], is_new_state=False, ) From c34ffd2736a2042484a3593a0174df5e2d118252 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Sep 2015 16:26:03 +0100 Subject: [PATCH 6/9] Fix getting an event for a room the server forgot it was in --- synapse/handlers/federation.py | 110 ++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 49 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85fdf94d06..e79a82cfc0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -125,60 +125,72 @@ class FederationHandler(BaseHandler): ) if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") - current_state = state - event_ids = set() - if state: - event_ids |= {e.event_id for e in state} - if auth_chain: - event_ids |= {e.event_id for e in auth_chain} + try: + event_stream_id, max_stream_id = yield self._persist_auth_tree( + auth_chain, state, event + ) + except AuthError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=event.event_id, + ) - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + else: + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} - if state and auth_chain is not None: - # If we have any state or auth_chain given to us by the replication - # layer, then we should handle them (if we haven't before.) - - event_infos = [] - - for e in itertools.chain(auth_chain, state): - if e.event_id in seen_ids: - continue - e.internal_metadata.outlier = True - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - event_infos.append({ - "event": e, - "auth_events": auth, - }) - seen_ids.add(e.event_id) - - yield self._handle_new_events( - origin, - event_infos, - outliers=True + seen_ids = set( + (yield self.store.have_events(event_ids)).keys() ) - try: - _, event_stream_id, max_stream_id = yield self._handle_new_event( - origin, - event, - state=state, - backfilled=backfilled, - current_state=current_state, - ) - except AuthError as e: - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=event.event_id, - ) + if state and auth_chain is not None: + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) + + event_infos = [] + + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue + e.internal_metadata.outlier = True + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append({ + "event": e, + "auth_events": auth, + }) + seen_ids.add(e.event_id) + + yield self._handle_new_events( + origin, + event_infos, + outliers=True + ) + + try: + _, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, + event, + state=state, + backfilled=backfilled, + current_state=current_state, + ) + except AuthError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=event.event_id, + ) # if we're receiving valid events from an origin, # it's probably a good idea to mark it as not in retry-state From 51b2448e050d4944d1a5176bcfbf30a33953ca68 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Sep 2015 10:11:15 +0100 Subject: [PATCH 7/9] Revert change of scripts/check_auth.py --- scripts-dev/check_auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts-dev/check_auth.py b/scripts-dev/check_auth.py index b362aad722..4fa8792a5f 100644 --- a/scripts-dev/check_auth.py +++ b/scripts-dev/check_auth.py @@ -38,7 +38,7 @@ def check_auth(auth, auth_chain, events): print "Failed:", e.event_id, e.type, e.state_key print "Auth_events:", auth_events print ex - # print json.dumps(e.get_dict(), sort_keys=True, indent=4) + print json.dumps(e.get_dict(), sort_keys=True, indent=4) # raise print "Success:", e.event_id, e.type, e.state_key From 9d39615b7d4d5525ab814d2d84e7f2b4523d0417 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2015 16:37:59 +0100 Subject: [PATCH 8/9] Rename var --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e79a82cfc0..3ce1aee52c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1033,15 +1033,15 @@ class FederationHandler(BaseHandler): break for e in itertools.chain(auth_events, state, [event]): - a = { + auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] for e_id, _ in e.auth_events } if create_event: - a[(EventTypes.Create, "")] = create_event + auth_for_e[(EventTypes.Create, "")] = create_event try: - self.auth.check(e, auth_events=a) + self.auth.check(e, auth_events=auth_for_e) except AuthError as err: logger.warn( "Rejecting %s because %s", From 83892d0d3039965ae3075df166cbdbd7339cb0bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2015 16:41:48 +0100 Subject: [PATCH 9/9] Comment --- synapse/handlers/federation.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3ce1aee52c..17f4ddd325 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1013,6 +1013,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _persist_auth_tree(self, auth_events, state, event): + """Checks the auth chain is valid (and passes auth checks) for the + state and event. Then persists the auth chain and state atomically. + Persists the event seperately. + + Returns: + 2-tuple of (event_stream_id, max_stream_id) from the persist_event + call for `event` + """ events_to_context = {} for e in itertools.chain(auth_events, state): ctx = yield self.state_handler.compute_event_context(