commit
						6dc92d3427
					
				|  | @ -89,7 +89,47 @@ def prune_event(event): | |||
|     return type(event)(allowed_fields) | ||||
| 
 | ||||
| 
 | ||||
| def serialize_event(e, time_now_ms, client_event=True): | ||||
| def format_event_raw(d): | ||||
|     return d | ||||
| 
 | ||||
| 
 | ||||
| def format_event_for_client_v1(d): | ||||
|     d["user_id"] = d.pop("sender", None) | ||||
| 
 | ||||
|     move_keys = ("age", "redacted_because", "replaces_state", "prev_content") | ||||
|     for key in move_keys: | ||||
|         if key in d["unsigned"]: | ||||
|             d[key] = d["unsigned"][key] | ||||
| 
 | ||||
|     drop_keys = ( | ||||
|         "auth_events", "prev_events", "hashes", "signatures", "depth", | ||||
|         "unsigned", "origin", "prev_state" | ||||
|     ) | ||||
|     for key in drop_keys: | ||||
|         d.pop(key, None) | ||||
|     return d | ||||
| 
 | ||||
| 
 | ||||
| def format_event_for_client_v2(d): | ||||
|     drop_keys = ( | ||||
|         "auth_events", "prev_events", "hashes", "signatures", "depth", | ||||
|         "origin", "prev_state", | ||||
|     ) | ||||
|     for key in drop_keys: | ||||
|         d.pop(key, None) | ||||
|     return d | ||||
| 
 | ||||
| 
 | ||||
| def format_event_for_client_v2_without_event_id(d): | ||||
|     d = format_event_for_client_v2(d) | ||||
|     d.pop("room_id", None) | ||||
|     d.pop("event_id", None) | ||||
|     return d | ||||
| 
 | ||||
| 
 | ||||
| def serialize_event(e, time_now_ms, as_client_event=True, | ||||
|                     event_format=format_event_for_client_v1, | ||||
|                     token_id=None): | ||||
|     # FIXME(erikj): To handle the case of presence events and the like | ||||
|     if not isinstance(e, EventBase): | ||||
|         return e | ||||
|  | @ -99,43 +139,22 @@ def serialize_event(e, time_now_ms, client_event=True): | |||
|     # Should this strip out None's? | ||||
|     d = {k: v for k, v in e.get_dict().items()} | ||||
| 
 | ||||
|     if not client_event: | ||||
|         # set the age and keep all other keys | ||||
|         if "age_ts" in d["unsigned"]: | ||||
|             d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"] | ||||
|         return d | ||||
| 
 | ||||
|     if "age_ts" in d["unsigned"]: | ||||
|         d["age"] = time_now_ms - d["unsigned"]["age_ts"] | ||||
|         d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"] | ||||
|         del d["unsigned"]["age_ts"] | ||||
| 
 | ||||
|     d["user_id"] = d.pop("sender", None) | ||||
| 
 | ||||
|     if "redacted_because" in e.unsigned: | ||||
|         d["redacted_because"] = serialize_event( | ||||
|         d["unsigned"]["redacted_because"] = serialize_event( | ||||
|             e.unsigned["redacted_because"], time_now_ms | ||||
|         ) | ||||
| 
 | ||||
|         del d["unsigned"]["redacted_because"] | ||||
|     if token_id is not None: | ||||
|         if token_id == getattr(e.internal_metadata, "token_id", None): | ||||
|             txn_id = getattr(e.internal_metadata, "txn_id", None) | ||||
|             if txn_id is not None: | ||||
|                 d["unsigned"]["transaction_id"] = txn_id | ||||
| 
 | ||||
|     if "redacted_by" in e.unsigned: | ||||
|         d["redacted_by"] = e.unsigned["redacted_by"] | ||||
|         del d["unsigned"]["redacted_by"] | ||||
| 
 | ||||
|     if "replaces_state" in e.unsigned: | ||||
|         d["replaces_state"] = e.unsigned["replaces_state"] | ||||
|         del d["unsigned"]["replaces_state"] | ||||
| 
 | ||||
|     if "prev_content" in e.unsigned: | ||||
|         d["prev_content"] = e.unsigned["prev_content"] | ||||
|         del d["unsigned"]["prev_content"] | ||||
| 
 | ||||
|     del d["auth_events"] | ||||
|     del d["prev_events"] | ||||
|     del d["hashes"] | ||||
|     del d["signatures"] | ||||
|     d.pop("depth", None) | ||||
|     d.pop("unsigned", None) | ||||
|     d.pop("origin", None) | ||||
| 
 | ||||
|     return d | ||||
|     if as_client_event: | ||||
|         return event_format(d) | ||||
|     else: | ||||
|         return d | ||||
|  |  | |||
|  | @ -26,6 +26,7 @@ from .presence import PresenceHandler | |||
| from .directory import DirectoryHandler | ||||
| from .typing import TypingNotificationHandler | ||||
| from .admin import AdminHandler | ||||
| from .sync import SyncHandler | ||||
| 
 | ||||
| 
 | ||||
| class Handlers(object): | ||||
|  | @ -51,3 +52,4 @@ class Handlers(object): | |||
|         self.directory_handler = DirectoryHandler(hs) | ||||
|         self.typing_notification_handler = TypingNotificationHandler(hs) | ||||
|         self.admin_handler = AdminHandler(hs) | ||||
|         self.sync_handler = SyncHandler(hs) | ||||
|  |  | |||
|  | @ -0,0 +1,434 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2015 OpenMarket Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from ._base import BaseHandler | ||||
| 
 | ||||
| from synapse.streams.config import PaginationConfig | ||||
| from synapse.api.constants import Membership, EventTypes | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| import collections | ||||
| import logging | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| SyncConfig = collections.namedtuple("SyncConfig", [ | ||||
|     "user", | ||||
|     "client_info", | ||||
|     "limit", | ||||
|     "gap", | ||||
|     "sort", | ||||
|     "backfill", | ||||
|     "filter", | ||||
| ]) | ||||
| 
 | ||||
| 
 | ||||
| class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ | ||||
|     "room_id", | ||||
|     "limited", | ||||
|     "published", | ||||
|     "events", | ||||
|     "state", | ||||
|     "prev_batch", | ||||
|     "ephemeral", | ||||
| ])): | ||||
|     __slots__ = [] | ||||
| 
 | ||||
|     def __nonzero__(self): | ||||
|         """Make the result appear empty if there are no updates. This is used | ||||
|         to tell if room needs to be part of the sync result. | ||||
|         """ | ||||
|         return bool(self.events or self.state or self.ephemeral) | ||||
| 
 | ||||
| 
 | ||||
| class SyncResult(collections.namedtuple("SyncResult", [ | ||||
|     "next_batch",  # Token for the next sync | ||||
|     "private_user_data",  # List of private events for the user. | ||||
|     "public_user_data",  # List of public events for all users. | ||||
|     "rooms",  # RoomSyncResult for each room. | ||||
| ])): | ||||
|     __slots__ = [] | ||||
| 
 | ||||
|     def __nonzero__(self): | ||||
|         """Make the result appear empty if there are no updates. This is used | ||||
|         to tell if the notifier needs to wait for more events when polling for | ||||
|         events. | ||||
|         """ | ||||
|         return bool( | ||||
|             self.private_user_data or self.public_user_data or self.rooms | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| class SyncHandler(BaseHandler): | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         super(SyncHandler, self).__init__(hs) | ||||
|         self.event_sources = hs.get_event_sources() | ||||
|         self.clock = hs.get_clock() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): | ||||
|         """Get the sync for a client if we have new data for it now. Otherwise | ||||
|         wait for new data to arrive on the server. If the timeout expires, then | ||||
|         return an empty sync result. | ||||
|         Returns: | ||||
|             A Deferred SyncResult. | ||||
|         """ | ||||
|         if timeout == 0 or since_token is None: | ||||
|             result = yield self.current_sync_for_user(sync_config, since_token) | ||||
|             defer.returnValue(result) | ||||
|         else: | ||||
|             def current_sync_callback(): | ||||
|                 return self.current_sync_for_user(sync_config, since_token) | ||||
| 
 | ||||
|             rm_handler = self.hs.get_handlers().room_member_handler | ||||
|             room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) | ||||
|             result = yield self.notifier.wait_for_events( | ||||
|                 sync_config.user, room_ids, | ||||
|                 sync_config.filter, timeout, current_sync_callback | ||||
|             ) | ||||
|             defer.returnValue(result) | ||||
| 
 | ||||
|     def current_sync_for_user(self, sync_config, since_token=None): | ||||
|         """Get the sync for client needed to match what the server has now. | ||||
|         Returns: | ||||
|             A Deferred SyncResult. | ||||
|         """ | ||||
|         if since_token is None: | ||||
|             return self.initial_sync(sync_config) | ||||
|         else: | ||||
|             if sync_config.gap: | ||||
|                 return self.incremental_sync_with_gap(sync_config, since_token) | ||||
|             else: | ||||
|                 #TODO(mjark): Handle gapless sync | ||||
|                 raise NotImplementedError() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def initial_sync(self, sync_config): | ||||
|         """Get a sync for a client which is starting without any state | ||||
|         Returns: | ||||
|             A Deferred SyncResult. | ||||
|         """ | ||||
|         if sync_config.sort == "timeline,desc": | ||||
|             # TODO(mjark): Handle going through events in reverse order?. | ||||
|             # What does "most recent events" mean when applying the limits mean | ||||
|             # in this case? | ||||
|             raise NotImplementedError() | ||||
| 
 | ||||
|         now_token = yield self.event_sources.get_current_token() | ||||
| 
 | ||||
|         presence_stream = self.event_sources.sources["presence"] | ||||
|         # TODO (mjark): This looks wrong, shouldn't we be getting the presence | ||||
|         # UP to the present rather than after the present? | ||||
|         pagination_config = PaginationConfig(from_token=now_token) | ||||
|         presence, _ = yield presence_stream.get_pagination_rows( | ||||
|             user=sync_config.user, | ||||
|             pagination_config=pagination_config.get_source_config("presence"), | ||||
|             key=None | ||||
|         ) | ||||
|         room_list = yield self.store.get_rooms_for_user_where_membership_is( | ||||
|             user_id=sync_config.user.to_string(), | ||||
|             membership_list=[Membership.INVITE, Membership.JOIN] | ||||
|         ) | ||||
| 
 | ||||
|         # TODO (mjark): Does public mean "published"? | ||||
|         published_rooms = yield self.store.get_rooms(is_public=True) | ||||
|         published_room_ids = set(r["room_id"] for r in published_rooms) | ||||
| 
 | ||||
|         rooms = [] | ||||
|         for event in room_list: | ||||
|             room_sync = yield self.initial_sync_for_room( | ||||
|                 event.room_id, sync_config, now_token, published_room_ids | ||||
|             ) | ||||
|             rooms.append(room_sync) | ||||
| 
 | ||||
|         defer.returnValue(SyncResult( | ||||
|             public_user_data=presence, | ||||
|             private_user_data=[], | ||||
|             rooms=rooms, | ||||
|             next_batch=now_token, | ||||
|         )) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def initial_sync_for_room(self, room_id, sync_config, now_token, | ||||
|                               published_room_ids): | ||||
|         """Sync a room for a client which is starting without any state | ||||
|         Returns: | ||||
|             A Deferred RoomSyncResult. | ||||
|         """ | ||||
| 
 | ||||
|         recents, prev_batch_token, limited = yield self.load_filtered_recents( | ||||
|             room_id, sync_config, now_token, | ||||
|         ) | ||||
| 
 | ||||
|         current_state_events = yield self.state_handler.get_current_state( | ||||
|             room_id | ||||
|         ) | ||||
| 
 | ||||
|         defer.returnValue(RoomSyncResult( | ||||
|             room_id=room_id, | ||||
|             published=room_id in published_room_ids, | ||||
|             events=recents, | ||||
|             prev_batch=prev_batch_token, | ||||
|             state=current_state_events, | ||||
|             limited=limited, | ||||
|             ephemeral=[], | ||||
|         )) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def incremental_sync_with_gap(self, sync_config, since_token): | ||||
|         """ Get the incremental delta needed to bring the client up to | ||||
|         date with the server. | ||||
|         Returns: | ||||
|             A Deferred SyncResult. | ||||
|         """ | ||||
|         if sync_config.sort == "timeline,desc": | ||||
|             # TODO(mjark): Handle going through events in reverse order?. | ||||
|             # What does "most recent events" mean when applying the limits mean | ||||
|             # in this case? | ||||
|             raise NotImplementedError() | ||||
| 
 | ||||
|         now_token = yield self.event_sources.get_current_token() | ||||
| 
 | ||||
|         presence_source = self.event_sources.sources["presence"] | ||||
|         presence, presence_key = yield presence_source.get_new_events_for_user( | ||||
|             user=sync_config.user, | ||||
|             from_key=since_token.presence_key, | ||||
|             limit=sync_config.limit, | ||||
|         ) | ||||
|         now_token = now_token.copy_and_replace("presence_key", presence_key) | ||||
| 
 | ||||
|         typing_source = self.event_sources.sources["typing"] | ||||
|         typing, typing_key = yield typing_source.get_new_events_for_user( | ||||
|             user=sync_config.user, | ||||
|             from_key=since_token.typing_key, | ||||
|             limit=sync_config.limit, | ||||
|         ) | ||||
|         now_token = now_token.copy_and_replace("typing_key", typing_key) | ||||
| 
 | ||||
|         typing_by_room = {event["room_id"]: [event] for event in typing} | ||||
|         for event in typing: | ||||
|             event.pop("room_id") | ||||
|         logger.debug("Typing %r", typing_by_room) | ||||
| 
 | ||||
|         rm_handler = self.hs.get_handlers().room_member_handler | ||||
|         room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) | ||||
| 
 | ||||
|         # TODO (mjark): Does public mean "published"? | ||||
|         published_rooms = yield self.store.get_rooms(is_public=True) | ||||
|         published_room_ids = set(r["room_id"] for r in published_rooms) | ||||
| 
 | ||||
|         room_events, _ = yield self.store.get_room_events_stream( | ||||
|             sync_config.user.to_string(), | ||||
|             from_key=since_token.room_key, | ||||
|             to_key=now_token.room_key, | ||||
|             room_id=None, | ||||
|             limit=sync_config.limit + 1, | ||||
|         ) | ||||
| 
 | ||||
|         rooms = [] | ||||
|         if len(room_events) <= sync_config.limit: | ||||
|             # There is no gap in any of the rooms. Therefore we can just | ||||
|             # partition the new events by room and return them. | ||||
|             events_by_room_id = {} | ||||
|             for event in room_events: | ||||
|                 events_by_room_id.setdefault(event.room_id, []).append(event) | ||||
| 
 | ||||
|             for room_id in room_ids: | ||||
|                 recents = events_by_room_id.get(room_id, []) | ||||
|                 state = [event for event in recents if event.is_state()] | ||||
|                 if recents: | ||||
|                     prev_batch = now_token.copy_and_replace( | ||||
|                         "room_key", recents[0].internal_metadata.before | ||||
|                     ) | ||||
|                 else: | ||||
|                     prev_batch = now_token | ||||
| 
 | ||||
|                 state = yield self.check_joined_room( | ||||
|                     sync_config, room_id, state | ||||
|                 ) | ||||
| 
 | ||||
|                 room_sync = RoomSyncResult( | ||||
|                     room_id=room_id, | ||||
|                     published=room_id in published_room_ids, | ||||
|                     events=recents, | ||||
|                     prev_batch=prev_batch, | ||||
|                     state=state, | ||||
|                     limited=False, | ||||
|                     ephemeral=typing_by_room.get(room_id, []) | ||||
|                 ) | ||||
|                 if room_sync: | ||||
|                     rooms.append(room_sync) | ||||
|         else: | ||||
|             for room_id in room_ids: | ||||
|                 room_sync = yield self.incremental_sync_with_gap_for_room( | ||||
|                     room_id, sync_config, since_token, now_token, | ||||
|                     published_room_ids, typing_by_room | ||||
|                 ) | ||||
|                 if room_sync: | ||||
|                     rooms.append(room_sync) | ||||
| 
 | ||||
|         defer.returnValue(SyncResult( | ||||
|             public_user_data=presence, | ||||
|             private_user_data=[], | ||||
|             rooms=rooms, | ||||
|             next_batch=now_token, | ||||
|         )) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def load_filtered_recents(self, room_id, sync_config, now_token, | ||||
|                               since_token=None): | ||||
|         limited = True | ||||
|         recents = [] | ||||
|         filtering_factor = 2 | ||||
|         load_limit = max(sync_config.limit * filtering_factor, 100) | ||||
|         max_repeat = 3  # Only try a few times per room, otherwise | ||||
|         room_key = now_token.room_key | ||||
| 
 | ||||
|         while limited and len(recents) < sync_config.limit and max_repeat: | ||||
|             events, keys = yield self.store.get_recent_events_for_room( | ||||
|                 room_id, | ||||
|                 limit=load_limit + 1, | ||||
|                 from_token=since_token.room_key if since_token else None, | ||||
|                 end_token=room_key, | ||||
|             ) | ||||
|             (room_key, _) = keys | ||||
|             loaded_recents = sync_config.filter.filter_room_events(events) | ||||
|             loaded_recents.extend(recents) | ||||
|             recents = loaded_recents | ||||
|             if len(events) <= load_limit: | ||||
|                 limited = False | ||||
|             max_repeat -= 1 | ||||
| 
 | ||||
|         if len(recents) > sync_config.limit: | ||||
|             recents = recents[-sync_config.limit:] | ||||
|             room_key = recents[0].internal_metadata.before | ||||
| 
 | ||||
|         prev_batch_token = now_token.copy_and_replace( | ||||
|             "room_key", room_key | ||||
|         ) | ||||
| 
 | ||||
|         defer.returnValue((recents, prev_batch_token, limited)) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def incremental_sync_with_gap_for_room(self, room_id, sync_config, | ||||
|                                            since_token, now_token, | ||||
|                                            published_room_ids, typing_by_room): | ||||
|         """ Get the incremental delta needed to bring the client up to date for | ||||
|         the room. Gives the client the most recent events and the changes to | ||||
|         state. | ||||
|         Returns: | ||||
|             A Deferred RoomSyncResult | ||||
|         """ | ||||
| 
 | ||||
|         # TODO(mjark): Check for redactions we might have missed. | ||||
| 
 | ||||
|         recents, prev_batch_token, limited = yield self.load_filtered_recents( | ||||
|             room_id, sync_config, now_token, since_token, | ||||
|         ) | ||||
| 
 | ||||
|         logging.debug("Recents %r", recents) | ||||
| 
 | ||||
|         # TODO(mjark): This seems racy since this isn't being passed a | ||||
|         # token to indicate what point in the stream this is | ||||
|         current_state_events = yield self.state_handler.get_current_state( | ||||
|             room_id | ||||
|         ) | ||||
| 
 | ||||
|         state_at_previous_sync = yield self.get_state_at_previous_sync( | ||||
|             room_id, since_token=since_token | ||||
|         ) | ||||
| 
 | ||||
|         state_events_delta = yield self.compute_state_delta( | ||||
|             since_token=since_token, | ||||
|             previous_state=state_at_previous_sync, | ||||
|             current_state=current_state_events, | ||||
|         ) | ||||
| 
 | ||||
|         state_events_delta = yield self.check_joined_room( | ||||
|             sync_config, room_id, state_events_delta | ||||
|         ) | ||||
| 
 | ||||
|         room_sync = RoomSyncResult( | ||||
|             room_id=room_id, | ||||
|             published=room_id in published_room_ids, | ||||
|             events=recents, | ||||
|             prev_batch=prev_batch_token, | ||||
|             state=state_events_delta, | ||||
|             limited=limited, | ||||
|             ephemeral=typing_by_room.get(room_id, None) | ||||
|         ) | ||||
| 
 | ||||
|         logging.debug("Room sync: %r", room_sync) | ||||
| 
 | ||||
|         defer.returnValue(room_sync) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def get_state_at_previous_sync(self, room_id, since_token): | ||||
|         """ Get the room state at the previous sync the client made. | ||||
|         Returns: | ||||
|             A Deferred list of Events. | ||||
|         """ | ||||
|         last_events, token = yield self.store.get_recent_events_for_room( | ||||
|             room_id, end_token=since_token.room_key, limit=1, | ||||
|         ) | ||||
| 
 | ||||
|         if last_events: | ||||
|             last_event = last_events[0] | ||||
|             last_context = yield self.state_handler.compute_event_context( | ||||
|                 last_event | ||||
|             ) | ||||
|             if last_event.is_state(): | ||||
|                 state = [last_event] + last_context.current_state.values() | ||||
|             else: | ||||
|                 state = last_context.current_state.values() | ||||
|         else: | ||||
|             state = () | ||||
|         defer.returnValue(state) | ||||
| 
 | ||||
|     def compute_state_delta(self, since_token, previous_state, current_state): | ||||
|         """ Works out the differnce in state between the current state and the | ||||
|         state the client got when it last performed a sync. | ||||
|         Returns: | ||||
|             A list of events. | ||||
|         """ | ||||
|         # TODO(mjark) Check if the state events were received by the server | ||||
|         # after the previous sync, since we need to include those state | ||||
|         # updates even if they occured logically before the previous event. | ||||
|         # TODO(mjark) Check for new redactions in the state events. | ||||
|         previous_dict = {event.event_id: event for event in previous_state} | ||||
|         state_delta = [] | ||||
|         for event in current_state: | ||||
|             if event.event_id not in previous_dict: | ||||
|                 state_delta.append(event) | ||||
|         return state_delta | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def check_joined_room(self, sync_config, room_id, state_delta): | ||||
|         joined = False | ||||
|         for event in state_delta: | ||||
|             if ( | ||||
|                 event.type == EventTypes.Member | ||||
|                 and event.state_key == sync_config.user.to_string() | ||||
|             ): | ||||
|                 if event.content["membership"] == Membership.JOIN: | ||||
|                     joined = True | ||||
| 
 | ||||
|         if joined: | ||||
|             state_delta = yield self.state_handler.get_current_state(room_id) | ||||
| 
 | ||||
|         defer.returnValue(state_delta) | ||||
|  | @ -18,6 +18,7 @@ from twisted.internet import defer | |||
| from synapse.util.logutils import log_function | ||||
| from synapse.util.logcontext import PreserveLoggingContext | ||||
| from synapse.util.async import run_on_reactor | ||||
| from synapse.types import StreamToken | ||||
| 
 | ||||
| import logging | ||||
| 
 | ||||
|  | @ -205,6 +206,53 @@ class Notifier(object): | |||
|                 [notify(l).addErrback(eb) for l in listeners] | ||||
|             ) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def wait_for_events(self, user, rooms, filter, timeout, callback): | ||||
|         """Wait until the callback returns a non empty response or the | ||||
|         timeout fires. | ||||
|         """ | ||||
| 
 | ||||
|         deferred = defer.Deferred() | ||||
| 
 | ||||
|         from_token = StreamToken("s0", "0", "0") | ||||
| 
 | ||||
|         listener = [_NotificationListener( | ||||
|             user=user, | ||||
|             rooms=rooms, | ||||
|             from_token=from_token, | ||||
|             limit=1, | ||||
|             timeout=timeout, | ||||
|             deferred=deferred, | ||||
|         )] | ||||
| 
 | ||||
|         if timeout: | ||||
|             self._register_with_keys(listener[0]) | ||||
| 
 | ||||
|         result = yield callback() | ||||
|         if timeout: | ||||
|             timed_out = [False] | ||||
| 
 | ||||
|             def _timeout_listener(): | ||||
|                 timed_out[0] = True | ||||
|                 listener[0].notify(self, [], from_token, from_token) | ||||
| 
 | ||||
|             self.clock.call_later(timeout/1000., _timeout_listener) | ||||
|             while not result and not timed_out[0]: | ||||
|                 yield deferred | ||||
|                 deferred = defer.Deferred() | ||||
|                 listener[0] = _NotificationListener( | ||||
|                     user=user, | ||||
|                     rooms=rooms, | ||||
|                     from_token=from_token, | ||||
|                     limit=1, | ||||
|                     timeout=timeout, | ||||
|                     deferred=deferred, | ||||
|                 ) | ||||
|                 self._register_with_keys(listener[0]) | ||||
|                 result = yield callback() | ||||
| 
 | ||||
|         defer.returnValue(result) | ||||
| 
 | ||||
|     def get_events_for(self, user, rooms, pagination_config, timeout): | ||||
|         """ For the given user and rooms, return any new events for them. If | ||||
|         there are no new events wait for up to `timeout` milliseconds for any | ||||
|  |  | |||
|  | @ -13,12 +13,11 @@ | |||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| 
 | ||||
| from . import ( | ||||
|     sync, | ||||
|     filter | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| from synapse.http.server import JsonResource | ||||
| 
 | ||||
| 
 | ||||
|  | @ -31,4 +30,5 @@ class ClientV2AlphaRestResource(JsonResource): | |||
| 
 | ||||
|     @staticmethod | ||||
|     def register_servlets(client_resource, hs): | ||||
|         sync.register_servlets(hs, client_resource) | ||||
|         filter.register_servlets(hs, client_resource) | ||||
|  |  | |||
|  | @ -0,0 +1,207 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2015 OpenMarket Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.http.servlet import RestServlet | ||||
| from synapse.handlers.sync import SyncConfig | ||||
| from synapse.types import StreamToken | ||||
| from synapse.events.utils import ( | ||||
|     serialize_event, format_event_for_client_v2_without_event_id, | ||||
| ) | ||||
| from synapse.api.filtering import Filter | ||||
| from ._base import client_v2_pattern | ||||
| 
 | ||||
| import logging | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class SyncRestServlet(RestServlet): | ||||
|     """ | ||||
| 
 | ||||
|     GET parameters:: | ||||
|         timeout(int): How long to wait for new events in milliseconds. | ||||
|         limit(int): Maxiumum number of events per room to return. | ||||
|         gap(bool): Create gaps the message history if limit is exceeded to | ||||
|             ensure that the client has the most recent messages. Defaults to | ||||
|             "true". | ||||
|         sort(str,str): tuple of sort key (e.g. "timeline") and direction | ||||
|             (e.g. "asc", "desc"). Defaults to "timeline,asc". | ||||
|         since(batch_token): Batch token when asking for incremental deltas. | ||||
|         set_presence(str): What state the device presence should be set to. | ||||
|             default is "online". | ||||
|         backfill(bool): Should the HS request message history from other | ||||
|             servers. This may take a long time making it unsuitable for clients | ||||
|             expecting a prompt response. Defaults to "true". | ||||
|         filter(filter_id): A filter to apply to the events returned. | ||||
|         filter_*: Filter override parameters. | ||||
| 
 | ||||
|     Response JSON:: | ||||
|         { | ||||
|             "next_batch": // batch token for the next /sync | ||||
|             "private_user_data": // private events for this user. | ||||
|             "public_user_data": // public events for all users including the | ||||
|                                 // public events for this user. | ||||
|             "rooms": [{ // List of rooms with updates. | ||||
|                 "room_id": // Id of the room being updated | ||||
|                 "limited": // Was the per-room event limit exceeded? | ||||
|                 "published": // Is the room published by our HS? | ||||
|                 "event_map": // Map of EventID -> event JSON. | ||||
|                 "events": { // The recent events in the room if gap is "true" | ||||
|                             // otherwise the next events in the room. | ||||
|                     "batch": [] // list of EventIDs in the "event_map". | ||||
|                     "prev_batch": // back token for getting previous events. | ||||
|                 } | ||||
|                 "state": [] // list of EventIDs updating the current state to | ||||
|                             // be what it should be at the end of the batch. | ||||
|                 "ephemeral": [] | ||||
|             }] | ||||
|         } | ||||
|     """ | ||||
| 
 | ||||
|     PATTERN = client_v2_pattern("/sync$") | ||||
|     ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) | ||||
|     ALLOWED_PRESENCE = set(["online", "offline", "idle"]) | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         super(SyncRestServlet, self).__init__() | ||||
|         self.auth = hs.get_auth() | ||||
|         self.sync_handler = hs.get_handlers().sync_handler | ||||
|         self.clock = hs.get_clock() | ||||
|         self.filtering = hs.get_filtering() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def on_GET(self, request): | ||||
|         user, client = yield self.auth.get_user_by_req(request) | ||||
| 
 | ||||
|         timeout = self.parse_integer(request, "timeout", default=0) | ||||
|         limit = self.parse_integer(request, "limit", required=True) | ||||
|         gap = self.parse_boolean(request, "gap", default=True) | ||||
|         sort = self.parse_string( | ||||
|             request, "sort", default="timeline,asc", | ||||
|             allowed_values=self.ALLOWED_SORT | ||||
|         ) | ||||
|         since = self.parse_string(request, "since") | ||||
|         set_presence = self.parse_string( | ||||
|             request, "set_presence", default="online", | ||||
|             allowed_values=self.ALLOWED_PRESENCE | ||||
|         ) | ||||
|         backfill = self.parse_boolean(request, "backfill", default=False) | ||||
|         filter_id = self.parse_string(request, "filter", default=None) | ||||
| 
 | ||||
|         logger.info( | ||||
|             "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r," | ||||
|             " set_presence=%r, backfill=%r, filter_id=%r" % ( | ||||
|                 user, timeout, limit, gap, sort, since, set_presence, | ||||
|                 backfill, filter_id | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|         # TODO(mjark): Load filter and apply overrides. | ||||
|         try: | ||||
|             filter = yield self.filtering.get_user_filter( | ||||
|                 user.localpart, filter_id | ||||
|             ) | ||||
|         except: | ||||
|             filter = Filter({}) | ||||
|         # filter = filter.apply_overrides(http_request) | ||||
|         #if filter.matches(event): | ||||
|         #   # stuff | ||||
| 
 | ||||
|         sync_config = SyncConfig( | ||||
|             user=user, | ||||
|             client_info=client, | ||||
|             gap=gap, | ||||
|             limit=limit, | ||||
|             sort=sort, | ||||
|             backfill=backfill, | ||||
|             filter=filter, | ||||
|         ) | ||||
| 
 | ||||
|         if since is not None: | ||||
|             since_token = StreamToken.from_string(since) | ||||
|         else: | ||||
|             since_token = None | ||||
| 
 | ||||
|         sync_result = yield self.sync_handler.wait_for_sync_for_user( | ||||
|             sync_config, since_token=since_token, timeout=timeout | ||||
|         ) | ||||
| 
 | ||||
|         time_now = self.clock.time_msec() | ||||
| 
 | ||||
|         response_content = { | ||||
|             "public_user_data": self.encode_user_data( | ||||
|                 sync_result.public_user_data, filter, time_now | ||||
|             ), | ||||
|             "private_user_data": self.encode_user_data( | ||||
|                 sync_result.private_user_data, filter, time_now | ||||
|             ), | ||||
|             "rooms": self.encode_rooms( | ||||
|                 sync_result.rooms, filter, time_now, client.token_id | ||||
|             ), | ||||
|             "next_batch": sync_result.next_batch.to_string(), | ||||
|         } | ||||
| 
 | ||||
|         defer.returnValue((200, response_content)) | ||||
| 
 | ||||
|     def encode_user_data(self, events, filter, time_now): | ||||
|         return events | ||||
| 
 | ||||
|     def encode_rooms(self, rooms, filter, time_now, token_id): | ||||
|         return [ | ||||
|             self.encode_room(room, filter, time_now, token_id) | ||||
|             for room in rooms | ||||
|         ] | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def encode_room(room, filter, time_now, token_id): | ||||
|         event_map = {} | ||||
|         state_events = filter.filter_room_state(room.state) | ||||
|         recent_events = filter.filter_room_events(room.events) | ||||
|         state_event_ids = [] | ||||
|         recent_event_ids = [] | ||||
|         for event in state_events: | ||||
|             # TODO(mjark): Respect formatting requirements in the filter. | ||||
|             event_map[event.event_id] = serialize_event( | ||||
|                 event, time_now, token_id=token_id, | ||||
|                 event_format=format_event_for_client_v2_without_event_id, | ||||
|             ) | ||||
|             state_event_ids.append(event.event_id) | ||||
| 
 | ||||
|         for event in recent_events: | ||||
|             # TODO(mjark): Respect formatting requirements in the filter. | ||||
|             event_map[event.event_id] = serialize_event( | ||||
|                 event, time_now, token_id=token_id, | ||||
|                 event_format=format_event_for_client_v2_without_event_id, | ||||
|             ) | ||||
|             recent_event_ids.append(event.event_id) | ||||
|         result = { | ||||
|             "room_id": room.room_id, | ||||
|             "event_map": event_map, | ||||
|             "events": { | ||||
|                 "batch": recent_event_ids, | ||||
|                 "prev_batch": room.prev_batch.to_string(), | ||||
|             }, | ||||
|             "state": state_event_ids, | ||||
|             "limited": room.limited, | ||||
|             "published": room.published, | ||||
|             "ephemeral": room.ephemeral, | ||||
|         } | ||||
|         return result | ||||
| 
 | ||||
| 
 | ||||
| def register_servlets(hs, http_server): | ||||
|     SyncRestServlet(hs).register(http_server) | ||||
|  | @ -181,8 +181,11 @@ class StreamStore(SQLBaseStore): | |||
|                 get_prev_content=True | ||||
|             ) | ||||
| 
 | ||||
|             self._set_before_and_after(ret, rows) | ||||
| 
 | ||||
|             if rows: | ||||
|                 key = "s%d" % max([r["stream_ordering"] for r in rows]) | ||||
| 
 | ||||
|             else: | ||||
|                 # Assume we didn't get anything because there was nothing to | ||||
|                 # get. | ||||
|  | @ -260,22 +263,44 @@ class StreamStore(SQLBaseStore): | |||
|                 get_prev_content=True | ||||
|             ) | ||||
| 
 | ||||
|             self._set_before_and_after(events, rows) | ||||
| 
 | ||||
|             return events, next_token, | ||||
| 
 | ||||
|         return self.runInteraction("paginate_room_events", f) | ||||
| 
 | ||||
|     def get_recent_events_for_room(self, room_id, limit, end_token, | ||||
|                                    with_feedback=False): | ||||
|                                    with_feedback=False, from_token=None): | ||||
|         # TODO (erikj): Handle compressed feedback | ||||
| 
 | ||||
|         sql = ( | ||||
|             "SELECT stream_ordering, topological_ordering, event_id FROM events " | ||||
|             "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " | ||||
|             "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " | ||||
|         ) | ||||
|         end_token = _StreamToken.parse_stream_token(end_token) | ||||
| 
 | ||||
|         def f(txn): | ||||
|             txn.execute(sql, (room_id, end_token, limit,)) | ||||
|         if from_token is None: | ||||
|             sql = ( | ||||
|                 "SELECT stream_ordering, topological_ordering, event_id" | ||||
|                 " FROM events" | ||||
|                 " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" | ||||
|                 " ORDER BY topological_ordering DESC, stream_ordering DESC" | ||||
|                 " LIMIT ?" | ||||
|             ) | ||||
|         else: | ||||
|             from_token = _StreamToken.parse_stream_token(from_token) | ||||
|             sql = ( | ||||
|                 "SELECT stream_ordering, topological_ordering, event_id" | ||||
|                 " FROM events" | ||||
|                 " WHERE room_id = ? AND stream_ordering > ?" | ||||
|                 " AND stream_ordering <= ? AND outlier = 0" | ||||
|                 " ORDER BY topological_ordering DESC, stream_ordering DESC" | ||||
|                 " LIMIT ?" | ||||
|             ) | ||||
| 
 | ||||
|         def get_recent_events_for_room_txn(txn): | ||||
|             if from_token is None: | ||||
|                 txn.execute(sql, (room_id, end_token.stream, limit,)) | ||||
|             else: | ||||
|                 txn.execute(sql, ( | ||||
|                     room_id, from_token.stream, end_token.stream, limit | ||||
|                 )) | ||||
| 
 | ||||
|             rows = self.cursor_to_dict(txn) | ||||
| 
 | ||||
|  | @ -291,9 +316,9 @@ class StreamStore(SQLBaseStore): | |||
|                 toke = rows[0]["stream_ordering"] - 1 | ||||
|                 start_token = str(_StreamToken(topo, toke)) | ||||
| 
 | ||||
|                 token = (start_token, end_token) | ||||
|                 token = (start_token, str(end_token)) | ||||
|             else: | ||||
|                 token = (end_token, end_token) | ||||
|                 token = (str(end_token), str(end_token)) | ||||
| 
 | ||||
|             events = self._get_events_txn( | ||||
|                 txn, | ||||
|  | @ -301,9 +326,13 @@ class StreamStore(SQLBaseStore): | |||
|                 get_prev_content=True | ||||
|             ) | ||||
| 
 | ||||
|             self._set_before_and_after(events, rows) | ||||
| 
 | ||||
|             return events, token | ||||
| 
 | ||||
|         return self.runInteraction("get_recent_events_for_room", f) | ||||
|         return self.runInteraction( | ||||
|             "get_recent_events_for_room", get_recent_events_for_room_txn | ||||
|         ) | ||||
| 
 | ||||
|     def get_room_events_max_id(self): | ||||
|         return self.runInteraction( | ||||
|  | @ -325,3 +354,12 @@ class StreamStore(SQLBaseStore): | |||
| 
 | ||||
|         key = res[0]["m"] | ||||
|         return "s%d" % (key,) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def _set_before_and_after(events, rows): | ||||
|         for event, row in zip(events, rows): | ||||
|             stream = row["stream_ordering"] | ||||
|             topo = event.depth | ||||
|             internal = event.internal_metadata | ||||
|             internal.before = str(_StreamToken(topo, stream - 1)) | ||||
|             internal.after = str(_StreamToken(topo, stream)) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Mark Haines
						Mark Haines