Use new store.persist_events function in federation handler

erikj/persist_event_perf
Erik Johnston 2015-06-22 11:42:10 +01:00
parent b39b294d1f
commit 96be533f1f
2 changed files with 129 additions and 69 deletions

View File

@ -138,26 +138,25 @@ class FederationHandler(BaseHandler):
if state and auth_chain is not None: if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication # If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.) # layer, then we should handle them (if we haven't before.)
event_infos = []
for e in itertools.chain(auth_chain, state): for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids: if e.event_id in seen_ids:
continue continue
e.internal_metadata.outlier = True e.internal_metadata.outlier = True
try: auth_ids = [e_id for e_id, _ in e.auth_events]
auth_ids = [e_id for e_id, _ in e.auth_events] auth = {
auth = { (e.type, e.state_key): e for e in auth_chain
(e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids
if e.event_id in auth_ids }
} event_infos.append[{
yield self._handle_new_event( "event": e,
origin, e, auth_events=auth "auth_events": auth,
) }]
seen_ids.add(e.event_id) seen_ids.add(e.event_id)
except:
logger.exception( yield self._handle_new_events(origin, event_infos)
"Failed to handle state event %s",
e.event_id,
)
try: try:
_, event_stream_id, max_stream_id = yield self._handle_new_event( _, event_stream_id, max_stream_id = yield self._handle_new_event(
@ -292,38 +291,29 @@ class FederationHandler(BaseHandler):
).addErrback(unwrapFirstError) ).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results}) auth_events.update({a.event_id: a for a in results})
yield defer.gatherResults( ev_infos = []
[ for a in auth_events.values():
self._handle_new_event( if a.event_id in seen_events:
dest, a, continue
auth_events={ ev_infos.append({
(auth_events[a_id].type, auth_events[a_id].state_key): "event": a,
auth_events[a_id] "auth_events": {
for a_id, _ in a.auth_events (auth_events[a_id].type, auth_events[a_id].state_key):
}, auth_events[a_id]
) for a_id, _ in a.auth_events
for a in auth_events.values() }
if a.event_id not in seen_events })
],
consumeErrors=True,
).addErrback(unwrapFirstError)
yield defer.gatherResults( for e_id in events_to_state:
[ ev_infos.append({
self._handle_new_event( "event": event_map[e_id],
dest, event_map[e_id], "state": events_to_state[e_id],
state=events_to_state[e_id], "auth_events": {
backfilled=True, (auth_events[a_id].type, auth_events[a_id].state_key):
auth_events={ auth_events[a_id]
(auth_events[a_id].type, auth_events[a_id].state_key): for a_id, _ in event_map[e_id].auth_events
auth_events[a_id] }
for a_id, _ in event_map[e_id].auth_events })
},
)
for e_id in events_to_state
],
consumeErrors=True
).addErrback(unwrapFirstError)
events.sort(key=lambda e: e.depth) events.sort(key=lambda e: e.depth)
@ -331,10 +321,14 @@ class FederationHandler(BaseHandler):
if event in events_to_state: if event in events_to_state:
continue continue
yield self._handle_new_event( ev_infos.append({
dest, event, "event": event,
backfilled=True, })
)
yield self._handle_new_events(
dest, ev_infos,
backfilled=True,
)
defer.returnValue(events) defer.returnValue(events)
@ -600,32 +594,26 @@ class FederationHandler(BaseHandler):
# FIXME # FIXME
pass pass
yield self._handle_auth_events( # yield self._handle_auth_events(
origin, [e for e in auth_chain if e.event_id != event.event_id] # origin, [e for e in auth_chain if e.event_id != event.event_id]
) # )
@defer.inlineCallbacks ev_infos = []
def handle_state(e): for e in itertools.chain(state, auth_chain):
if e.event_id == event.event_id: if e.event_id == event.event_id:
return continue
e.internal_metadata.outlier = True e.internal_metadata.outlier = True
try: auth_ids = [e_id for e_id, _ in e.auth_events]
auth_ids = [e_id for e_id, _ in e.auth_events] ev_infos.append({
auth = { "event": e,
"auth_events": {
(e.type, e.state_key): e for e in auth_chain (e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids if e.event_id in auth_ids
} }
yield self._handle_new_event( })
origin, e, auth_events=auth
)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
yield defer.DeferredList([handle_state(e) for e in state]) yield self._handle_new_events(origin, ev_infos)
auth_ids = [e_id for e_id, _ in event.auth_events] auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = { auth_events = {
@ -1005,6 +993,67 @@ class FederationHandler(BaseHandler):
defer.returnValue((context, event_stream_id, max_stream_id)) defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
contexts = yield defer.gatherResults(
[
self._prep_event(
origin,
ev_info["event"],
state=ev_info.get("state"),
backfilled=backfilled,
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
]
)
yield self.store.persist_events(
[
(ev_info["event"], context)
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
is_new_state=(not backfilled),
)
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
event, old_state=state, outlier=outlier,
)
if not auth_events:
auth_events = context.current_state
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
if len(event.prev_events) == 1 and event.depth < 5:
c = yield self.store.get_event(
event.prev_events[0][0],
allow_none=True,
)
if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
try:
yield self.do_auth(
origin, event, context, auth_events=auth_events
)
except AuthError as e:
logger.warn(
"Rejecting %s because %s",
event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
defer.returnValue(context)
@defer.inlineCallbacks @defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
missing): missing):

View File

@ -45,6 +45,17 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
class EventsStore(SQLBaseStore): class EventsStore(SQLBaseStore):
def persist_events(self, events_and_contexts, backfilled=False,
is_new_state=True):
return defer.gatherResults([
self.persist_event(
event, context,
backfilled=backfilled,
is_new_state=is_new_state,
)
for event, context in events_and_contexts
])
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, backfilled=False, def persist_event(self, event, context, backfilled=False,