When we see a difference in current state, actually use state conflict resolution algorithm
parent
6375abcdac
commit
8c652a2b5f
|
@ -858,6 +858,40 @@ class FederationHandler(BaseHandler):
|
|||
# Do auth conflict res.
|
||||
logger.debug("Different auth: %s", different_auth)
|
||||
|
||||
different_events = yield defer.gatherResults(
|
||||
[
|
||||
self.store.get_event(
|
||||
d,
|
||||
allow_none=True,
|
||||
allow_rejected=False,
|
||||
)
|
||||
for d in different_auth
|
||||
if d in have_events and not have_events[d]
|
||||
],
|
||||
consumeErrors=True
|
||||
)
|
||||
|
||||
if different_events:
|
||||
local_view = dict(auth_events)
|
||||
remote_view = dict(auth_events)
|
||||
remote_view.update({
|
||||
(d.type, d.state_key) for d in different_events
|
||||
})
|
||||
|
||||
new_state, _ = self.state.resolve_events(
|
||||
[local_view, remote_view],
|
||||
event
|
||||
)
|
||||
|
||||
auth_events.update(new_state)
|
||||
|
||||
current_state = set(e.event_id for e in auth_events.values())
|
||||
different_auth = event_auth_events - current_state
|
||||
|
||||
context.current_state.update(auth_events)
|
||||
context.state_group = None
|
||||
|
||||
if different_auth and not event.internal_metadata.is_outlier():
|
||||
# Only do auth resolution if we have something new to say.
|
||||
# We can't rove an auth failure.
|
||||
do_resolution = False
|
||||
|
|
|
@ -259,13 +259,37 @@ class StateHandler(object):
|
|||
|
||||
defer.returnValue((name, state, prev_states))
|
||||
|
||||
new_state, prev_states = self._resolve_events(
|
||||
state_groups.values(), event_type, state_key
|
||||
)
|
||||
|
||||
if self._state_cache is not None:
|
||||
cache = _StateCacheEntry(
|
||||
state=new_state,
|
||||
state_group=None,
|
||||
ts=self.clock.time_msec()
|
||||
)
|
||||
|
||||
self._state_cache[frozenset(event_ids)] = cache
|
||||
|
||||
defer.returnValue((None, new_state, prev_states))
|
||||
|
||||
def resolve_events(self, state_sets, event):
|
||||
if event.is_state():
|
||||
return self._resolve_events(
|
||||
state_sets, event.type, event.state_key
|
||||
)
|
||||
else:
|
||||
return self._resolve_events(state_sets)
|
||||
|
||||
def _resolve_events(self, state_sets, event_type=None, state_key=""):
|
||||
state = {}
|
||||
for group, g_state in state_groups.items():
|
||||
for s in g_state:
|
||||
for st in state_sets:
|
||||
for e in st:
|
||||
state.setdefault(
|
||||
(s.type, s.state_key),
|
||||
(e.type, e.state_key),
|
||||
{}
|
||||
)[s.event_id] = s
|
||||
)[e.event_id] = e
|
||||
|
||||
unconflicted_state = {
|
||||
k: v.values()[0] for k, v in state.items()
|
||||
|
@ -302,16 +326,7 @@ class StateHandler(object):
|
|||
new_state = unconflicted_state
|
||||
new_state.update(resolved_state)
|
||||
|
||||
if self._state_cache is not None:
|
||||
cache = _StateCacheEntry(
|
||||
state=new_state,
|
||||
state_group=None,
|
||||
ts=self.clock.time_msec()
|
||||
)
|
||||
|
||||
self._state_cache[frozenset(event_ids)] = cache
|
||||
|
||||
defer.returnValue((None, new_state, prev_states))
|
||||
return new_state, prev_states
|
||||
|
||||
@log_function
|
||||
def _resolve_state_events(self, conflicted_state, auth_events):
|
||||
|
|
Loading…
Reference in New Issue