Intelligently select extremities used in backfill. (#8349)
Instead of just using the most recent extremities let's pick the ones that will give us results that the pagination request cares about, i.e. pick extremities only if they have a smaller depth than the pagination token. This is useful when we fail to backfill an extremity, as we no longer get stuck requesting that same extremity repeatedly.pull/8675/head
parent
104c490274
commit
9eea5c43af
|
@ -0,0 +1 @@
|
|||
Fix a longstanding bug where back pagination over federation could get stuck if it failed to handle a received event.
|
|
@ -943,15 +943,26 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
return events
|
||||
|
||||
async def maybe_backfill(self, room_id, current_depth):
|
||||
async def maybe_backfill(
|
||||
self, room_id: str, current_depth: int, limit: int
|
||||
) -> bool:
|
||||
"""Checks the database to see if we should backfill before paginating,
|
||||
and if so do.
|
||||
|
||||
Args:
|
||||
room_id
|
||||
current_depth: The depth from which we're paginating from. This is
|
||||
used to decide if we should backfill and what extremities to
|
||||
use.
|
||||
limit: The number of events that the pagination request will
|
||||
return. This is used as part of the heuristic to decide if we
|
||||
should back paginate.
|
||||
"""
|
||||
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
|
||||
|
||||
if not extremities:
|
||||
logger.debug("Not backfilling as no extremeties found.")
|
||||
return
|
||||
return False
|
||||
|
||||
# We only want to paginate if we can actually see the events we'll get,
|
||||
# as otherwise we'll just spend a lot of resources to get redacted
|
||||
|
@ -1004,16 +1015,54 @@ class FederationHandler(BaseHandler):
|
|||
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
|
||||
max_depth = sorted_extremeties_tuple[0][1]
|
||||
|
||||
# If we're approaching an extremity we trigger a backfill, otherwise we
|
||||
# no-op.
|
||||
#
|
||||
# We chose twice the limit here as then clients paginating backwards
|
||||
# will send pagination requests that trigger backfill at least twice
|
||||
# using the most recent extremity before it gets removed (see below). We
|
||||
# chose more than one times the limit in case of failure, but choosing a
|
||||
# much larger factor will result in triggering a backfill request much
|
||||
# earlier than necessary.
|
||||
if current_depth - 2 * limit > max_depth:
|
||||
logger.debug(
|
||||
"Not backfilling as we don't need to. %d < %d - 2 * %d",
|
||||
max_depth,
|
||||
current_depth,
|
||||
limit,
|
||||
)
|
||||
return False
|
||||
|
||||
logger.debug(
|
||||
"room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
|
||||
room_id,
|
||||
current_depth,
|
||||
max_depth,
|
||||
sorted_extremeties_tuple,
|
||||
)
|
||||
|
||||
# We ignore extremities that have a greater depth than our current depth
|
||||
# as:
|
||||
# 1. we don't really care about getting events that have happened
|
||||
# before our current position; and
|
||||
# 2. we have likely previously tried and failed to backfill from that
|
||||
# extremity, so to avoid getting "stuck" requesting the same
|
||||
# backfill repeatedly we drop those extremities.
|
||||
filtered_sorted_extremeties_tuple = [
|
||||
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
|
||||
]
|
||||
|
||||
# However, we need to check that the filtered extremities are non-empty.
|
||||
# If they are empty then either we can a) bail or b) still attempt to
|
||||
# backill. We opt to try backfilling anyway just in case we do get
|
||||
# relevant events.
|
||||
if filtered_sorted_extremeties_tuple:
|
||||
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
|
||||
|
||||
# We don't want to specify too many extremities as it causes the backfill
|
||||
# request URI to be too long.
|
||||
extremities = dict(sorted_extremeties_tuple[:5])
|
||||
|
||||
if current_depth > max_depth:
|
||||
logger.debug(
|
||||
"Not backfilling as we don't need to. %d < %d", max_depth, current_depth
|
||||
)
|
||||
return
|
||||
|
||||
# Now we need to decide which hosts to hit first.
|
||||
|
||||
# First we try hosts that are already in the room
|
||||
|
|
|
@ -362,9 +362,9 @@ class PaginationHandler:
|
|||
# if we're going backwards, we might need to backfill. This
|
||||
# requires that we have a topo token.
|
||||
if room_token.topological:
|
||||
max_topo = room_token.topological
|
||||
curr_topo = room_token.topological
|
||||
else:
|
||||
max_topo = await self.store.get_max_topological_token(
|
||||
curr_topo = await self.store.get_current_topological_token(
|
||||
room_id, room_token.stream
|
||||
)
|
||||
|
||||
|
@ -380,11 +380,11 @@ class PaginationHandler:
|
|||
leave_token = await self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
if RoomStreamToken.parse(leave_token).topological < max_topo:
|
||||
if RoomStreamToken.parse(leave_token).topological < curr_topo:
|
||||
source_config.from_key = str(leave_token)
|
||||
|
||||
await self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||
room_id, max_topo
|
||||
room_id, curr_topo, limit=source_config.limit,
|
||||
)
|
||||
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
|
|
|
@ -648,23 +648,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
)
|
||||
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
|
||||
|
||||
async def get_max_topological_token(self, room_id: str, stream_key: int) -> int:
|
||||
"""Get the max topological token in a room before the given stream
|
||||
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
|
||||
"""Gets the topological token in a room after or at the given stream
|
||||
ordering.
|
||||
|
||||
Args:
|
||||
room_id
|
||||
stream_key
|
||||
|
||||
Returns:
|
||||
The maximum topological token.
|
||||
"""
|
||||
sql = (
|
||||
"SELECT coalesce(max(topological_ordering), 0) FROM events"
|
||||
" WHERE room_id = ? AND stream_ordering < ?"
|
||||
"SELECT coalesce(MIN(topological_ordering), 0) FROM events"
|
||||
" WHERE room_id = ? AND stream_ordering >= ?"
|
||||
)
|
||||
row = await self.db_pool.execute(
|
||||
"get_max_topological_token", None, sql, room_id, stream_key
|
||||
"get_current_topological_token", None, sql, room_id, stream_key
|
||||
)
|
||||
return row[0][0] if row else 0
|
||||
|
||||
|
|
Loading…
Reference in New Issue