Move get_interested_parties
parent
2be8a281d2
commit
414522aed5
|
@ -20,7 +20,7 @@ from synapse.api.constants import EventTypes
|
||||||
from synapse.config._base import ConfigError
|
from synapse.config._base import ConfigError
|
||||||
from synapse.config.homeserver import HomeServerConfig
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
from synapse.config.logger import setup_logging
|
from synapse.config.logger import setup_logging
|
||||||
from synapse.handlers.presence import PresenceHandler
|
from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.http.server import JsonResource
|
from synapse.http.server import JsonResource
|
||||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
|
@ -172,7 +172,6 @@ class SynchrotronPresence(object):
|
||||||
|
|
||||||
get_states = PresenceHandler.get_states.__func__
|
get_states = PresenceHandler.get_states.__func__
|
||||||
get_state = PresenceHandler.get_state.__func__
|
get_state = PresenceHandler.get_state.__func__
|
||||||
_get_interested_parties = PresenceHandler._get_interested_parties.__func__
|
|
||||||
current_state_for_users = PresenceHandler.current_state_for_users.__func__
|
current_state_for_users = PresenceHandler.current_state_for_users.__func__
|
||||||
|
|
||||||
def user_syncing(self, user_id, affect_presence):
|
def user_syncing(self, user_id, affect_presence):
|
||||||
|
@ -206,7 +205,7 @@ class SynchrotronPresence(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def notify_from_replication(self, states, stream_id):
|
def notify_from_replication(self, states, stream_id):
|
||||||
parties = yield self._get_interested_parties(states)
|
parties = yield get_interested_parties(self.store, states)
|
||||||
room_ids_to_states, users_to_states = parties
|
room_ids_to_states, users_to_states = parties
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
|
|
|
@ -610,31 +610,6 @@ class PresenceHandler(object):
|
||||||
|
|
||||||
defer.returnValue(states)
|
defer.returnValue(states)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _get_interested_parties(self, states):
|
|
||||||
"""Given a list of states return which entities (rooms, users, servers)
|
|
||||||
are interested in the given states.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
2-tuple: `(room_ids_to_states, users_to_states)`,
|
|
||||||
with each item being a dict of `entity_name` -> `[UserPresenceState]`
|
|
||||||
"""
|
|
||||||
room_ids_to_states = {}
|
|
||||||
users_to_states = {}
|
|
||||||
for state in states:
|
|
||||||
room_ids = yield self.store.get_rooms_for_user(state.user_id)
|
|
||||||
for room_id in room_ids:
|
|
||||||
room_ids_to_states.setdefault(room_id, []).append(state)
|
|
||||||
|
|
||||||
plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
|
|
||||||
for u in plist:
|
|
||||||
users_to_states.setdefault(u, []).append(state)
|
|
||||||
|
|
||||||
# Always notify self
|
|
||||||
users_to_states.setdefault(state.user_id, []).append(state)
|
|
||||||
|
|
||||||
defer.returnValue((room_ids_to_states, users_to_states))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _persist_and_notify(self, states):
|
def _persist_and_notify(self, states):
|
||||||
"""Persist states in the database, poke the notifier and send to
|
"""Persist states in the database, poke the notifier and send to
|
||||||
|
@ -642,7 +617,7 @@ class PresenceHandler(object):
|
||||||
"""
|
"""
|
||||||
stream_id, max_token = yield self.store.update_presence(states)
|
stream_id, max_token = yield self.store.update_presence(states)
|
||||||
|
|
||||||
parties = yield self._get_interested_parties(states)
|
parties = yield get_interested_parties(self.store, states)
|
||||||
room_ids_to_states, users_to_states = parties
|
room_ids_to_states, users_to_states = parties
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
|
@ -654,7 +629,7 @@ class PresenceHandler(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def notify_for_states(self, state, stream_id):
|
def notify_for_states(self, state, stream_id):
|
||||||
parties = yield self._get_interested_parties([state])
|
parties = yield get_interested_parties(self.store, [state])
|
||||||
room_ids_to_states, users_to_states = parties
|
room_ids_to_states, users_to_states = parties
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
|
@ -1316,6 +1291,36 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
||||||
|
|
||||||
return new_state, persist_and_notify, federation_ping
|
return new_state, persist_and_notify, federation_ping
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_interested_parties(store, states):
|
||||||
|
"""Given a list of states return which entities (rooms, users)
|
||||||
|
are interested in the given states.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
states (list(UserPresenceState))
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
2-tuple: `(room_ids_to_states, users_to_states)`,
|
||||||
|
with each item being a dict of `entity_name` -> `[UserPresenceState]`
|
||||||
|
"""
|
||||||
|
room_ids_to_states = {}
|
||||||
|
users_to_states = {}
|
||||||
|
for state in states:
|
||||||
|
room_ids = yield store.get_rooms_for_user(state.user_id)
|
||||||
|
for room_id in room_ids:
|
||||||
|
room_ids_to_states.setdefault(room_id, []).append(state)
|
||||||
|
|
||||||
|
plist = yield store.get_presence_list_observers_accepted(state.user_id)
|
||||||
|
for u in plist:
|
||||||
|
users_to_states.setdefault(u, []).append(state)
|
||||||
|
|
||||||
|
# Always notify self
|
||||||
|
users_to_states.setdefault(state.user_id, []).append(state)
|
||||||
|
|
||||||
|
defer.returnValue((room_ids_to_states, users_to_states))
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_interested_remotes(store, states):
|
def get_interested_remotes(store, states):
|
||||||
"""Given a list of presence states figure out which remote servers
|
"""Given a list of presence states figure out which remote servers
|
||||||
|
@ -1351,17 +1356,11 @@ def get_interested_remotes(store, states):
|
||||||
users_to_states.setdefault(u, []).append(state)
|
users_to_states.setdefault(u, []).append(state)
|
||||||
|
|
||||||
for room_id, states in room_ids_to_states.items():
|
for room_id, states in room_ids_to_states.items():
|
||||||
if not local_states:
|
|
||||||
continue
|
|
||||||
|
|
||||||
hosts = yield store.get_hosts_in_room(room_id)
|
hosts = yield store.get_hosts_in_room(room_id)
|
||||||
hosts_and_states.append((hosts, local_states))
|
hosts_and_states.append((hosts, states))
|
||||||
|
|
||||||
for user_id, states in users_to_states.items():
|
for user_id, states in users_to_states.items():
|
||||||
if not local_states:
|
|
||||||
continue
|
|
||||||
|
|
||||||
host = get_domain_from_id(user_id)
|
host = get_domain_from_id(user_id)
|
||||||
hosts_and_states.append(([host], local_states))
|
hosts_and_states.append(([host], states))
|
||||||
|
|
||||||
defer.returnValue(hosts_and_states)
|
defer.returnValue(hosts_and_states)
|
||||||
|
|
Loading…
Reference in New Issue