Merge pull request #2841 from matrix-org/rav/refactor_calc_state_delta
factor _get_new_state_after_events out of _calculate_state_deltapull/2849/head
commit
78d6ddba86
|
@ -386,11 +386,18 @@ class EventsStore(SQLBaseStore):
|
||||||
if all_single_prev_not_state:
|
if all_single_prev_not_state:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
state = yield self._calculate_state_delta(
|
logger.info(
|
||||||
room_id, ev_ctx_rm, new_latest_event_ids
|
"Calculating state delta for room %s", room_id,
|
||||||
)
|
)
|
||||||
if state:
|
current_state = yield self._get_new_state_after_events(
|
||||||
current_state_for_room[room_id] = state
|
ev_ctx_rm, new_latest_event_ids,
|
||||||
|
)
|
||||||
|
if current_state is not None:
|
||||||
|
delta = yield self._calculate_state_delta(
|
||||||
|
room_id, current_state,
|
||||||
|
)
|
||||||
|
if delta is not None:
|
||||||
|
current_state_for_room[room_id] = delta
|
||||||
|
|
||||||
yield self.runInteraction(
|
yield self.runInteraction(
|
||||||
"persist_events",
|
"persist_events",
|
||||||
|
@ -467,20 +474,22 @@ class EventsStore(SQLBaseStore):
|
||||||
defer.returnValue(new_latest_event_ids)
|
defer.returnValue(new_latest_event_ids)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
|
def _get_new_state_after_events(self, events_context, new_latest_event_ids):
|
||||||
"""Calculate the new state deltas for a room.
|
"""Calculate the current state dict after adding some new events to
|
||||||
|
a room
|
||||||
|
|
||||||
Assumes that we are only persisting events for one room at a time.
|
Args:
|
||||||
|
events_context (list[(EventBase, EventContext)]):
|
||||||
|
events and contexts which are being added to the room
|
||||||
|
|
||||||
|
new_latest_event_ids (iterable[str]):
|
||||||
|
the new forward extremities for the room.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
3-tuple (to_delete, to_insert, new_state) where both are state dicts,
|
Deferred[dict[(str,str), str]|None]:
|
||||||
i.e. (type, state_key) -> event_id. `to_delete` are the entries to
|
None if there are no changes to the room state, or
|
||||||
first be deleted from current_state_events, `to_insert` are entries
|
a dict of (type, state_key) -> event_id].
|
||||||
to insert. `new_state` is the full set of state.
|
|
||||||
May return None if there are no changes to be applied.
|
|
||||||
"""
|
"""
|
||||||
# Now we need to work out the different state sets for
|
|
||||||
# each state extremities
|
|
||||||
state_sets = []
|
state_sets = []
|
||||||
state_groups = set()
|
state_groups = set()
|
||||||
missing_event_ids = []
|
missing_event_ids = []
|
||||||
|
@ -523,12 +532,12 @@ class EventsStore(SQLBaseStore):
|
||||||
state_sets.extend(group_to_state.itervalues())
|
state_sets.extend(group_to_state.itervalues())
|
||||||
|
|
||||||
if not new_latest_event_ids:
|
if not new_latest_event_ids:
|
||||||
current_state = {}
|
defer.returnValue({})
|
||||||
elif was_updated:
|
elif was_updated:
|
||||||
if len(state_sets) == 1:
|
if len(state_sets) == 1:
|
||||||
# If there is only one state set, then we know what the current
|
# If there is only one state set, then we know what the current
|
||||||
# state is.
|
# state is.
|
||||||
current_state = state_sets[0]
|
defer.returnValue(state_sets[0])
|
||||||
else:
|
else:
|
||||||
# We work out the current state by passing the state sets to the
|
# We work out the current state by passing the state sets to the
|
||||||
# state resolution algorithm. It may ask for some events, including
|
# state resolution algorithm. It may ask for some events, including
|
||||||
|
@ -537,8 +546,7 @@ class EventsStore(SQLBaseStore):
|
||||||
# up in the db.
|
# up in the db.
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Resolving state for %s with %i state sets",
|
"Resolving state with %i state sets", len(state_sets),
|
||||||
room_id, len(state_sets),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
events_map = {ev.event_id: ev for ev, _ in events_context}
|
events_map = {ev.event_id: ev for ev, _ in events_context}
|
||||||
|
@ -567,9 +575,22 @@ class EventsStore(SQLBaseStore):
|
||||||
state_sets,
|
state_sets,
|
||||||
state_map_factory=get_events,
|
state_map_factory=get_events,
|
||||||
)
|
)
|
||||||
|
defer.returnValue(current_state)
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _calculate_state_delta(self, room_id, current_state):
|
||||||
|
"""Calculate the new state deltas for a room.
|
||||||
|
|
||||||
|
Assumes that we are only persisting events for one room at a time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
3-tuple (to_delete, to_insert, new_state) where both are state dicts,
|
||||||
|
i.e. (type, state_key) -> event_id. `to_delete` are the entries to
|
||||||
|
first be deleted from current_state_events, `to_insert` are entries
|
||||||
|
to insert. `new_state` is the full set of state.
|
||||||
|
"""
|
||||||
existing_state = yield self.get_current_state_ids(room_id)
|
existing_state = yield self.get_current_state_ids(room_id)
|
||||||
|
|
||||||
existing_events = set(existing_state.itervalues())
|
existing_events = set(existing_state.itervalues())
|
||||||
|
|
Loading…
Reference in New Issue