Add some doc-strings to notifier
parent
084c365c3a
commit
0ad1c67234
|
@ -81,6 +81,13 @@ class _NotifierUserStream(object):
|
|||
self.last_notified_ms = time_now_ms
|
||||
|
||||
def notify(self, stream_key, stream_id, time_now_ms):
|
||||
"""Notify any listeners for this user of a new event from an
|
||||
event source.
|
||||
Args:
|
||||
stream_key(str): The stream the event came from.
|
||||
stream_id(str): The new id for the stream the event came from.
|
||||
time_now_ms(int): The current time in milliseconds.
|
||||
"""
|
||||
self.current_token = self.current_token.copy_and_replace(
|
||||
stream_key, stream_id
|
||||
)
|
||||
|
@ -167,17 +174,6 @@ class Notifier(object):
|
|||
lambda: count(bool, self.appservice_to_user_streams.values()),
|
||||
)
|
||||
|
||||
def notify_pending_new_room_events(self, max_room_stream_id):
|
||||
pending = sorted(self.pending_new_room_events)
|
||||
self.pending_new_room_events = []
|
||||
for event, room_stream_id, extra_users in pending:
|
||||
if room_stream_id > max_room_stream_id:
|
||||
self.pending_new_room_events.append((
|
||||
event, room_stream_id, extra_users
|
||||
))
|
||||
else:
|
||||
self._on_new_room_event(event, room_stream_id, extra_users)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
|
||||
|
@ -188,19 +184,37 @@ class Notifier(object):
|
|||
This triggers the notifier to wake up any listeners that are
|
||||
listening to the room, and any listeners for the users in the
|
||||
`extra_users` param.
|
||||
|
||||
The events can be peristed out of order. The notifier will wait
|
||||
until all previous events have been persisted before notifying
|
||||
the client streams.
|
||||
"""
|
||||
yield run_on_reactor()
|
||||
|
||||
self.notify_pending_new_room_events(max_room_stream_id)
|
||||
self.pending_new_room_events.append((
|
||||
event, room_stream_id, extra_users
|
||||
))
|
||||
self._notify_pending_new_room_events(max_room_stream_id)
|
||||
|
||||
if room_stream_id > max_room_stream_id:
|
||||
self.pending_new_room_events.append((
|
||||
event, room_stream_id, extra_users
|
||||
))
|
||||
else:
|
||||
self._on_new_room_event(event, room_stream_id, extra_users)
|
||||
def _notify_pending_new_room_events(self, max_room_stream_id):
|
||||
"""Notify for the room events that were queued waiting for a previous
|
||||
event to be persisted.
|
||||
Args:
|
||||
max_room_stream_id(int): The highest stream_id below which all
|
||||
events have been persisted.
|
||||
"""
|
||||
pending = sorted(self.pending_new_room_events)
|
||||
self.pending_new_room_events = []
|
||||
for event, room_stream_id, extra_users in pending:
|
||||
if room_stream_id > max_room_stream_id:
|
||||
self.pending_new_room_events.append((
|
||||
event, room_stream_id, extra_users
|
||||
))
|
||||
else:
|
||||
self._on_new_room_event(event, room_stream_id, extra_users)
|
||||
|
||||
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
|
||||
"""Notify any user streams that are interested in this room event"""
|
||||
# poke any interested application service.
|
||||
self.hs.get_handlers().appservice_handler.notify_interested_services(
|
||||
event
|
||||
|
|
Loading…
Reference in New Issue