151 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			151 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright 2014 matrix.org
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| from twisted.internet import defer
 | |
| 
 | |
| from ._base import BaseHandler
 | |
| from synapse.api.streams.event import (
 | |
|     EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
 | |
|     RoomDataStreamData
 | |
| )
 | |
| from synapse.handlers.presence import PresenceStreamData
 | |
| 
 | |
| 
 | |
| class EventStreamHandler(BaseHandler):
 | |
| 
 | |
|     stream_data_classes = [
 | |
|         MessagesStreamData,
 | |
|         RoomMemberStreamData,
 | |
|         FeedbackStreamData,
 | |
|         RoomDataStreamData,
 | |
|         PresenceStreamData,
 | |
|     ]
 | |
| 
 | |
|     def __init__(self, hs):
 | |
|         super(EventStreamHandler, self).__init__(hs)
 | |
| 
 | |
|         # Count of active streams per user
 | |
|         self._streams_per_user = {}
 | |
|         # Grace timers per user to delay the "stopped" signal
 | |
|         self._stop_timer_per_user = {}
 | |
| 
 | |
|         self.distributor = hs.get_distributor()
 | |
|         self.distributor.declare("started_user_eventstream")
 | |
|         self.distributor.declare("stopped_user_eventstream")
 | |
| 
 | |
|         self.clock = hs.get_clock()
 | |
| 
 | |
|     def get_event_stream_token(self, stream_type, store_id, start_token):
 | |
|         """Return the next token after this event.
 | |
| 
 | |
|         Args:
 | |
|             stream_type (str): The StreamData.EVENT_TYPE
 | |
|             store_id (int): The new storage ID assigned from the data store.
 | |
|             start_token (str): The token the user started with.
 | |
|         Returns:
 | |
|             str: The end token.
 | |
|         """
 | |
|         for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
 | |
|             if stream_cls.EVENT_TYPE == stream_type:
 | |
|                 # this is the stream for this event, so replace this part of
 | |
|                 # the token
 | |
|                 store_ids = start_token.split(EventStream.SEPARATOR)
 | |
|                 store_ids[i] = str(store_id)
 | |
|                 return EventStream.SEPARATOR.join(store_ids)
 | |
|         raise RuntimeError("Didn't find a stream type %s" % stream_type)
 | |
| 
 | |
|     @defer.inlineCallbacks
 | |
|     def get_stream(self, auth_user_id, pagin_config, timeout=0):
 | |
|         """Gets events as an event stream for this user.
 | |
| 
 | |
|         This function looks for interesting *events* for this user. This is
 | |
|         different from the notifier, which looks for interested *users* who may
 | |
|         want to know about a single event.
 | |
| 
 | |
|         Args:
 | |
|             auth_user_id (str): The user requesting their event stream.
 | |
|             pagin_config (synapse.api.streams.PaginationConfig): The config to
 | |
|             use when obtaining the stream.
 | |
|             timeout (int): The max time to wait for an incoming event in ms.
 | |
|         Returns:
 | |
|             A pagination stream API dict
 | |
|         """
 | |
|         auth_user = self.hs.parse_userid(auth_user_id)
 | |
| 
 | |
|         stream_id = object()
 | |
| 
 | |
|         try:
 | |
|             if auth_user not in self._streams_per_user:
 | |
|                 self._streams_per_user[auth_user] = 0
 | |
|                 if auth_user in self._stop_timer_per_user:
 | |
|                     self.clock.cancel_call_later(
 | |
|                         self._stop_timer_per_user.pop(auth_user))
 | |
|                 else:
 | |
|                     self.distributor.fire(
 | |
|                         "started_user_eventstream", auth_user
 | |
|                     )
 | |
|             self._streams_per_user[auth_user] += 1
 | |
| 
 | |
|             # construct an event stream with the correct data ordering
 | |
|             stream_data_list = []
 | |
|             for stream_class in EventStreamHandler.stream_data_classes:
 | |
|                 stream_data_list.append(stream_class(self.hs))
 | |
|             event_stream = EventStream(auth_user_id, stream_data_list)
 | |
| 
 | |
|             # fix unknown tokens to known tokens
 | |
|             pagin_config = yield event_stream.fix_tokens(pagin_config)
 | |
| 
 | |
|             # register interest in receiving new events
 | |
|             self.notifier.store_events_for(user_id=auth_user_id,
 | |
|                                            stream_id=stream_id,
 | |
|                                            from_tok=pagin_config.from_tok)
 | |
| 
 | |
|             # see if we can grab a chunk now
 | |
|             data_chunk = yield event_stream.get_chunk(config=pagin_config)
 | |
| 
 | |
|             # if there are previous events, return those. If not, wait on the
 | |
|             # new events for 'timeout' seconds.
 | |
|             if len(data_chunk["chunk"]) == 0 and timeout != 0:
 | |
|                 results = yield defer.maybeDeferred(
 | |
|                     self.notifier.get_events_for,
 | |
|                     user_id=auth_user_id,
 | |
|                     stream_id=stream_id,
 | |
|                     timeout=timeout
 | |
|                 )
 | |
|                 if results:
 | |
|                     defer.returnValue(results)
 | |
| 
 | |
|             defer.returnValue(data_chunk)
 | |
|         finally:
 | |
|             # cleanup
 | |
|             self.notifier.purge_events_for(user_id=auth_user_id,
 | |
|                                            stream_id=stream_id)
 | |
| 
 | |
|             self._streams_per_user[auth_user] -= 1
 | |
|             if not self._streams_per_user[auth_user]:
 | |
|                 del self._streams_per_user[auth_user]
 | |
| 
 | |
|                 # 10 seconds of grace to allow the client to reconnect again
 | |
|                 #   before we think they're gone
 | |
|                 def _later():
 | |
|                     self.distributor.fire(
 | |
|                         "stopped_user_eventstream", auth_user
 | |
|                     )
 | |
|                     del self._stop_timer_per_user[auth_user]
 | |
| 
 | |
|                 self._stop_timer_per_user[auth_user] = (
 | |
|                     self.clock.call_later(5, _later)
 | |
|                 )
 |