Don't set the external cache if its been done recently (#9905)
parent
d0aee697ac
commit
de8f0a03a3
|
@ -0,0 +1 @@
|
||||||
|
Improve performance of sending events for worker-based deployments using Redis.
|
|
@ -2446,7 +2446,9 @@ class FederationHandler(BaseHandler):
|
||||||
# If we are going to send this event over federation we precaclculate
|
# If we are going to send this event over federation we precaclculate
|
||||||
# the joined hosts.
|
# the joined hosts.
|
||||||
if event.internal_metadata.get_send_on_behalf_of():
|
if event.internal_metadata.get_send_on_behalf_of():
|
||||||
await self.event_creation_handler.cache_joined_hosts_for_event(event)
|
await self.event_creation_handler.cache_joined_hosts_for_event(
|
||||||
|
event, context
|
||||||
|
)
|
||||||
|
|
||||||
return context
|
return context
|
||||||
|
|
||||||
|
|
|
@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter
|
||||||
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
||||||
from synapse.util import json_decoder, json_encoder
|
from synapse.util import json_decoder, json_encoder
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
|
||||||
|
@ -457,6 +458,19 @@ class EventCreationHandler:
|
||||||
|
|
||||||
self._external_cache = hs.get_external_cache()
|
self._external_cache = hs.get_external_cache()
|
||||||
|
|
||||||
|
# Stores the state groups we've recently added to the joined hosts
|
||||||
|
# external cache. Note that the timeout must be significantly less than
|
||||||
|
# the TTL on the external cache.
|
||||||
|
self._external_cache_joined_hosts_updates = (
|
||||||
|
None
|
||||||
|
) # type: Optional[ExpiringCache]
|
||||||
|
if self._external_cache.is_enabled():
|
||||||
|
self._external_cache_joined_hosts_updates = ExpiringCache(
|
||||||
|
"_external_cache_joined_hosts_updates",
|
||||||
|
self.clock,
|
||||||
|
expiry_ms=30 * 60 * 1000,
|
||||||
|
)
|
||||||
|
|
||||||
async def create_event(
|
async def create_event(
|
||||||
self,
|
self,
|
||||||
requester: Requester,
|
requester: Requester,
|
||||||
|
@ -967,7 +981,7 @@ class EventCreationHandler:
|
||||||
|
|
||||||
await self.action_generator.handle_push_actions_for_event(event, context)
|
await self.action_generator.handle_push_actions_for_event(event, context)
|
||||||
|
|
||||||
await self.cache_joined_hosts_for_event(event)
|
await self.cache_joined_hosts_for_event(event, context)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# If we're a worker we need to hit out to the master.
|
# If we're a worker we need to hit out to the master.
|
||||||
|
@ -1008,7 +1022,9 @@ class EventCreationHandler:
|
||||||
await self.store.remove_push_actions_from_staging(event.event_id)
|
await self.store.remove_push_actions_from_staging(event.event_id)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
|
async def cache_joined_hosts_for_event(
|
||||||
|
self, event: EventBase, context: EventContext
|
||||||
|
) -> None:
|
||||||
"""Precalculate the joined hosts at the event, when using Redis, so that
|
"""Precalculate the joined hosts at the event, when using Redis, so that
|
||||||
external federation senders don't have to recalculate it themselves.
|
external federation senders don't have to recalculate it themselves.
|
||||||
"""
|
"""
|
||||||
|
@ -1016,6 +1032,9 @@ class EventCreationHandler:
|
||||||
if not self._external_cache.is_enabled():
|
if not self._external_cache.is_enabled():
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# If external cache is enabled we should always have this.
|
||||||
|
assert self._external_cache_joined_hosts_updates is not None
|
||||||
|
|
||||||
# We actually store two mappings, event ID -> prev state group,
|
# We actually store two mappings, event ID -> prev state group,
|
||||||
# state group -> joined hosts, which is much more space efficient
|
# state group -> joined hosts, which is much more space efficient
|
||||||
# than event ID -> joined hosts.
|
# than event ID -> joined hosts.
|
||||||
|
@ -1023,16 +1042,21 @@ class EventCreationHandler:
|
||||||
# Note: We have to cache event ID -> prev state group, as we don't
|
# Note: We have to cache event ID -> prev state group, as we don't
|
||||||
# store that in the DB.
|
# store that in the DB.
|
||||||
#
|
#
|
||||||
# Note: We always set the state group -> joined hosts cache, even if
|
# Note: We set the state group -> joined hosts cache if it hasn't been
|
||||||
# we already set it, so that the expiry time is reset.
|
# set for a while, so that the expiry time is reset.
|
||||||
|
|
||||||
state_entry = await self.state.resolve_state_groups_for_events(
|
state_entry = await self.state.resolve_state_groups_for_events(
|
||||||
event.room_id, event_ids=event.prev_event_ids()
|
event.room_id, event_ids=event.prev_event_ids()
|
||||||
)
|
)
|
||||||
|
|
||||||
if state_entry.state_group:
|
if state_entry.state_group:
|
||||||
|
if state_entry.state_group in self._external_cache_joined_hosts_updates:
|
||||||
|
return
|
||||||
|
|
||||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||||
|
|
||||||
|
# Note that the expiry times must be larger than the expiry time in
|
||||||
|
# _external_cache_joined_hosts_updates.
|
||||||
await self._external_cache.set(
|
await self._external_cache.set(
|
||||||
"event_to_prev_state_group",
|
"event_to_prev_state_group",
|
||||||
event.event_id,
|
event.event_id,
|
||||||
|
@ -1046,6 +1070,8 @@ class EventCreationHandler:
|
||||||
expiry_ms=60 * 60 * 1000,
|
expiry_ms=60 * 60 * 1000,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._external_cache_joined_hosts_updates[state_entry.state_group] = None
|
||||||
|
|
||||||
async def _validate_canonical_alias(
|
async def _validate_canonical_alias(
|
||||||
self, directory_handler, room_alias_str: str, expected_room_id: str
|
self, directory_handler, room_alias_str: str, expected_room_id: str
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
Loading…
Reference in New Issue