Optimise async get event lookups (#13435)
Still maintains the local in-memory lookup optimisation, but performs any external lookup as part of the deferred that prevents duplicate lookups for the same event at once. This makes the assumption that fetching from an external cache is not a zero-cost operation.
pull/13463/head
parent 6dd7fa12dc
commit 41320a0554
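As a rough illustration of the deduplication pattern the description above refers to: concurrent callers asking for the same event share one in-flight fetch, so the external cache and database are hit at most once per event at a time. This is only a sketch, written with asyncio tasks rather than the Twisted deferreds Synapse actually uses, and all names in it are made up for the example.

    import asyncio
    from typing import Dict

    # Illustrative only: event_id -> in-flight fetch shared by all concurrent callers.
    _current_fetches: Dict[str, "asyncio.Task[dict]"] = {}

    async def _fetch_from_external_cache_or_db(event_id: str) -> dict:
        # Stand-in for the real lookup; pretend it is an expensive remote call.
        await asyncio.sleep(0.01)
        return {"event_id": event_id}

    async def get_event(event_id: str) -> dict:
        task = _current_fetches.get(event_id)
        if task is None:
            # First caller for this event: start the fetch and publish it so that
            # later callers await the same task instead of duplicating the lookup.
            task = asyncio.ensure_future(_fetch_from_external_cache_or_db(event_id))
            _current_fetches[event_id] = task
            task.add_done_callback(lambda _: _current_fetches.pop(event_id, None))
        return await task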
@@ -0,0 +1 @@
+Prevent unnecessary lookups to any external `get_event` cache. Contributed by Nick @ Beeper (@fizzadar).
@ -600,7 +600,11 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
map from event id to result
|
map from event id to result
|
||||||
"""
|
"""
|
||||||
event_entry_map = await self._get_events_from_cache(
|
# Shortcut: check if we have any events in the *in memory* cache - this function
|
||||||
|
# may be called repeatedly for the same event so at this point we cannot reach
|
||||||
|
# out to any external cache for performance reasons. The external cache is
|
||||||
|
# checked later on in the `get_missing_events_from_cache_or_db` function below.
|
||||||
|
event_entry_map = self._get_events_from_local_cache(
|
||||||
event_ids,
|
event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@@ -632,7 +636,9 @@ class EventsWorkerStore(SQLBaseStore):
 
         if missing_events_ids:
 
-            async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
+            async def get_missing_events_from_cache_or_db() -> Dict[
+                str, EventCacheEntry
+            ]:
                 """Fetches the events in `missing_event_ids` from the database.
 
                 Also creates entries in `self._current_event_fetches` to allow
@@ -657,10 +663,18 @@ class EventsWorkerStore(SQLBaseStore):
                 # the events have been redacted, and if so pulling the redaction event
                 # out of the database to check it.
                 #
+                missing_events = {}
                 try:
-                    missing_events = await self._get_events_from_db(
+                    # Try to fetch from any external cache. We already checked the
+                    # in-memory cache above.
+                    missing_events = await self._get_events_from_external_cache(
                         missing_events_ids,
                     )
+                    # Now actually fetch any remaining events from the DB
+                    db_missing_events = await self._get_events_from_db(
+                        missing_events_ids - missing_events.keys(),
+                    )
+                    missing_events.update(db_missing_events)
                 except Exception as e:
                     with PreserveLoggingContext():
                         fetching_deferred.errback(e)
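The hunk above fixes the lookup order inside the shared fetch: any configured external cache is tried first, and only the ids that are still missing go to the database. Stripped of the surrounding deferred machinery, the control flow is roughly the following sketch; the two fetcher callables are hypothetical stand-ins, not the Synapse helpers.

    from typing import Awaitable, Callable, Dict, Set

    Fetcher = Callable[[Set[str]], Awaitable[Dict[str, dict]]]

    async def fetch_events(
        event_ids: Set[str],
        fetch_from_external_cache: Fetcher,  # hypothetical tier 1, e.g. Redis
        fetch_from_db: Fetcher,              # hypothetical tier 2, the database
    ) -> Dict[str, dict]:
        # Tier 1: external cache, consulted only inside the shared fetch.
        found = await fetch_from_external_cache(event_ids)
        # Tier 2: database, queried only for the ids the external cache missed.
        remaining = event_ids - found.keys()
        found.update(await fetch_from_db(remaining))
        return found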
@@ -679,7 +693,7 @@ class EventsWorkerStore(SQLBaseStore):
             # cancellations, since multiple `_get_events_from_cache_or_db` calls can
             # reuse the same fetch.
             missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
-                get_missing_events_from_db()
+                get_missing_events_from_cache_or_db()
             )
             event_entry_map.update(missing_events)
 
@@ -754,7 +768,54 @@ class EventsWorkerStore(SQLBaseStore):
     async def _get_events_from_cache(
         self, events: Iterable[str], update_metrics: bool = True
     ) -> Dict[str, EventCacheEntry]:
-        """Fetch events from the caches.
+        """Fetch events from the caches, both in memory and any external.
 
+        May return rejected events.
+
+        Args:
+            events: list of event_ids to fetch
+            update_metrics: Whether to update the cache hit ratio metrics
+        """
+        event_map = self._get_events_from_local_cache(
+            events, update_metrics=update_metrics
+        )
+
+        missing_event_ids = (e for e in events if e not in event_map)
+        event_map.update(
+            await self._get_events_from_external_cache(
+                events=missing_event_ids,
+                update_metrics=update_metrics,
+            )
+        )
+
+        return event_map
+
+    async def _get_events_from_external_cache(
+        self, events: Iterable[str], update_metrics: bool = True
+    ) -> Dict[str, EventCacheEntry]:
+        """Fetch events from any configured external cache.
+
+        May return rejected events.
+
+        Args:
+            events: list of event_ids to fetch
+            update_metrics: Whether to update the cache hit ratio metrics
+        """
+        event_map = {}
+
+        for event_id in events:
+            ret = await self._get_event_cache.get_external(
+                (event_id,), None, update_metrics=update_metrics
+            )
+            if ret:
+                event_map[event_id] = ret
+
+        return event_map
+
+    def _get_events_from_local_cache(
+        self, events: Iterable[str], update_metrics: bool = True
+    ) -> Dict[str, EventCacheEntry]:
+        """Fetch events from the local, in memory, caches.
+
         May return rejected events.
 
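After this split there are three entry points with different costs: `_get_events_from_local_cache` is synchronous and only touches the in-memory map, `_get_events_from_external_cache` awaits the external tier, and `_get_events_from_cache` chains the two. A hypothetical call site, not part of the diff (`store` and `event_ids` are assumed):

    async def example(store, event_ids):
        # In-memory only: synchronous, no external round trip.
        local_entries = store._get_events_from_local_cache(event_ids)

        # In-memory first, then any configured external cache for the misses.
        all_cached = await store._get_events_from_cache(event_ids)
        return local_entries, all_cached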
@@ -766,7 +827,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         for event_id in events:
             # First check if it's in the event cache
-            ret = await self._get_event_cache.get(
+            ret = self._get_event_cache.get_local(
                 (event_id,), None, update_metrics=update_metrics
             )
             if ret:
@@ -788,7 +849,7 @@ class EventsWorkerStore(SQLBaseStore):
 
                 # We add the entry back into the cache as we want to keep
                 # recently queried events in the cache.
-                await self._get_event_cache.set((event_id,), cache_entry)
+                self._get_event_cache.set_local((event_id,), cache_entry)
 
         return event_map
 
@@ -896,7 +896,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # We don't update the event cache hit ratio as it completely throws off
         # the hit ratio counts. After all, we don't populate the cache if we
        # miss it here
-        event_map = await self._get_events_from_cache(
+        event_map = self._get_events_from_local_cache(
             member_event_ids, update_metrics=False
         )
 
@@ -834,9 +834,26 @@ class AsyncLruCache(Generic[KT, VT]):
     ) -> Optional[VT]:
         return self._lru_cache.get(key, update_metrics=update_metrics)
 
+    async def get_external(
+        self,
+        key: KT,
+        default: Optional[T] = None,
+        update_metrics: bool = True,
+    ) -> Optional[VT]:
+        # This method should fetch from any configured external cache, in this case noop.
+        return None
+
+    def get_local(
+        self, key: KT, default: Optional[T] = None, update_metrics: bool = True
+    ) -> Optional[VT]:
+        return self._lru_cache.get(key, update_metrics=update_metrics)
+
     async def set(self, key: KT, value: VT) -> None:
         self._lru_cache.set(key, value)
 
+    def set_local(self, key: KT, value: VT) -> None:
+        self._lru_cache.set(key, value)
+
     async def invalidate(self, key: KT) -> None:
         # This method should invalidate any external cache and then invalidate the LruCache.
         return self._lru_cache.invalidate(key)
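In this base class the external tier is deliberately a no-op: `get_external` always misses and nothing is written externally. A deployment that does have an external cache would override those hooks. A minimal sketch, assuming the `AsyncLruCache` from the diff and using a plain dict where a real external store (e.g. Redis) would sit:

    class DictBackedAsyncLruCache(AsyncLruCache):
        """Hypothetical subclass: a dict stands in for a real external cache,
        purely to show which hooks an external-cache-aware deployment overrides."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._external = {}  # stand-in for an external key/value store

        async def get_external(self, key, default=None, update_metrics=True):
            # Only consult the external tier; the local LruCache is read via get_local().
            return self._external.get(key, default)

        async def set(self, key, value) -> None:
            # Write through to both tiers.
            self._lru_cache.set(key, value)
            self._external[key] = value

        async def invalidate(self, key) -> None:
            # Drop from the external tier first, then the local LruCache.
            self._external.pop(key, None)
            self._lru_cache.invalidate(key)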