Don't bother passing the events to the notifier since it isn't using them
parent
5e3b254dc8
commit
e269c511f6
|
@ -17,6 +17,7 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.types import StreamToken
|
||||
import synapse.metrics
|
||||
|
||||
|
@ -50,13 +51,9 @@ class _NotificationListener(object):
|
|||
so that it can remove itself from the indexes in the Notifier class.
|
||||
"""
|
||||
|
||||
def __init__(self, user, rooms, from_token, limit, timeout, deferred,
|
||||
appservice=None):
|
||||
def __init__(self, user, rooms, deferred, appservice=None):
|
||||
self.user = user
|
||||
self.appservice = appservice
|
||||
self.from_token = from_token
|
||||
self.limit = limit
|
||||
self.timeout = timeout
|
||||
self.deferred = deferred
|
||||
self.rooms = rooms
|
||||
self.timer = None
|
||||
|
@ -64,17 +61,14 @@ class _NotificationListener(object):
|
|||
def notified(self):
|
||||
return self.deferred.called
|
||||
|
||||
def notify(self, notifier, events, start_token, end_token):
|
||||
def notify(self, notifier):
|
||||
""" Inform whoever is listening about the new events. This will
|
||||
also remove this listener from all the indexes in the Notifier
|
||||
it knows about.
|
||||
"""
|
||||
|
||||
result = (events, (start_token, end_token))
|
||||
|
||||
try:
|
||||
self.deferred.callback(result)
|
||||
notified_events_counter.inc_by(len(events))
|
||||
self.deferred.callback(None)
|
||||
except defer.AlreadyCalledError:
|
||||
pass
|
||||
|
||||
|
@ -161,6 +155,7 @@ class Notifier(object):
|
|||
listening to the room, and any listeners for the users in the
|
||||
`extra_users` param.
|
||||
"""
|
||||
yield run_on_reactor()
|
||||
# poke any interested application service.
|
||||
self.hs.get_handlers().appservice_handler.notify_interested_services(
|
||||
event
|
||||
|
@ -168,8 +163,6 @@ class Notifier(object):
|
|||
|
||||
room_id = event.room_id
|
||||
|
||||
room_source = self.event_sources.sources["room"]
|
||||
|
||||
room_listeners = self.room_to_listeners.get(room_id, set())
|
||||
|
||||
_discard_if_notified(room_listeners)
|
||||
|
@ -200,34 +193,12 @@ class Notifier(object):
|
|||
|
||||
logger.debug("on_new_room_event listeners %s", listeners)
|
||||
|
||||
# TODO (erikj): Can we make this more efficient by hitting the
|
||||
# db once?
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify(listener):
|
||||
events, end_key = yield room_source.get_new_events_for_user(
|
||||
listener.user,
|
||||
listener.from_token.room_key,
|
||||
listener.limit,
|
||||
)
|
||||
|
||||
if events:
|
||||
end_token = listener.from_token.copy_and_replace(
|
||||
"room_key", end_key
|
||||
)
|
||||
|
||||
listener.notify(
|
||||
self, events, listener.from_token, end_token
|
||||
)
|
||||
|
||||
def eb(failure):
|
||||
logger.exception("Failed to notify listener", failure)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
yield defer.DeferredList(
|
||||
[notify(l).addErrback(eb) for l in listeners],
|
||||
consumeErrors=True,
|
||||
)
|
||||
for listener in listeners:
|
||||
try:
|
||||
listener.notify(self)
|
||||
except:
|
||||
logger.exception("Failed to notify listener")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
@ -237,11 +208,7 @@ class Notifier(object):
|
|||
|
||||
Will wake up all listeners for the given users and rooms.
|
||||
"""
|
||||
# TODO(paul): This is horrible, having to manually list every event
|
||||
# source here individually
|
||||
presence_source = self.event_sources.sources["presence"]
|
||||
typing_source = self.event_sources.sources["typing"]
|
||||
|
||||
yield run_on_reactor()
|
||||
listeners = set()
|
||||
|
||||
for user in users:
|
||||
|
@ -258,51 +225,12 @@ class Notifier(object):
|
|||
|
||||
listeners |= room_listeners
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify(listener):
|
||||
presence_events, presence_end_key = (
|
||||
yield presence_source.get_new_events_for_user(
|
||||
listener.user,
|
||||
listener.from_token.presence_key,
|
||||
listener.limit,
|
||||
)
|
||||
)
|
||||
typing_events, typing_end_key = (
|
||||
yield typing_source.get_new_events_for_user(
|
||||
listener.user,
|
||||
listener.from_token.typing_key,
|
||||
listener.limit,
|
||||
)
|
||||
)
|
||||
|
||||
if presence_events or typing_events:
|
||||
end_token = listener.from_token.copy_and_replace(
|
||||
"presence_key", presence_end_key
|
||||
).copy_and_replace(
|
||||
"typing_key", typing_end_key
|
||||
)
|
||||
|
||||
listener.notify(
|
||||
self,
|
||||
presence_events + typing_events,
|
||||
listener.from_token,
|
||||
end_token
|
||||
)
|
||||
|
||||
def eb(failure):
|
||||
logger.error(
|
||||
"Failed to notify listener",
|
||||
exc_info=(
|
||||
failure.type,
|
||||
failure.value,
|
||||
failure.getTracebackObject())
|
||||
)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
yield defer.DeferredList(
|
||||
[notify(l).addErrback(eb) for l in listeners],
|
||||
consumeErrors=True,
|
||||
)
|
||||
for listener in listeners:
|
||||
try:
|
||||
listener.notify(self)
|
||||
except:
|
||||
logger.exception("Failed to notify listener")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_events(self, user, rooms, timeout, callback,
|
||||
|
@ -319,9 +247,6 @@ class Notifier(object):
|
|||
listener = [_NotificationListener(
|
||||
user=user,
|
||||
rooms=rooms,
|
||||
from_token=from_token,
|
||||
limit=1,
|
||||
timeout=timeout,
|
||||
deferred=deferred,
|
||||
appservice=appservice,
|
||||
)]
|
||||
|
@ -338,7 +263,7 @@ class Notifier(object):
|
|||
def _timeout_listener():
|
||||
timed_out[0] = True
|
||||
timer[0] = None
|
||||
listener[0].notify(self, [], from_token, from_token)
|
||||
listener[0].notify(self)
|
||||
|
||||
# We create multiple notification listeners so we have to manage
|
||||
# canceling the timeout ourselves.
|
||||
|
@ -350,10 +275,8 @@ class Notifier(object):
|
|||
listener[0] = _NotificationListener(
|
||||
user=user,
|
||||
rooms=rooms,
|
||||
from_token=from_token,
|
||||
limit=1,
|
||||
timeout=timeout,
|
||||
deferred=deferred,
|
||||
appservice=appservice,
|
||||
)
|
||||
self._register_with_keys(listener[0])
|
||||
result = yield callback()
|
||||
|
|
Loading…
Reference in New Issue