Correctly filter out extremities with soft failed prevs (#5274)
When we receive a soft failed event we, correctly, *do not* update the forward extremity table with the event. However, if we later receive an event that references the soft failed event we then need to remove the soft failed events prev events from the forward extremities table, otherwise we just build up forward extremities. Fixes #5269release-v0.99.5
parent
3d5bba581b
commit
df9d900544
|
@ -0,0 +1 @@
|
|||
Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
|
|
@ -554,10 +554,18 @@ class EventsStore(
|
|||
e_id for event in new_events for e_id in event.prev_event_ids()
|
||||
)
|
||||
|
||||
# Finally, remove any events which are prev_events of any existing events.
|
||||
# Remove any events which are prev_events of any existing events.
|
||||
existing_prevs = yield self._get_events_which_are_prevs(result)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
# Finally handle the case where the new events have soft-failed prev
|
||||
# events. If they do we need to remove them and their prev events,
|
||||
# otherwise we end up with dangling extremities.
|
||||
existing_prevs = yield self._get_prevs_before_rejected(
|
||||
e_id for event in new_events for e_id in event.prev_event_ids()
|
||||
)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -573,7 +581,7 @@ class EventsStore(
|
|||
"""
|
||||
results = []
|
||||
|
||||
def _get_events(txn, batch):
|
||||
def _get_events_which_are_prevs_txn(txn, batch):
|
||||
sql = """
|
||||
SELECT prev_event_id, internal_metadata
|
||||
FROM event_edges
|
||||
|
@ -596,10 +604,78 @@ class EventsStore(
|
|||
)
|
||||
|
||||
for chunk in batch_iter(event_ids, 100):
|
||||
yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
|
||||
yield self.runInteraction(
|
||||
"_get_events_which_are_prevs",
|
||||
_get_events_which_are_prevs_txn,
|
||||
chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_prevs_before_rejected(self, event_ids):
|
||||
"""Get soft-failed ancestors to remove from the extremities.
|
||||
|
||||
Given a set of events, find all those that have been soft-failed or
|
||||
rejected. Returns those soft failed/rejected events and their prev
|
||||
events (whether soft-failed/rejected or not), and recurses up the
|
||||
prev-event graph until it finds no more soft-failed/rejected events.
|
||||
|
||||
This is used to find extremities that are ancestors of new events, but
|
||||
are separated by soft failed events.
|
||||
|
||||
Args:
|
||||
event_ids (Iterable[str]): Events to find prev events for. Note
|
||||
that these must have already been persisted.
|
||||
|
||||
Returns:
|
||||
Deferred[set[str]]
|
||||
"""
|
||||
|
||||
# The set of event_ids to return. This includes all soft-failed events
|
||||
# and their prev events.
|
||||
existing_prevs = set()
|
||||
|
||||
def _get_prevs_before_rejected_txn(txn, batch):
|
||||
to_recursively_check = batch
|
||||
|
||||
while to_recursively_check:
|
||||
sql = """
|
||||
SELECT
|
||||
event_id, prev_event_id, internal_metadata,
|
||||
rejections.event_id IS NOT NULL
|
||||
FROM event_edges
|
||||
INNER JOIN events USING (event_id)
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
LEFT JOIN event_json USING (event_id)
|
||||
WHERE
|
||||
event_id IN (%s)
|
||||
AND NOT events.outlier
|
||||
""" % (
|
||||
",".join("?" for _ in to_recursively_check),
|
||||
)
|
||||
|
||||
txn.execute(sql, to_recursively_check)
|
||||
to_recursively_check = []
|
||||
|
||||
for event_id, prev_event_id, metadata, rejected in txn:
|
||||
if prev_event_id in existing_prevs:
|
||||
continue
|
||||
|
||||
soft_failed = json.loads(metadata).get("soft_failed")
|
||||
if soft_failed or rejected:
|
||||
to_recursively_check.append(prev_event_id)
|
||||
existing_prevs.add(prev_event_id)
|
||||
|
||||
for chunk in batch_iter(event_ids, 100):
|
||||
yield self.runInteraction(
|
||||
"_get_prevs_before_rejected",
|
||||
_get_prevs_before_rejected_txn,
|
||||
chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(existing_prevs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_state_after_events(
|
||||
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
|
||||
|
|
Loading…
Reference in New Issue