From 223341205edf725c5583a73d5680245a24790452 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:59:16 +0100 Subject: [PATCH 1/7] Don't require to_delete to have event_ids --- synapse/storage/events.py | 91 +++++++++++++++++++++++---------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 19c05dc8d6..87d0b78a2d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ import logging from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems, itervalues +from six import iteritems from six.moves import range from canonicaljson import json @@ -347,8 +347,9 @@ class EventsStore(EventsWorkerStore): # state in each room after adding these events current_state_for_room = {} - # map room_id->(to_delete, to_insert) where each entry is - # a map (type,key)->event_id giving the state delta in each + # map room_id->(to_delete, to_insert) where to_delete is a list + # of type/state keys to remove from current state, and to_insert + # is a map (type,key)->event_id giving the state delta in each # room state_delta_for_room = {} @@ -647,17 +648,16 @@ class EventsStore(EventsWorkerStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. + tuple[list, dict] (to_delete, to_insert): where to_delete are the + type/state_keys to remove from current_state_events and `to_insert` + are the updates to current_state_events. """ existing_state = yield self.get_current_state_ids(room_id) - to_delete = { - key: ev_id for key, ev_id in iteritems(existing_state) - if ev_id != current_state.get(key) - } + to_delete = [ + key for key, ev_id in iteritems(existing_state) + if key not in current_state + ] to_insert = { key: ev_id for key, ev_id in iteritems(current_state) @@ -684,10 +684,10 @@ class EventsStore(EventsWorkerStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - state_delta_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list, dict)]): The current-state delta for each room. For each room, a tuple - (to_delete, to_insert), being a list of event ids to be removed - from the current state, and a list of event ids to be added to + (to_delete, to_insert), being a list of type/state keys to be + removed from the current state, and a state set to be added to the current state. new_forward_extremeties (dict[str, list[str]]): The new forward extremities for each room. For each room, a @@ -765,9 +765,46 @@ class EventsStore(EventsWorkerStore): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple + + # First we add entries to the current_state_delta_stream. We + # do this before updating the current_state_events table so + # that we can use it to calculate the `prev_event_id`. (This + # allows us to not have to pull out the existing state + # unnecessarily). + sql = """ + INSERT INTO current_state_delta_stream + (stream_id, room_id, type, state_key, event_id, prev_event_id) + SELECT ?, ?, ?, ?, ?, ( + SELECT event_id FROM current_state_events + WHERE room_id = ? AND type = ? AND state_key = ? + ) + """ + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, None, + room_id, etype, state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + )) + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, ev_id, + room_id, etype, state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + )) + + # Now we actually update the current_state_events table + txn.executemany( - "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in itervalues(to_delete)], + "DELETE FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?", + ( + (room_id, etype, state_key) + for etype, state_key in itertools.chain(to_delete, to_insert) + ), ) self._simple_insert_many_txn( @@ -784,25 +821,6 @@ class EventsStore(EventsWorkerStore): ], ) - state_deltas = {key: None for key in to_delete} - state_deltas.update(to_insert) - - self._simple_insert_many_txn( - txn, - table="current_state_delta_stream", - values=[ - { - "stream_id": max_stream_order, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": ev_id, - "prev_event_id": to_delete.get(key, None), - } - for key, ev_id in iteritems(state_deltas) - ] - ) - txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, @@ -816,7 +834,8 @@ class EventsStore(EventsWorkerStore): # and which we have added, then we invlidate the caches for all # those users. members_changed = set( - state_key for ev_type, state_key in state_deltas + state_key + for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member ) From a79410e7b8dce3fb8b4b40901a66fd9fde9c47a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 11:13:47 +0100 Subject: [PATCH 2/7] Have _get_new_state_after_events return delta If we have a delta from the existing to new current state, then we can reuse that rather than manually working it out by fetching both lots of state. --- synapse/storage/events.py | 63 +++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 87d0b78a2d..6d2aae9c4a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -344,7 +344,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties = {} # map room_id->(type,state_key)->event_id tracking the full - # state in each room after adding these events + # state in each room after adding these events. + # This is simply used to prefill the get_current_state_ids + # cache current_state_for_room = {} # map room_id->(to_delete, to_insert) where to_delete is a list @@ -419,28 +421,40 @@ class EventsStore(EventsWorkerStore): logger.info( "Calculating state delta for room %s", room_id, ) - with Measure( - self._clock, - "persist_events.get_new_state_after_events", + self._clock, + "persist_events.get_new_state_after_events", ): - current_state = yield self._get_new_state_after_events( + res = yield self._get_new_state_after_events( room_id, ev_ctx_rm, latest_event_ids, new_latest_event_ids, ) + current_state, delta_ids = res - if current_state is not None: - current_state_for_room[room_id] = current_state + # If either are not None then there has been a change, + # and we need to work out the delta (or use that + # given) + if delta_ids is not None: + # If there is a delta we know that we've + # only added or replaced state, never + # removed keys entirely. + state_delta_for_room[room_id] = ([], delta_ids) + elif current_state is not None: with Measure( - self._clock, - "persist_events.calculate_state_delta", + self._clock, + "persist_events.calculate_state_delta", ): delta = yield self._calculate_state_delta( room_id, current_state, ) - state_delta_for_room[room_id] = delta + state_delta_for_room[room_id] = delta + + # If we have the current_state then lets prefill + # the cache with it. + if current_state is not None: + current_state_for_room[room_id] = current_state yield self.runInteraction( "persist_events", @@ -539,9 +553,10 @@ class EventsStore(EventsWorkerStore): the new forward extremities for the room. Returns: - Deferred[dict[(str,str), str]|None]: - None if there are no changes to the room state, or - a dict of (type, state_key) -> event_id]. + Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]: + Returns a tuple of two state maps, the first being the full new current + state and the second being the delta to the existing current state. + If both are None then there has been no change. """ if not new_latest_event_ids: @@ -549,6 +564,9 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + + state_group_deltas = {} + for ev, ctx in events_context: if ctx.state_group is None: # I don't think this can happen, but let's double-check @@ -567,6 +585,9 @@ class EventsStore(EventsWorkerStore): if current_state_ids is not None: state_groups_map[ctx.state_group] = current_state_ids + if ctx.prev_group: + state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can # pull the state group from its context. @@ -608,7 +629,7 @@ class EventsStore(EventsWorkerStore): # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return + defer.returnValue((None, None)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -620,7 +641,17 @@ class EventsStore(EventsWorkerStore): if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_groups_map[new_state_groups.pop()]) + new_state_group = new_state_groups.pop() + + delta_ids = None + if len(old_state_groups) == 1: + old_state_group = old_state_groups.pop() + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + + defer.returnValue((state_groups_map[new_state_group], delta_ids)) # Ok, we need to defer to the state handler to resolve our state sets. @@ -639,7 +670,7 @@ class EventsStore(EventsWorkerStore): room_id, state_groups, events_map, get_events ) - defer.returnValue(res.state) + defer.returnValue((res.state, None)) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): From 811ac73a42db16db7211a7517ad5651a1f6aee71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 13:40:42 +0100 Subject: [PATCH 3/7] Don't fetch state from the database unless needed --- synapse/storage/events.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6d2aae9c4a..eb8bbe13ab 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -631,6 +631,24 @@ class EventsStore(EventsWorkerStore): if old_state_groups == new_state_groups: defer.returnValue((None, None)) + if len(new_state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. + if len(old_state_groups) == 1: + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) + # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. missing_state = new_state_groups - set(state_groups_map) @@ -639,19 +657,7 @@ class EventsStore(EventsWorkerStore): state_groups_map.update(group_to_state) if len(new_state_groups) == 1: - # If there is only one state group, then we know what the current - # state is. - new_state_group = new_state_groups.pop() - - delta_ids = None - if len(old_state_groups) == 1: - old_state_group = old_state_groups.pop() - - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) - - defer.returnValue((state_groups_map[new_state_group], delta_ids)) + defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. From f33c596533e46335a3fc4d02236087412615ee1a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 14:28:04 +0100 Subject: [PATCH 4/7] Newsfile --- changelog.d/3595.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3595.misc diff --git a/changelog.d/3595.misc b/changelog.d/3595.misc new file mode 100644 index 0000000000..85903504cc --- /dev/null +++ b/changelog.d/3595.misc @@ -0,0 +1 @@ +Attempt to reduce amount of state pulled out of DB during persist_events From ed0dd68731fa1549b4ea9b2dab08c2afb6ccb64e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 14:31:38 +0100 Subject: [PATCH 5/7] Fixup comment (and indent) --- synapse/storage/events.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index eb8bbe13ab..76cfbc90fe 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -565,6 +565,7 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + # Map from (prev state group, new state group) -> delta state dict state_group_deltas = {} for ev, ctx in events_context: @@ -631,23 +632,24 @@ class EventsStore(EventsWorkerStore): if old_state_groups == new_state_groups: defer.returnValue((None, None)) - if len(new_state_groups) == 1: - # If there is only one state group, then we know what the current - # state is. - if len(old_state_groups) == 1: - new_state_group = next(iter(new_state_groups)) - old_state_group = next(iter(old_state_groups)) + if len(new_state_groups) == 1 and len(old_state_groups) == 1: + # If we're going from one state group to another, lets check if + # we have a delta for that transition. If we do then we can just + # return that. - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) - if delta_ids is not None: - # We have a delta from the existing to new current state, - # so lets just return that. If we happen to already have - # the current state in memory then lets also return that, - # but it doesn't matter if we don't. - new_state = state_groups_map.get(new_state_group) - defer.returnValue((new_state, delta_ids)) + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -657,6 +659,8 @@ class EventsStore(EventsWorkerStore): state_groups_map.update(group_to_state) if len(new_state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. From 8f65ab98d2ab35bbb29c23cc7978d54b12fd3db0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 15:08:01 +0100 Subject: [PATCH 6/7] Remove unnecessary iteritems --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 76cfbc90fe..9c94d3454f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -696,7 +696,7 @@ class EventsStore(EventsWorkerStore): existing_state = yield self.get_current_state_ids(room_id) to_delete = [ - key for key, ev_id in iteritems(existing_state) + key for key in existing_state if key not in current_state ] From 709c309b0e1756afd8b4514b95b0e901c567a0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Jul 2018 15:12:50 +0100 Subject: [PATCH 7/7] Expand on docstring comment about return value --- synapse/storage/events.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9c94d3454f..906a405031 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -557,6 +557,11 @@ class EventsStore(EventsWorkerStore): Returns a tuple of two state maps, the first being the full new current state and the second being the delta to the existing current state. If both are None then there has been no change. + + If there has been a change then we only return the delta if its + already been calculated. Conversely if we do know the delta then + the new current state is only returned if we've already calculated + it. """ if not new_latest_event_ids: