Move get_interested_remotes back to presence handler
parent
b9b72bc6e2
commit
6308ac45b0
|
@ -25,7 +25,7 @@ from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
|
||||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -251,7 +251,10 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
# First we queue up the new presence by user ID, so multiple presence
|
# First we queue up the new presence by user ID, so multiple presence
|
||||||
# updates in quick successtion are correctly handled
|
# updates in quick successtion are correctly handled
|
||||||
self.pending_presence.update({state.user_id: state for state in states})
|
self.pending_presence.update({
|
||||||
|
state.user_id: state for state in states
|
||||||
|
if self.is_mine_id(state.user_id)
|
||||||
|
})
|
||||||
|
|
||||||
# We then handle the new pending presence in batches, first figuring
|
# We then handle the new pending presence in batches, first figuring
|
||||||
# out the destinations we need to send each state to and then poking it
|
# out the destinations we need to send each state to and then poking it
|
||||||
|
@ -283,40 +286,8 @@ class TransactionQueue(object):
|
||||||
Args:
|
Args:
|
||||||
states (list(UserPresenceState))
|
states (list(UserPresenceState))
|
||||||
"""
|
"""
|
||||||
# First we look up the rooms each user is in (as well as any explicit
|
hosts_and_states = yield get_interested_remotes(self.store, states)
|
||||||
# subscriptions), then for each distinct room we look up the remote
|
|
||||||
# hosts in those rooms.
|
|
||||||
room_ids_to_states = {}
|
|
||||||
users_to_states = {}
|
|
||||||
for state in states.itervalues():
|
|
||||||
room_ids = yield self.store.get_rooms_for_user(state.user_id)
|
|
||||||
for room_id in room_ids:
|
|
||||||
room_ids_to_states.setdefault(room_id, []).append(state)
|
|
||||||
|
|
||||||
plist = yield self.store.get_presence_list_observers_accepted(
|
|
||||||
state.user_id,
|
|
||||||
)
|
|
||||||
for u in plist:
|
|
||||||
users_to_states.setdefault(u, []).append(state)
|
|
||||||
|
|
||||||
hosts_and_states = []
|
|
||||||
for room_id, states in room_ids_to_states.items():
|
|
||||||
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
|
|
||||||
if not local_states:
|
|
||||||
continue
|
|
||||||
|
|
||||||
hosts = yield self.store.get_hosts_in_room(room_id)
|
|
||||||
hosts_and_states.append((hosts, local_states))
|
|
||||||
|
|
||||||
for user_id, states in users_to_states.items():
|
|
||||||
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
|
|
||||||
if not local_states:
|
|
||||||
continue
|
|
||||||
|
|
||||||
host = get_domain_from_id(user_id)
|
|
||||||
hosts_and_states.append(([host], local_states))
|
|
||||||
|
|
||||||
# And now finally queue up new transactions
|
|
||||||
for destinations, states in hosts_and_states:
|
for destinations, states in hosts_and_states:
|
||||||
for destination in destinations:
|
for destination in destinations:
|
||||||
if not self.can_send_to(destination):
|
if not self.can_send_to(destination):
|
||||||
|
|
|
@ -633,9 +633,6 @@ class PresenceHandler(object):
|
||||||
# Always notify self
|
# Always notify self
|
||||||
users_to_states.setdefault(state.user_id, []).append(state)
|
users_to_states.setdefault(state.user_id, []).append(state)
|
||||||
|
|
||||||
# TODO: de-dup hosts_to_states, as a single host might have multiple
|
|
||||||
# of same presence
|
|
||||||
|
|
||||||
defer.returnValue((room_ids_to_states, users_to_states))
|
defer.returnValue((room_ids_to_states, users_to_states))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -1318,3 +1315,52 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
||||||
persist_and_notify = True
|
persist_and_notify = True
|
||||||
|
|
||||||
return new_state, persist_and_notify, federation_ping
|
return new_state, persist_and_notify, federation_ping
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_interested_remotes(store, states):
|
||||||
|
"""Given a list of presence states figure out which remote servers
|
||||||
|
should be sent which.
|
||||||
|
|
||||||
|
All the presence states should be for local users only.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
store (DataStore)
|
||||||
|
states (list(UserPresenceState))
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred list of ([destinations], [UserPresenceState]), where for
|
||||||
|
each row the list of UserPresenceState should be sent to each
|
||||||
|
destination
|
||||||
|
"""
|
||||||
|
# First we look up the rooms each user is in (as well as any explicit
|
||||||
|
# subscriptions), then for each distinct room we look up the remote
|
||||||
|
# hosts in those rooms.
|
||||||
|
room_ids_to_states = {}
|
||||||
|
users_to_states = {}
|
||||||
|
for state in states.itervalues():
|
||||||
|
room_ids = yield store.get_rooms_for_user(state.user_id)
|
||||||
|
for room_id in room_ids:
|
||||||
|
room_ids_to_states.setdefault(room_id, []).append(state)
|
||||||
|
|
||||||
|
plist = yield store.get_presence_list_observers_accepted(
|
||||||
|
state.user_id,
|
||||||
|
)
|
||||||
|
for u in plist:
|
||||||
|
users_to_states.setdefault(u, []).append(state)
|
||||||
|
|
||||||
|
hosts_and_states = []
|
||||||
|
for room_id, states in room_ids_to_states.items():
|
||||||
|
if not local_states:
|
||||||
|
continue
|
||||||
|
|
||||||
|
hosts = yield store.get_hosts_in_room(room_id)
|
||||||
|
hosts_and_states.append((hosts, local_states))
|
||||||
|
|
||||||
|
for user_id, states in users_to_states.items():
|
||||||
|
if not local_states:
|
||||||
|
continue
|
||||||
|
|
||||||
|
host = get_domain_from_id(user_id)
|
||||||
|
hosts_and_states.append(([host], local_states))
|
||||||
|
|
||||||
|
defer.returnValue(hosts_and_states)
|
||||||
|
|
Loading…
Reference in New Issue