Merge pull request #3595 from matrix-org/erikj/use_deltas

Use deltas to calculate current state deltas
pull/3598/merge
Erik Johnston 2018-07-24 15:41:18 +01:00 committed by GitHub
commit 60a1d147a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 52 deletions

1
changelog.d/3595.misc Normal file
View File

@ -0,0 +1 @@
Attempt to reduce amount of state pulled out of DB during persist_events

View File

@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, itervalues
from six import iteritems
from six.moves import range
from canonicaljson import json
@ -344,11 +344,14 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties = {}
# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room = {}
# map room_id->(to_delete, to_insert) where each entry is
# a map (type,key)->event_id giving the state delta in each
# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
@ -418,28 +421,40 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
with Measure(
self._clock,
"persist_events.get_new_state_after_events",
self._clock,
"persist_events.get_new_state_after_events",
):
current_state = yield self._get_new_state_after_events(
res = yield self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
new_latest_event_ids,
)
current_state, delta_ids = res
if current_state is not None:
current_state_for_room[room_id] = current_state
# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
state_delta_for_room[room_id] = ([], delta_ids)
elif current_state is not None:
with Measure(
self._clock,
"persist_events.calculate_state_delta",
self._clock,
"persist_events.calculate_state_delta",
):
delta = yield self._calculate_state_delta(
room_id, current_state,
)
state_delta_for_room[room_id] = delta
state_delta_for_room[room_id] = delta
# If we have the current_state then lets prefill
# the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
yield self.runInteraction(
"persist_events",
@ -538,9 +553,15 @@ class EventsStore(EventsWorkerStore):
the new forward extremities for the room.
Returns:
Deferred[dict[(str,str), str]|None]:
None if there are no changes to the room state, or
a dict of (type, state_key) -> event_id].
Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
Returns a tuple of two state maps, the first being the full new current
state and the second being the delta to the existing current state.
If both are None then there has been no change.
If there has been a change then we only return the delta if its
already been calculated. Conversely if we do know the delta then
the new current state is only returned if we've already calculated
it.
"""
if not new_latest_event_ids:
@ -548,6 +569,10 @@ class EventsStore(EventsWorkerStore):
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
# Map from (prev state group, new state group) -> delta state dict
state_group_deltas = {}
for ev, ctx in events_context:
if ctx.state_group is None:
# I don't think this can happen, but let's double-check
@ -566,6 +591,9 @@ class EventsStore(EventsWorkerStore):
if current_state_ids is not None:
state_groups_map[ctx.state_group] = current_state_ids
if ctx.prev_group:
state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
# pull the state group from its context.
@ -607,7 +635,26 @@ class EventsStore(EventsWorkerStore):
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
return
defer.returnValue((None, None))
if len(new_state_groups) == 1 and len(old_state_groups) == 1:
# If we're going from one state group to another, lets check if
# we have a delta for that transition. If we do then we can just
# return that.
new_state_group = next(iter(new_state_groups))
old_state_group = next(iter(old_state_groups))
delta_ids = state_group_deltas.get(
(old_state_group, new_state_group,), None
)
if delta_ids is not None:
# We have a delta from the existing to new current state,
# so lets just return that. If we happen to already have
# the current state in memory then lets also return that,
# but it doesn't matter if we don't.
new_state = state_groups_map.get(new_state_group)
defer.returnValue((new_state, delta_ids))
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@ -619,7 +666,7 @@ class EventsStore(EventsWorkerStore):
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
defer.returnValue(state_groups_map[new_state_groups.pop()])
defer.returnValue((state_groups_map[new_state_groups.pop()], None))
# Ok, we need to defer to the state handler to resolve our state sets.
@ -638,7 +685,7 @@ class EventsStore(EventsWorkerStore):
room_id, state_groups, events_map, get_events
)
defer.returnValue(res.state)
defer.returnValue((res.state, None))
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@ -647,17 +694,16 @@ class EventsStore(EventsWorkerStore):
Assumes that we are only persisting events for one room at a time.
Returns:
2-tuple (to_delete, to_insert) where both are state dicts,
i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries
to insert.
tuple[list, dict] (to_delete, to_insert): where to_delete are the
type/state_keys to remove from current_state_events and `to_insert`
are the updates to current_state_events.
"""
existing_state = yield self.get_current_state_ids(room_id)
to_delete = {
key: ev_id for key, ev_id in iteritems(existing_state)
if ev_id != current_state.get(key)
}
to_delete = [
key for key in existing_state
if key not in current_state
]
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
@ -684,10 +730,10 @@ class EventsStore(EventsWorkerStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
state_delta_for_room (dict[str, (list[str], list[str])]):
state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of event ids to be removed
from the current state, and a list of event ids to be added to
(to_delete, to_insert), being a list of type/state keys to be
removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
@ -765,9 +811,46 @@ class EventsStore(EventsWorkerStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
)
"""
txn.executemany(sql, (
(
max_stream_order, room_id, etype, state_key, None,
room_id, etype, state_key,
)
for etype, state_key in to_delete
# We sanity check that we're deleting rather than updating
if (etype, state_key) not in to_insert
))
txn.executemany(sql, (
(
max_stream_order, room_id, etype, state_key, ev_id,
room_id, etype, state_key,
)
for (etype, state_key), ev_id in iteritems(to_insert)
))
# Now we actually update the current_state_events table
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in itervalues(to_delete)],
"DELETE FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?",
(
(room_id, etype, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
),
)
self._simple_insert_many_txn(
@ -784,25 +867,6 @@ class EventsStore(EventsWorkerStore):
],
)
state_deltas = {key: None for key in to_delete}
state_deltas.update(to_insert)
self._simple_insert_many_txn(
txn,
table="current_state_delta_stream",
values=[
{
"stream_id": max_stream_order,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": ev_id,
"prev_event_id": to_delete.get(key, None),
}
for key, ev_id in iteritems(state_deltas)
]
)
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
@ -816,7 +880,8 @@ class EventsStore(EventsWorkerStore):
# and which we have added, then we invlidate the caches for all
# those users.
members_changed = set(
state_key for ev_type, state_key in state_deltas
state_key
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
)