From 3a9f5bf6ddb9393b68d8da9f04b2b1162720da7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 11:26:46 +0100 Subject: [PATCH 1/7] Don't fetch state for missing events that we fetched --- synapse/handlers/federation.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ebbf844489..33321699a6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -171,6 +171,12 @@ class FederationHandler(BaseHandler): yield self._get_missing_events_for_pdu( origin, pdu, prevs, min_depth ) + + # Update the set of things we've seen after trying to + # fetch the missing stuff + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) elif prevs - seen: logger.info( "Not fetching %d missing events for room %r,event %s: %r...", From 9b147cd73049c6222fa6c3b45dfb2a360de93cc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 11:55:25 +0100 Subject: [PATCH 2/7] Remove unncessary call in _get_missing_events_for_pdu --- synapse/handlers/federation.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 33321699a6..43d1ec859d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -177,6 +177,17 @@ class FederationHandler(BaseHandler): have_seen = yield self.store.have_events( [ev for ev, _ in pdu.prev_events] ) + + seen = set(have_seen.keys()) + if prevs - seen: + logger.info( + "Still missing %d prev events for %s: %r...", + len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] + ) + else: + logger.info( + "Found all missing prev events for %s", pdu.event_id + ) elif prevs - seen: logger.info( "Not fetching %d missing events for room %r,event %s: %r...", @@ -294,19 +305,6 @@ class FederationHandler(BaseHandler): get_missing=False ) - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - seen = set(have_seen.keys()) - if prevs - seen: - logger.info( - "Still missing %d prev events for %s: %r...", - len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] - ) - else: - logger.info("Found all missing prev events for %s", pdu.event_id) - defer.returnValue(have_seen) - @log_function @defer.inlineCallbacks def _process_received_pdu(self, origin, pdu, state, auth_chain): From 2347efc065912a0dca6fd0611012b853d62e6629 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 12:46:53 +0100 Subject: [PATCH 3/7] Fixup --- synapse/handlers/federation.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43d1ec859d..be2ea0cd56 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -174,11 +174,9 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = set(have_seen.iterkeys()) if prevs - seen: logger.info( "Still missing %d prev events for %s: %r...", @@ -231,19 +229,15 @@ class FederationHandler(BaseHandler): Args: origin (str): Origin of the pdu. Will be called to get the missing events pdu: received pdu - prevs (str[]): List of event ids which we are missing + prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. - - Returns: - Deferred: updated have_seen dictionary """ # We recalculate seen, since it may have changed. have_seen = yield self.store.have_events(prevs) seen = set(have_seen.keys()) if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) + return latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id From a2c89a225c567df53cc6d29af397547f4e9a4a2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 10:40:31 +0100 Subject: [PATCH 4/7] Prefill state caches --- synapse/storage/_base.py | 8 ++++---- synapse/storage/events.py | 8 ++++++-- synapse/storage/state.py | 8 ++++++++ 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c659004e8d..58b73af7d2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ class LoggingTransaction(object): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args): + def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ - self.after_callbacks.append((callback, args)) + self.after_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ class SQLBaseStore(object): inner_func, *args, **kwargs ) finally: - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..a8d1b93d99 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -370,6 +370,10 @@ class EventsStore(SQLBaseStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state + ) @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): @@ -529,7 +533,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert)) + defer.returnValue((to_delete, to_insert, current_state)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -682,7 +686,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple + to_delete, to_insert, _ = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a16afa8df5..1e1ce87e0e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,14 @@ class StateStore(SQLBaseStore): ], ) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=context.current_state_ids, + full=True, + ) + self._simple_insert_many_txn( txn, table="event_to_state_groups", From 1827057acc2b8f821ec20538cbf781f798e34700 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 09:56:05 +0100 Subject: [PATCH 5/7] Comments --- synapse/storage/events.py | 6 +++--- synapse/storage/state.py | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a37b7bad5a..d946024c9b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -441,10 +441,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, i.e. - (type, state_key) -> event_id. `to_delete` are the entries to + 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. + to insert. `new_state` is the full set of state. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1e1ce87e0e..5d6f7dfa28 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,9 @@ class StateStore(SQLBaseStore): ], ) + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, From 2c2dcf81d03fabfe4a018b60b4b840069627b3fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 10:00:29 +0100 Subject: [PATCH 6/7] Update comment --- synapse/storage/state.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5d6f7dfa28..03981f5d2b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -229,7 +229,8 @@ class StateStore(SQLBaseStore): # Prefill the state group cache with this group. # It's fine to use the sequence like this as the state group map - # is immutable. + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, From ef862186dd6b04210a536cbe3126e018260f8467 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 10:06:43 +0100 Subject: [PATCH 7/7] Merge together redundant calculations/logging --- synapse/handlers/federation.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index be2ea0cd56..2af9849ed0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -175,14 +175,9 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) - if prevs - seen: - logger.info( - "Still missing %d prev events for %s: %r...", - len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] - ) - else: + + if not prevs - seen: logger.info( "Found all missing prev events for %s", pdu.event_id ) @@ -193,8 +188,6 @@ class FederationHandler(BaseHandler): list(prevs - seen)[:5], ) - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) if prevs - seen: logger.info( "Still missing %d events for room %r: %r...",