Merge pull request #283 from matrix-org/erikj/atomic_join_federation
Atomically persist events when joining a room over federation

commit 5879edbb09
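In short: instead of persisting each event of the returned auth chain and state one by one when joining a room over federation, the join path (second hunk) now delegates to a new _persist_auth_tree helper, which auth-checks the auth chain and state, writes them to the store in a single persist_events call, and then persists the join event itself separately. The receive path (first hunk) takes the same route when the event is for a room the server is not yet in, surfacing an AuthError from the helper as a FederationError. A hedged usage sketch follows the last hunk.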
@@ -125,60 +125,72 @@ class FederationHandler(BaseHandler):
         )
         if not is_in_room and not event.internal_metadata.is_outlier():
             logger.debug("Got event for room we're not in.")
             current_state = state

-        event_ids = set()
-        if state:
-            event_ids |= {e.event_id for e in state}
-        if auth_chain:
-            event_ids |= {e.event_id for e in auth_chain}
+            try:
+                event_stream_id, max_stream_id = yield self._persist_auth_tree(
+                    auth_chain, state, event
+                )
+            except AuthError as e:
+                raise FederationError(
+                    "ERROR",
+                    e.code,
+                    e.msg,
+                    affected=event.event_id,
+                )
-
-        seen_ids = set(
-            (yield self.store.have_events(event_ids)).keys()
-        )
+        else:
+            event_ids = set()
+            if state:
+                event_ids |= {e.event_id for e in state}
+            if auth_chain:
+                event_ids |= {e.event_id for e in auth_chain}

-        if state and auth_chain is not None:
-            # If we have any state or auth_chain given to us by the replication
-            # layer, then we should handle them (if we haven't before.)
-
-            event_infos = []
-
-            for e in itertools.chain(auth_chain, state):
-                if e.event_id in seen_ids:
-                    continue
-                e.internal_metadata.outlier = True
-                auth_ids = [e_id for e_id, _ in e.auth_events]
-                auth = {
-                    (e.type, e.state_key): e for e in auth_chain
-                    if e.event_id in auth_ids
-                }
-                event_infos.append({
-                    "event": e,
-                    "auth_events": auth,
-                })
-                seen_ids.add(e.event_id)
-
-            yield self._handle_new_events(
-                origin,
-                event_infos,
-                outliers=True
+            seen_ids = set(
+                (yield self.store.have_events(event_ids)).keys()
             )

-        try:
-            _, event_stream_id, max_stream_id = yield self._handle_new_event(
-                origin,
-                event,
-                state=state,
-                backfilled=backfilled,
-                current_state=current_state,
-            )
-        except AuthError as e:
-            raise FederationError(
-                "ERROR",
-                e.code,
-                e.msg,
-                affected=event.event_id,
-            )
+            if state and auth_chain is not None:
+                # If we have any state or auth_chain given to us by the replication
+                # layer, then we should handle them (if we haven't before.)
+
+                event_infos = []
+
+                for e in itertools.chain(auth_chain, state):
+                    if e.event_id in seen_ids:
+                        continue
+                    e.internal_metadata.outlier = True
+                    auth_ids = [e_id for e_id, _ in e.auth_events]
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_chain
+                        if e.event_id in auth_ids
+                    }
+                    event_infos.append({
+                        "event": e,
+                        "auth_events": auth,
+                    })
+                    seen_ids.add(e.event_id)
+
+                yield self._handle_new_events(
+                    origin,
+                    event_infos,
+                    outliers=True
+                )
+
+            try:
+                _, event_stream_id, max_stream_id = yield self._handle_new_event(
+                    origin,
+                    event,
+                    state=state,
+                    backfilled=backfilled,
+                    current_state=current_state,
+                )
+            except AuthError as e:
+                raise FederationError(
+                    "ERROR",
+                    e.code,
+                    e.msg,
+                    affected=event.event_id,
+                )

         # if we're receiving valid events from an origin,
         # it's probably a good idea to mark it as not in retry-state
@@ -649,35 +661,8 @@ class FederationHandler(BaseHandler):
             # FIXME
             pass

-        ev_infos = []
-        for e in itertools.chain(state, auth_chain):
-            if e.event_id == event.event_id:
-                continue
-
-            e.internal_metadata.outlier = True
-            auth_ids = [e_id for e_id, _ in e.auth_events]
-            ev_infos.append({
-                "event": e,
-                "auth_events": {
-                    (e.type, e.state_key): e for e in auth_chain
-                    if e.event_id in auth_ids
-                }
-            })
-
-        yield self._handle_new_events(origin, ev_infos, outliers=True)
-
-        auth_ids = [e_id for e_id, _ in event.auth_events]
-        auth_events = {
-            (e.type, e.state_key): e for e in auth_chain
-            if e.event_id in auth_ids
-        }
-
-        _, event_stream_id, max_stream_id = yield self._handle_new_event(
-            origin,
-            new_event,
-            state=state,
-            current_state=state,
-            auth_events=auth_events,
+        event_stream_id, max_stream_id = yield self._persist_auth_tree(
+            auth_chain, state, event
         )

         with PreserveLoggingContext():
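The inline auth lookup removed above is what the helper added in the next hunk rebuilds internally (via event_map) before calling self.auth.check. Pulled out on its own, with names taken from these hunks and purely as an illustration, the pattern is:

    # Illustration only (names from the diff): resolve an event's auth_events
    # references against the fetched auth_chain, keyed by (type, state_key).
    auth_ids = [e_id for e_id, _ in event.auth_events]
    auth_events = {
        (e.type, e.state_key): e for e in auth_chain
        if e.event_id in auth_ids
    }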
@@ -1026,6 +1011,76 @@ class FederationHandler(BaseHandler):
             is_new_state=(not outliers and not backfilled),
         )

+    @defer.inlineCallbacks
+    def _persist_auth_tree(self, auth_events, state, event):
+        """Checks the auth chain is valid (and passes auth checks) for the
+        state and event. Then persists the auth chain and state atomically.
+        Persists the event seperately.
+
+        Returns:
+            2-tuple of (event_stream_id, max_stream_id) from the persist_event
+            call for `event`
+        """
+        events_to_context = {}
+        for e in itertools.chain(auth_events, state):
+            ctx = yield self.state_handler.compute_event_context(
+                e, outlier=True,
+            )
+            events_to_context[e.event_id] = ctx
+            e.internal_metadata.outlier = True
+
+        event_map = {
+            e.event_id: e
+            for e in auth_events
+        }
+
+        create_event = None
+        for e in auth_events:
+            if (e.type, e.state_key) == (EventTypes.Create, ""):
+                create_event = e
+                break
+
+        for e in itertools.chain(auth_events, state, [event]):
+            auth_for_e = {
+                (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+                for e_id, _ in e.auth_events
+            }
+            if create_event:
+                auth_for_e[(EventTypes.Create, "")] = create_event
+
+            try:
+                self.auth.check(e, auth_events=auth_for_e)
+            except AuthError as err:
+                logger.warn(
+                    "Rejecting %s because %s",
+                    e.event_id, err.msg
+                )
+
+                if e == event:
+                    raise
+                events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+        yield self.store.persist_events(
+            [
+                (e, events_to_context[e.event_id])
+                for e in itertools.chain(auth_events, state)
+            ],
+            is_new_state=False,
+        )
+
+        new_event_context = yield self.state_handler.compute_event_context(
+            event, old_state=state, outlier=False,
+        )
+
+        event_stream_id, max_stream_id = yield self.store.persist_event(
+            event, new_event_context,
+            backfilled=False,
+            is_new_state=True,
+            current_state=state,
+        )
+
+        defer.returnValue((event_stream_id, max_stream_id))
+
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, backfilled=False,
                     current_state=None, auth_events=None):
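For orientation, a condensed sketch of how the helper added above is driven by the call sites in the earlier hunks. The control flow and the names AuthError, FederationError and _persist_auth_tree are taken from this diff; the wrapper name is invented for illustration, and this is not the actual handler code.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def _receive_or_join_sketch(self, event, state, auth_chain):
        # Illustrative wrapper only: AuthError, FederationError and
        # self._persist_auth_tree come from the handler shown in the diff.
        try:
            # One atomic write for the auth chain + state; the event itself
            # is persisted separately by the helper.
            event_stream_id, max_stream_id = yield self._persist_auth_tree(
                auth_chain, state, event
            )
        except AuthError as e:
            # As in the first hunk: an auth failure on the event itself is
            # reported back to the sending server as a federation-level error.
            raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)

        defer.returnValue((event_stream_id, max_stream_id))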