Reduce calls to `send_presence_to_destinations` (#16385)
parent
2763c49eca
commit
47ffc7e548
|
@ -0,0 +1 @@
|
||||||
|
Minor performance improvement when sending presence to federated servers.
|
|
@ -401,9 +401,9 @@ class BasePresenceHandler(abc.ABC):
|
||||||
states,
|
states,
|
||||||
)
|
)
|
||||||
|
|
||||||
for destination, host_states in hosts_to_states.items():
|
for destinations, host_states in hosts_to_states:
|
||||||
await self._federation.send_presence_to_destinations(
|
await self._federation.send_presence_to_destinations(
|
||||||
host_states, [destination]
|
host_states, destinations
|
||||||
)
|
)
|
||||||
|
|
||||||
async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
|
async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
|
||||||
|
@ -1000,9 +1000,9 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
list(to_federation_ping.values()),
|
list(to_federation_ping.values()),
|
||||||
)
|
)
|
||||||
|
|
||||||
for destination, states in hosts_to_states.items():
|
for destinations, states in hosts_to_states:
|
||||||
await self._federation_queue.send_presence_to_destinations(
|
await self._federation_queue.send_presence_to_destinations(
|
||||||
states, [destination]
|
states, destinations
|
||||||
)
|
)
|
||||||
|
|
||||||
@wrap_as_background_process("handle_presence_timeouts")
|
@wrap_as_background_process("handle_presence_timeouts")
|
||||||
|
@ -2276,7 +2276,7 @@ async def get_interested_remotes(
|
||||||
store: DataStore,
|
store: DataStore,
|
||||||
presence_router: PresenceRouter,
|
presence_router: PresenceRouter,
|
||||||
states: List[UserPresenceState],
|
states: List[UserPresenceState],
|
||||||
) -> Dict[str, Set[UserPresenceState]]:
|
) -> List[Tuple[StrCollection, Collection[UserPresenceState]]]:
|
||||||
"""Given a list of presence states figure out which remote servers
|
"""Given a list of presence states figure out which remote servers
|
||||||
should be sent which.
|
should be sent which.
|
||||||
|
|
||||||
|
@ -2290,23 +2290,26 @@ async def get_interested_remotes(
|
||||||
Returns:
|
Returns:
|
||||||
A map from destinations to presence states to send to that destination.
|
A map from destinations to presence states to send to that destination.
|
||||||
"""
|
"""
|
||||||
hosts_and_states: Dict[str, Set[UserPresenceState]] = {}
|
hosts_and_states: List[Tuple[StrCollection, Collection[UserPresenceState]]] = []
|
||||||
|
|
||||||
# First we look up the rooms each user is in (as well as any explicit
|
# First we look up the rooms each user is in (as well as any explicit
|
||||||
# subscriptions), then for each distinct room we look up the remote
|
# subscriptions), then for each distinct room we look up the remote
|
||||||
# hosts in those rooms.
|
# hosts in those rooms.
|
||||||
room_ids_to_states, users_to_states = await get_interested_parties(
|
for state in states:
|
||||||
store, presence_router, states
|
room_ids = await store.get_rooms_for_user(state.user_id)
|
||||||
)
|
hosts: Set[str] = set()
|
||||||
|
for room_id in room_ids:
|
||||||
|
room_hosts = await store.get_current_hosts_in_room(room_id)
|
||||||
|
hosts.update(room_hosts)
|
||||||
|
hosts_and_states.append((hosts, [state]))
|
||||||
|
|
||||||
for room_id, states in room_ids_to_states.items():
|
# Ask a presence routing module for any additional parties if one
|
||||||
hosts = await store.get_current_hosts_in_room(room_id)
|
# is loaded.
|
||||||
for host in hosts:
|
router_users_to_states = await presence_router.get_users_for_states(states)
|
||||||
hosts_and_states.setdefault(host, set()).update(states)
|
|
||||||
|
|
||||||
for user_id, states in users_to_states.items():
|
for user_id, user_states in router_users_to_states.items():
|
||||||
host = get_domain_from_id(user_id)
|
host = get_domain_from_id(user_id)
|
||||||
hosts_and_states.setdefault(host, set()).update(states)
|
hosts_and_states.append(([host], user_states))
|
||||||
|
|
||||||
return hosts_and_states
|
return hosts_and_states
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue