150 lines
5.8 KiB
Python
150 lines
5.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2014 matrix.org
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from twisted.internet import defer
|
|
|
|
from ._base import BaseHandler
|
|
from synapse.api.streams.event import (
|
|
EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
|
|
RoomDataStreamData
|
|
)
|
|
from synapse.handlers.presence import PresenceStreamData
|
|
|
|
|
|
class EventStreamHandler(BaseHandler):
|
|
|
|
stream_data_classes = [
|
|
MessagesStreamData,
|
|
RoomMemberStreamData,
|
|
FeedbackStreamData,
|
|
RoomDataStreamData,
|
|
PresenceStreamData,
|
|
]
|
|
|
|
def __init__(self, hs):
|
|
super(EventStreamHandler, self).__init__(hs)
|
|
|
|
# Count of active streams per user
|
|
self._streams_per_user = {}
|
|
# Grace timers per user to delay the "stopped" signal
|
|
self._stop_timer_per_user = {}
|
|
|
|
self.distributor = hs.get_distributor()
|
|
self.distributor.declare("started_user_eventstream")
|
|
self.distributor.declare("stopped_user_eventstream")
|
|
|
|
self.clock = hs.get_clock()
|
|
|
|
def get_event_stream_token(self, stream_type, store_id, start_token):
|
|
"""Return the next token after this event.
|
|
|
|
Args:
|
|
stream_type (str): The StreamData.EVENT_TYPE
|
|
store_id (int): The new storage ID assigned from the data store.
|
|
start_token (str): The token the user started with.
|
|
Returns:
|
|
str: The end token.
|
|
"""
|
|
for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
|
|
if stream_cls.EVENT_TYPE == stream_type:
|
|
# this is the stream for this event, so replace this part of
|
|
# the token
|
|
store_ids = start_token.split(EventStream.SEPARATOR)
|
|
store_ids[i] = str(store_id)
|
|
return EventStream.SEPARATOR.join(store_ids)
|
|
raise RuntimeError("Didn't find a stream type %s" % stream_type)
|
|
|
|
@defer.inlineCallbacks
|
|
def get_stream(self, auth_user_id, pagin_config, timeout=0):
|
|
"""Gets events as an event stream for this user.
|
|
|
|
This function looks for interesting *events* for this user. This is
|
|
different from the notifier, which looks for interested *users* who may
|
|
want to know about a single event.
|
|
|
|
Args:
|
|
auth_user_id (str): The user requesting their event stream.
|
|
pagin_config (synapse.api.streams.PaginationConfig): The config to
|
|
use when obtaining the stream.
|
|
timeout (int): The max time to wait for an incoming event in ms.
|
|
Returns:
|
|
A pagination stream API dict
|
|
"""
|
|
auth_user = self.hs.parse_userid(auth_user_id)
|
|
|
|
stream_id = object()
|
|
|
|
try:
|
|
if auth_user not in self._streams_per_user:
|
|
self._streams_per_user[auth_user] = 0
|
|
if auth_user in self._stop_timer_per_user:
|
|
self.clock.cancel_call_later(
|
|
self._stop_timer_per_user.pop(auth_user))
|
|
else:
|
|
self.distributor.fire(
|
|
"started_user_eventstream", auth_user
|
|
)
|
|
self._streams_per_user[auth_user] += 1
|
|
|
|
# construct an event stream with the correct data ordering
|
|
stream_data_list = []
|
|
for stream_class in EventStreamHandler.stream_data_classes:
|
|
stream_data_list.append(stream_class(self.hs))
|
|
event_stream = EventStream(auth_user_id, stream_data_list)
|
|
|
|
# fix unknown tokens to known tokens
|
|
pagin_config = yield event_stream.fix_tokens(pagin_config)
|
|
|
|
# register interest in receiving new events
|
|
self.notifier.store_events_for(user_id=auth_user_id,
|
|
stream_id=stream_id,
|
|
from_tok=pagin_config.from_tok)
|
|
|
|
# see if we can grab a chunk now
|
|
data_chunk = yield event_stream.get_chunk(config=pagin_config)
|
|
|
|
# if there are previous events, return those. If not, wait on the
|
|
# new events for 'timeout' seconds.
|
|
if len(data_chunk["chunk"]) == 0 and timeout != 0:
|
|
results = yield defer.maybeDeferred(
|
|
self.notifier.get_events_for,
|
|
user_id=auth_user_id,
|
|
stream_id=stream_id,
|
|
timeout=timeout
|
|
)
|
|
if results:
|
|
defer.returnValue(results)
|
|
|
|
defer.returnValue(data_chunk)
|
|
finally:
|
|
# cleanup
|
|
self.notifier.purge_events_for(user_id=auth_user_id,
|
|
stream_id=stream_id)
|
|
|
|
self._streams_per_user[auth_user] -= 1
|
|
if not self._streams_per_user[auth_user]:
|
|
del self._streams_per_user[auth_user]
|
|
|
|
# 10 seconds of grace to allow the client to reconnect again
|
|
# before we think they're gone
|
|
def _later():
|
|
self.distributor.fire(
|
|
"stopped_user_eventstream", auth_user
|
|
)
|
|
del self._stop_timer_per_user[auth_user]
|
|
|
|
self._stop_timer_per_user[auth_user] = (
|
|
self.clock.call_later(5, _later)
|
|
)
|