Split the sections of EventStreamHandler.get_stream that handle presence
into separate functions. This makes the code a bit easier to read, and means that we can reuse the logic when implementing the v2 sync API.pull/296/head
parent
49ebd472fa
commit
1b9802a0d9
|
@ -46,6 +46,56 @@ class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def started_stream(self, user):
|
||||||
|
"""Tells the presence handler that we have started an eventstream for
|
||||||
|
the user:
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user (User): The user who started a stream.
|
||||||
|
Returns:
|
||||||
|
A deferred that completes once their presence has been updated.
|
||||||
|
"""
|
||||||
|
if user not in self._streams_per_user:
|
||||||
|
self._streams_per_user[user] = 0
|
||||||
|
if user in self._stop_timer_per_user:
|
||||||
|
try:
|
||||||
|
self.clock.cancel_call_later(
|
||||||
|
self._stop_timer_per_user.pop(user)
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
logger.exception("Failed to cancel event timer")
|
||||||
|
else:
|
||||||
|
yield self.distributor.fire("started_user_eventstream", user)
|
||||||
|
|
||||||
|
self._streams_per_user[user] += 1
|
||||||
|
|
||||||
|
def stopped_stream(self, user):
|
||||||
|
"""If there are no streams for a user this starts a timer that will
|
||||||
|
notify the presence handler that we haven't got an event stream for
|
||||||
|
the user unless the user starts a new stream in 30 seconds.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user (User): The user who stopped a stream.
|
||||||
|
"""
|
||||||
|
self._streams_per_user[user] -= 1
|
||||||
|
if not self._streams_per_user[user]:
|
||||||
|
del self._streams_per_user[user]
|
||||||
|
|
||||||
|
# 30 seconds of grace to allow the client to reconnect again
|
||||||
|
# before we think they're gone
|
||||||
|
def _later():
|
||||||
|
logger.debug("_later stopped_user_eventstream %s", user)
|
||||||
|
|
||||||
|
self._stop_timer_per_user.pop(user, None)
|
||||||
|
|
||||||
|
return self.distributor.fire("stopped_user_eventstream", user)
|
||||||
|
|
||||||
|
logger.debug("Scheduling _later: for %s", user)
|
||||||
|
self._stop_timer_per_user[user] = (
|
||||||
|
self.clock.call_later(30, _later)
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def get_stream(self, auth_user_id, pagin_config, timeout=0,
|
def get_stream(self, auth_user_id, pagin_config, timeout=0,
|
||||||
|
@ -59,20 +109,7 @@ class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if affect_presence:
|
if affect_presence:
|
||||||
if auth_user not in self._streams_per_user:
|
yield self.started_stream(auth_user)
|
||||||
self._streams_per_user[auth_user] = 0
|
|
||||||
if auth_user in self._stop_timer_per_user:
|
|
||||||
try:
|
|
||||||
self.clock.cancel_call_later(
|
|
||||||
self._stop_timer_per_user.pop(auth_user)
|
|
||||||
)
|
|
||||||
except:
|
|
||||||
logger.exception("Failed to cancel event timer")
|
|
||||||
else:
|
|
||||||
yield self.distributor.fire(
|
|
||||||
"started_user_eventstream", auth_user
|
|
||||||
)
|
|
||||||
self._streams_per_user[auth_user] += 1
|
|
||||||
|
|
||||||
rm_handler = self.hs.get_handlers().room_member_handler
|
rm_handler = self.hs.get_handlers().room_member_handler
|
||||||
|
|
||||||
|
@ -114,27 +151,7 @@ class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if affect_presence:
|
if affect_presence:
|
||||||
self._streams_per_user[auth_user] -= 1
|
self.stopped_stream(auth_user)
|
||||||
if not self._streams_per_user[auth_user]:
|
|
||||||
del self._streams_per_user[auth_user]
|
|
||||||
|
|
||||||
# 10 seconds of grace to allow the client to reconnect again
|
|
||||||
# before we think they're gone
|
|
||||||
def _later():
|
|
||||||
logger.debug(
|
|
||||||
"_later stopped_user_eventstream %s", auth_user
|
|
||||||
)
|
|
||||||
|
|
||||||
self._stop_timer_per_user.pop(auth_user, None)
|
|
||||||
|
|
||||||
return self.distributor.fire(
|
|
||||||
"stopped_user_eventstream", auth_user
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug("Scheduling _later: for %s", auth_user)
|
|
||||||
self._stop_timer_per_user[auth_user] = (
|
|
||||||
self.clock.call_later(30, _later)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class EventHandler(BaseHandler):
|
class EventHandler(BaseHandler):
|
||||||
|
|
Loading…
Reference in New Issue