Implement persist_event*s*

erikj/persist_event_perf
Erik Johnston 2015-06-22 16:55:09 +01:00
parent 03a9e4436b
commit 96ce61b2b1
6 changed files with 274 additions and 187 deletions

View File

@ -995,6 +995,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
logger.debug("_handle_new_events: %r", event_infos)
contexts = yield defer.gatherResults(
[
self._prep_event(
@ -1008,6 +1009,8 @@ class FederationHandler(BaseHandler):
]
)
logger.debug("_handle_new_events2: %d, %d", len(event_infos), len(contexts))
yield self.store.persist_events(
[
(ev_info["event"], context)
@ -1017,6 +1020,8 @@ class FederationHandler(BaseHandler):
is_new_state=(not backfilled),
)
logger.debug("_handle_new_events3: %r", event_infos)
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):

View File

@ -356,6 +356,25 @@ class EventFederationStore(SQLBaseStore):
For the given event, update the event edges table and forward and
backward extremities tables.
"""
logger.debug(
"_handle_mult_prev_events event[0]: %s",
events[0],
)
logger.debug(
"_handle_mult_prev_events thing: %s",
[
[e_id for e_id, _ in ev.prev_events]
for ev in events
],
)
logger.debug(
"_handle_mult_prev_events thing2: %s",
[
(ev.event_id, e_id)
for ev in events
for e_id, _ in ev.prev_events
],
)
self._simple_insert_many_txn(
txn,
table="event_edges",
@ -366,7 +385,8 @@ class EventFederationStore(SQLBaseStore):
"room_id": ev.room_id,
"is_state": False,
}
for e_id in [e_id for ev in events for e_id, _ in ev.prev_events]
for ev in events
for e_id, _ in ev.prev_events
],
)
@ -413,11 +433,6 @@ class EventFederationStore(SQLBaseStore):
" )"
)
prev_events = [
e_id for ev in events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
]
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
for ev in events for e_id, _ in ev.prev_events

View File

@ -23,7 +23,6 @@ from synapse.events.utils import prune_event
from synapse.util.logcontext import preserve_context_over_deferred
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from synapse.crypto.event_signing import compute_event_reference_hash
from syutil.jsonutil import encode_json
from contextlib import contextmanager
@ -45,16 +44,39 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False,
is_new_state=True):
return defer.gatherResults([
self.persist_event(
event, context,
if not events_and_contexts:
return
if backfilled:
if not self.min_token_deferred.called:
yield self.min_token_deferred
start = self.min_token - 1
self.min_token -= len(events_and_contexts) + 1
stream_orderings = range(start, self.min_token, -1)
@contextmanager
def stream_ordering_manager():
yield stream_orderings
stream_ordering_manager = stream_ordering_manager()
else:
stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
self, len(events_and_contexts)
)
with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
yield self.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
is_new_state=is_new_state,
)
for event, context in events_and_contexts
])
@defer.inlineCallbacks
@log_function
@ -77,6 +99,7 @@ class EventsStore(SQLBaseStore):
try:
with stream_ordering_manager as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
@ -219,7 +242,7 @@ class EventsStore(SQLBaseStore):
)
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
self._store_room_members_txn(txn, [event])
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
@ -327,10 +350,7 @@ class EventsStore(SQLBaseStore):
],
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
)
self._store_event_reference_hashes_txn(txn, [event])
if event.is_state():
vals = {
@ -393,44 +413,60 @@ class EventsStore(SQLBaseStore):
return
@log_function
def _persist_events_txn(self, txn, event_and_contexts, backfilled,
stream_ordering, is_new_state=True):
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True):
# Remove the any existing cache entries for the event_ids
for event, _ in event_and_contexts:
for event, _ in events_and_contexts:
txn.call_after(self._invalidate_get_event_cache, event.event_id)
depth_updates = {}
for event, _ in event_and_contexts:
for event, _ in events_and_contexts:
if event.internal_metadata.is_outlier():
continue
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
)
for room_id, depth in depth_updates:
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in %s" % (
",".join(["?"] * len(event_and_contexts)),
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
",".join(["?"] * len(events_and_contexts)),
),
[event.event_id for event, _ in event_and_contexts]
[event.event_id for event, _ in events_and_contexts]
)
have_persisted = {
event_id: outlier
for event_id, outlier in txn.fetchall()
}
event_map = {}
to_remove = set()
for event, context in events_and_contexts:
# Handle the case of the list including the same event multiple
# times. The tricky thing here is when they differ by whether
# they are an outlier.
if event.event_id in event_map:
other = event_map[event.event_id]
# metadata_json = encode_json(
# event.internal_metadata.get_dict(),
# using_frozen_dicts=USE_FROZEN_DICTS
# ).decode("UTF-8")
if not other.internal_metadata.is_outlier():
to_remove.add(event)
continue
elif not event.internal_metadata.is_outlier():
to_remove.add(event)
continue
else:
to_remove.add(other)
event_map[event.event_id] = event
for event, context in event_and_contexts:
if event.event_id not in have_persisted:
continue
to_remove.add(event)
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
self._store_state_groups_txn(
@ -460,95 +496,87 @@ class EventsStore(SQLBaseStore):
(False, event.event_id,)
)
events_and_contexts = filter(
lambda ec: ec[0] not in to_remove,
events_and_contexts
)
self._store_mult_state_groups_txn(txn, [
(event, context)
for event, context in event_and_contexts
for event, context in events_and_contexts
if not event.internal_metadata.is_outlier
])
self._handle_prev_events(
self._handle_mult_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
events=[event for event, _ in events_and_contexts],
)
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
event_dict = {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
self._store_room_members_txn(
txn,
[
event
for event, _ in events_and_contexts
if event.type == EventTypes.Member
]
}
)
self._simple_insert_txn(
def event_dict(event):
return {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
self._simple_insert_many_txn(
txn,
table="event_json",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": metadata_json,
"json": encode_json(
event_dict, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
},
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": encode_json(
event.internal_metadata.get_dict(),
using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
"json": encode_json(
event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
)
content = encode_json(
event.content, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8")
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
}
unrec = {
k: v
for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
"signatures",
"hashes",
"prev_events",
]
}
vals["unrecognized_keys"] = encode_json(
unrec, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8")
sql = (
"INSERT INTO events"
" (stream_ordering, topological_ordering, event_id, type,"
" room_id, content, processed, outlier, depth)"
" VALUES (?,?,?,?,?,?,?,?,?)"
)
txn.execute(
sql,
(
stream_ordering, event.depth, event.event_id, event.type,
event.room_id, content, True, outlier, event.depth
)
self._simple_insert_many_txn(
txn,
table="events",
values=[
{
"stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth,
"depth": event.depth,
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(
event.content, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
)
if context.rejected:
@ -579,16 +607,25 @@ class EventsStore(SQLBaseStore):
"room_id": event.room_id,
"auth_id": auth_id,
}
for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
],
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
self._store_event_reference_hashes_txn(
txn, [event for event, _ in events_and_contexts]
)
if event.is_state():
state_events_and_contexts = filter(
lambda i: i[0].is_state(),
events_and_contexts,
)
for event, context in state_events_and_contexts:
pass
state_values = []
for event, context in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@ -600,51 +637,56 @@ class EventsStore(SQLBaseStore):
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
self._simple_insert_txn(
txn,
"state_events",
vals,
)
state_values.append(vals)
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": True,
}
for e_id, h in event.prev_state
],
)
self._simple_insert_many_txn(
txn,
table="state_events",
values=state_values,
)
if is_new_state and not context.rejected:
txn.call_after(
self.get_current_state_for_key.invalidate,
event.room_id, event.type, event.state_key
)
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": event.event_id,
"prev_event_id": prev_id,
"room_id": event.room_id,
"is_state": True,
}
for event, _ in state_events_and_contexts
for prev_id, _ in event.prev_state
],
)
if (event.type == EventTypes.Name
or event.type == EventTypes.Aliases):
if is_new_state:
for event, _ in state_events_and_contexts:
if not context.rejected:
txn.call_after(
self.get_room_name_and_aliases.invalidate,
event.room_id
)
self.get_current_state_for_key.invalidate,
event.room_id, event.type, event.state_key
)
self._simple_upsert_txn(
txn,
"current_state_events",
keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
values={
"event_id": event.event_id,
}
)
if (event.type == EventTypes.Name
or event.type == EventTypes.Aliases):
txn.call_after(
self.get_room_name_and_aliases.invalidate,
event.room_id
)
self._simple_upsert_txn(
txn,
"current_state_events",
keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
values={
"event_id": event.event_id,
}
)
return

View File

@ -35,38 +35,28 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
def _store_room_member_txn(self, txn, event):
def _store_room_members_txn(self, txn, events):
"""Store a room member in the database.
"""
try:
target_user_id = event.state_key
except:
logger.exception(
"Failed to parse target_user_id=%s", target_user_id
)
raise
logger.debug(
"_store_room_member_txn: target_user_id=%s, membership=%s",
target_user_id,
event.membership,
)
self._simple_insert_txn(
self._simple_insert_many_txn(
txn,
"room_memberships",
{
"event_id": event.event_id,
"user_id": target_user_id,
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
}
table="room_memberships",
values=[
{
"event_id": event.event_id,
"user_id": event.state_key,
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
}
for event in events
]
)
txn.call_after(self.get_rooms_for_user.invalidate, target_user_id)
txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
txn.call_after(self.get_users_in_room.invalidate, event.room_id)
for event in events:
txn.call_after(self.get_rooms_for_user.invalidate, event.state_key)
txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
txn.call_after(self.get_users_in_room.invalidate, event.room_id)
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.

View File

@ -18,6 +18,7 @@ from twisted.internet import defer
from _base import SQLBaseStore
from syutil.base64util import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
class SignatureStore(SQLBaseStore):
@ -101,23 +102,26 @@ class SignatureStore(SQLBaseStore):
txn.execute(query, (event_id, ))
return {k: v for k, v in txn.fetchall()}
def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
hash_bytes):
def _store_event_reference_hashes_txn(self, txn, events):
"""Store a hash for a PDU
Args:
txn (cursor):
event_id (str): Id for the Event.
algorithm (str): Hashing algorithm.
hash_bytes (bytes): Hash function output bytes.
events (list): list of Events.
"""
self._simple_insert_txn(
vals = []
for event in events:
ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
vals.append({
"event_id": event.event_id,
"algorithm": ref_alg,
"hash": buffer(ref_hash_bytes),
})
self._simple_insert_many_txn(
txn,
"event_reference_hashes",
{
"event_id": event_id,
"algorithm": algorithm,
"hash": buffer(hash_bytes),
},
table="event_reference_hashes",
values=vals,
)
def _get_event_signatures_txn(self, txn, event_id):

View File

@ -107,6 +107,37 @@ class StreamIdGenerator(object):
defer.returnValue(manager())
@defer.inlineCallbacks
def get_next_mult(self, store, n):
"""
Usage:
with yield stream_id_gen.get_next(store, n) as stream_ids:
# ... persist events ...
"""
if not self._current_max:
yield store.runInteraction(
"_compute_current_max",
self._get_or_compute_current_max,
)
with self._lock:
next_ids = range(self._current_max + 1, self._current_max + n + 1)
self._current_max += n
for next_id in next_ids:
self._unfinished_ids.append(next_id)
@contextlib.contextmanager
def manager():
try:
yield next_ids
finally:
with self._lock:
for next_id in next_ids:
self._unfinished_ids.remove(next_id)
defer.returnValue(manager())
@defer.inlineCallbacks
def get_max_token(self, store):
"""Returns the maximum stream id such that all stream ids less than or