Fix backfill auth events
parent
9377509dcd
commit
93acf49e9b
|
@ -335,32 +335,59 @@ class FederationHandler(BaseHandler):
|
||||||
state_events.update({s.event_id: s for s in state})
|
state_events.update({s.event_id: s for s in state})
|
||||||
events_to_state[e_id] = state
|
events_to_state[e_id] = state
|
||||||
|
|
||||||
|
required_auth = set(
|
||||||
|
a_id
|
||||||
|
for event in events + state_events.values() + auth_events.values()
|
||||||
|
for a_id, _ in event.auth_events
|
||||||
|
)
|
||||||
|
auth_events.update({
|
||||||
|
e_id: event_map[e_id] for e_id in required_auth if e_id in event_map
|
||||||
|
})
|
||||||
|
missing_auth = required_auth - set(auth_events)
|
||||||
|
failed_to_fetch = set()
|
||||||
|
|
||||||
|
# Try and fetch any missing auth events from both DB and remote servers.
|
||||||
|
# We repeatedly do this until we stop finding new auth events.
|
||||||
|
while missing_auth - failed_to_fetch:
|
||||||
|
logger.info("Missing auth for backfill: %r", missing_auth)
|
||||||
|
ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
|
||||||
|
auth_events.update(ret_events)
|
||||||
|
|
||||||
|
required_auth.update(
|
||||||
|
a_id for event in ret_events.values() for a_id, _ in event.auth_events
|
||||||
|
)
|
||||||
|
missing_auth = required_auth - set(auth_events)
|
||||||
|
|
||||||
|
if missing_auth - failed_to_fetch:
|
||||||
|
logger.info(
|
||||||
|
"Fetching missing auth for backfill: %r",
|
||||||
|
missing_auth - failed_to_fetch
|
||||||
|
)
|
||||||
|
|
||||||
|
results = yield defer.gatherResults(
|
||||||
|
[
|
||||||
|
self.replication_layer.get_pdu(
|
||||||
|
[dest],
|
||||||
|
event_id,
|
||||||
|
outlier=True,
|
||||||
|
timeout=10000,
|
||||||
|
)
|
||||||
|
for event_id in missing_auth - failed_to_fetch
|
||||||
|
],
|
||||||
|
consumeErrors=True
|
||||||
|
).addErrback(unwrapFirstError)
|
||||||
|
auth_events.update({a.event_id: a for a in results})
|
||||||
|
required_auth.update(
|
||||||
|
a_id for event in results for a_id, _ in event.auth_events
|
||||||
|
)
|
||||||
|
missing_auth = required_auth - set(auth_events)
|
||||||
|
|
||||||
|
failed_to_fetch = missing_auth - set(auth_events)
|
||||||
|
|
||||||
seen_events = yield self.store.have_events(
|
seen_events = yield self.store.have_events(
|
||||||
set(auth_events.keys()) | set(state_events.keys())
|
set(auth_events.keys()) | set(state_events.keys())
|
||||||
)
|
)
|
||||||
|
|
||||||
all_events = events + state_events.values() + auth_events.values()
|
|
||||||
required_auth = set(
|
|
||||||
a_id for event in all_events for a_id, _ in event.auth_events
|
|
||||||
)
|
|
||||||
|
|
||||||
missing_auth = required_auth - set(auth_events)
|
|
||||||
if missing_auth:
|
|
||||||
logger.info("Missing auth for backfill: %r", missing_auth)
|
|
||||||
results = yield defer.gatherResults(
|
|
||||||
[
|
|
||||||
self.replication_layer.get_pdu(
|
|
||||||
[dest],
|
|
||||||
event_id,
|
|
||||||
outlier=True,
|
|
||||||
timeout=10000,
|
|
||||||
)
|
|
||||||
for event_id in missing_auth
|
|
||||||
],
|
|
||||||
consumeErrors=True
|
|
||||||
).addErrback(unwrapFirstError)
|
|
||||||
auth_events.update({a.event_id: a for a in results})
|
|
||||||
|
|
||||||
ev_infos = []
|
ev_infos = []
|
||||||
for a in auth_events.values():
|
for a in auth_events.values():
|
||||||
if a.event_id in seen_events:
|
if a.event_id in seen_events:
|
||||||
|
@ -372,6 +399,7 @@ class FederationHandler(BaseHandler):
|
||||||
(auth_events[a_id].type, auth_events[a_id].state_key):
|
(auth_events[a_id].type, auth_events[a_id].state_key):
|
||||||
auth_events[a_id]
|
auth_events[a_id]
|
||||||
for a_id, _ in a.auth_events
|
for a_id, _ in a.auth_events
|
||||||
|
if a_id in auth_events
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -383,6 +411,7 @@ class FederationHandler(BaseHandler):
|
||||||
(auth_events[a_id].type, auth_events[a_id].state_key):
|
(auth_events[a_id].type, auth_events[a_id].state_key):
|
||||||
auth_events[a_id]
|
auth_events[a_id]
|
||||||
for a_id, _ in event_map[e_id].auth_events
|
for a_id, _ in event_map[e_id].auth_events
|
||||||
|
if a_id in auth_events
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue