diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ec32008d5a..11605b34a3 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -27,12 +27,17 @@ class EventContext(object): ] def __init__(self): + # The current state including the current event self.current_state_ids = None + # The current state excluding the current event self.prev_state_ids = None self.state_group = None + self.rejected = False self.push_actions = [] + # A previously persisted state group and a delta between that + # and this state. self.prev_group = None self.delta_ids = None diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 968b68f462..ee8b763008 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -118,8 +118,6 @@ class StateStore(SQLBaseStore): if self._have_persisted_state_group_txn(txn, context.state_group): continue - state_event_ids = dict(context.current_state_ids) - self._simple_insert_txn( txn, table="state_groups", @@ -130,49 +128,36 @@ class StateStore(SQLBaseStore): }, ) + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't too long, as otherwise read performance degrades. 
if context.prev_group: potential_hops = self._count_state_group_hops_txn( txn, context.prev_group ) - if potential_hops < MAX_STATE_DELTA_HOPS: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, - ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": context.state_group, + "prev_state_group": context.prev_group, + }, + ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.items() - ], - ) - else: - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.current_state_ids.items() - ], - ) + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) else: self._simple_insert_many_txn( txn, @@ -185,7 +170,7 @@ class StateStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in state_event_ids.items() + for key, state_id in context.current_state_ids.items() ], ) @@ -202,6 +187,10 @@ class StateStore(SQLBaseStore): ) def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. 
+ """ if isinstance(self.database_engine, PostgresEngine): sql = (""" WITH RECURSIVE state(state_group) AS ( @@ -319,6 +308,11 @@ class StateStore(SQLBaseStore): results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): + # The below query walks the state_group tree so that the "state" + # table includes all state_groups in the tree. It then joins + # against `state_groups_state` to fetch the latest state. + # It assumes that previous state groups are always numerically + # lesser. sql = (""" WITH RECURSIVE state(state_group) AS ( VALUES(?::bigint) @@ -644,6 +638,9 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): + """This background update will slowly deduplicate state by reencoding + it as deltas. + """ last_state_group = progress.get("last_state_group", 0) rows_inserted = progress.get("rows_inserted", 0) max_group = progress.get("max_group", None)