Merge pull request #680 from matrix-org/markjh/remove_is_new_state
Remove the is_new_state argument to persist event.pull/681/head
commit
03e406eefc
|
@ -33,6 +33,9 @@ class _EventInternalMetadata(object):
|
|||
def is_outlier(self):
|
||||
return getattr(self, "outlier", False)
|
||||
|
||||
def is_invite_from_remote(self):
|
||||
return getattr(self, "invite_from_remote", False)
|
||||
|
||||
|
||||
def _event_dict_property(key):
|
||||
def getter(self):
|
||||
|
|
|
@ -102,8 +102,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_receive_pdu(self, origin, pdu, state=None,
|
||||
auth_chain=None):
|
||||
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
|
||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||
do auth checks and put it through the StateHandler.
|
||||
"""
|
||||
|
@ -174,11 +173,7 @@ class FederationHandler(BaseHandler):
|
|||
})
|
||||
seen_ids.add(e.event_id)
|
||||
|
||||
yield self._handle_new_events(
|
||||
origin,
|
||||
event_infos,
|
||||
outliers=True
|
||||
)
|
||||
yield self._handle_new_events(origin, event_infos)
|
||||
|
||||
try:
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
|
@ -761,6 +756,7 @@ class FederationHandler(BaseHandler):
|
|||
event = pdu
|
||||
|
||||
event.internal_metadata.outlier = True
|
||||
event.internal_metadata.invite_from_remote = True
|
||||
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
|
@ -1069,9 +1065,6 @@ class FederationHandler(BaseHandler):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _handle_new_event(self, origin, event, state=None, auth_events=None):
|
||||
|
||||
outlier = event.internal_metadata.is_outlier()
|
||||
|
||||
context = yield self._prep_event(
|
||||
origin, event,
|
||||
state=state,
|
||||
|
@ -1087,14 +1080,12 @@ class FederationHandler(BaseHandler):
|
|||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
is_new_state=not outlier,
|
||||
)
|
||||
|
||||
defer.returnValue((context, event_stream_id, max_stream_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_new_events(self, origin, event_infos, backfilled=False,
|
||||
outliers=False):
|
||||
def _handle_new_events(self, origin, event_infos, backfilled=False):
|
||||
contexts = yield defer.gatherResults(
|
||||
[
|
||||
self._prep_event(
|
||||
|
@ -1113,7 +1104,6 @@ class FederationHandler(BaseHandler):
|
|||
for ev_info, context in itertools.izip(event_infos, contexts)
|
||||
],
|
||||
backfilled=backfilled,
|
||||
is_new_state=(not outliers and not backfilled),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -1176,7 +1166,6 @@ class FederationHandler(BaseHandler):
|
|||
(e, events_to_context[e.event_id])
|
||||
for e in itertools.chain(auth_events, state)
|
||||
],
|
||||
is_new_state=False,
|
||||
)
|
||||
|
||||
new_event_context = yield self.state_handler.compute_event_context(
|
||||
|
@ -1185,7 +1174,6 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event, new_event_context,
|
||||
is_new_state=True,
|
||||
current_state=state,
|
||||
)
|
||||
|
||||
|
|
|
@ -61,8 +61,7 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def persist_events(self, events_and_contexts, backfilled=False,
|
||||
is_new_state=True):
|
||||
def persist_events(self, events_and_contexts, backfilled=False):
|
||||
if not events_and_contexts:
|
||||
return
|
||||
|
||||
|
@ -110,13 +109,11 @@ class EventsStore(SQLBaseStore):
|
|||
self._persist_events_txn,
|
||||
events_and_contexts=chunk,
|
||||
backfilled=backfilled,
|
||||
is_new_state=is_new_state,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def persist_event(self, event, context,
|
||||
is_new_state=True, current_state=None):
|
||||
def persist_event(self, event, context, current_state=None):
|
||||
|
||||
try:
|
||||
with self._stream_id_gen.get_next() as stream_ordering:
|
||||
|
@ -128,7 +125,6 @@ class EventsStore(SQLBaseStore):
|
|||
self._persist_event_txn,
|
||||
event=event,
|
||||
context=context,
|
||||
is_new_state=is_new_state,
|
||||
current_state=current_state,
|
||||
)
|
||||
except _RollbackButIsFineException:
|
||||
|
@ -194,8 +190,7 @@ class EventsStore(SQLBaseStore):
|
|||
defer.returnValue({e.event_id: e for e in events})
|
||||
|
||||
@log_function
|
||||
def _persist_event_txn(self, txn, event, context,
|
||||
is_new_state, current_state):
|
||||
def _persist_event_txn(self, txn, event, context, current_state):
|
||||
# We purposefully do this first since if we include a `current_state`
|
||||
# key, we *want* to update the `current_state_events` table
|
||||
if current_state:
|
||||
|
@ -236,12 +231,10 @@ class EventsStore(SQLBaseStore):
|
|||
txn,
|
||||
[(event, context)],
|
||||
backfilled=False,
|
||||
is_new_state=is_new_state,
|
||||
)
|
||||
|
||||
@log_function
|
||||
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
|
||||
is_new_state):
|
||||
def _persist_events_txn(self, txn, events_and_contexts, backfilled):
|
||||
depth_updates = {}
|
||||
for event, context in events_and_contexts:
|
||||
# Remove the any existing cache entries for the event_ids
|
||||
|
@ -452,10 +445,9 @@ class EventsStore(SQLBaseStore):
|
|||
txn, [event for event, _ in events_and_contexts]
|
||||
)
|
||||
|
||||
state_events_and_contexts = filter(
|
||||
lambda i: i[0].is_state(),
|
||||
events_and_contexts,
|
||||
)
|
||||
state_events_and_contexts = [
|
||||
ec for ec in events_and_contexts if ec[0].is_state()
|
||||
]
|
||||
|
||||
state_values = []
|
||||
for event, context in state_events_and_contexts:
|
||||
|
@ -493,32 +485,50 @@ class EventsStore(SQLBaseStore):
|
|||
],
|
||||
)
|
||||
|
||||
if is_new_state:
|
||||
for event, _ in state_events_and_contexts:
|
||||
if not context.rejected:
|
||||
txn.call_after(
|
||||
self._get_current_state_for_key.invalidate,
|
||||
(event.room_id, event.type, event.state_key,)
|
||||
)
|
||||
if backfilled:
|
||||
# Backfilled events come before the current state so we don't need
|
||||
# to update the current state table
|
||||
return
|
||||
|
||||
if event.type in [EventTypes.Name, EventTypes.Aliases]:
|
||||
txn.call_after(
|
||||
self.get_room_name_and_aliases.invalidate,
|
||||
(event.room_id,)
|
||||
)
|
||||
for event, _ in state_events_and_contexts:
|
||||
if (not event.internal_metadata.is_invite_from_remote()
|
||||
and event.internal_metadata.is_outlier()):
|
||||
# Outlier events generally shouldn't clobber the current state.
|
||||
# However invites from remote severs for rooms we aren't in
|
||||
# are a bit special: they don't come with any associated
|
||||
# state so are technically an outlier, however all the
|
||||
# client-facing code assumes that they are in the current
|
||||
# state table so we insert the event anyway.
|
||||
continue
|
||||
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
"current_state_events",
|
||||
keyvalues={
|
||||
"room_id": event.room_id,
|
||||
"type": event.type,
|
||||
"state_key": event.state_key,
|
||||
},
|
||||
values={
|
||||
"event_id": event.event_id,
|
||||
}
|
||||
)
|
||||
if context.rejected:
|
||||
# If the event failed it's auth checks then it shouldn't
|
||||
# clobbler the current state.
|
||||
continue
|
||||
|
||||
txn.call_after(
|
||||
self._get_current_state_for_key.invalidate,
|
||||
(event.room_id, event.type, event.state_key,)
|
||||
)
|
||||
|
||||
if event.type in [EventTypes.Name, EventTypes.Aliases]:
|
||||
txn.call_after(
|
||||
self.get_room_name_and_aliases.invalidate,
|
||||
(event.room_id,)
|
||||
)
|
||||
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
"current_state_events",
|
||||
keyvalues={
|
||||
"room_id": event.room_id,
|
||||
"type": event.type,
|
||||
"state_key": event.state_key,
|
||||
},
|
||||
values={
|
||||
"event_id": event.event_id,
|
||||
}
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
|
|
Loading…
Reference in New Issue