From f31014b18f618d81cb667c2b01146b246d32760c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 1 Oct 2015 17:53:07 +0100 Subject: [PATCH 01/17] Start updating the sync API to match the specification --- synapse/api/filtering.py | 5 +- synapse/handlers/sync.py | 64 +++++++++++------------- synapse/rest/client/v2_alpha/sync.py | 75 ++++++++-------------------- 3 files changed, 54 insertions(+), 90 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 4d570b74f8..c066ce89ef 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -54,7 +54,7 @@ class Filtering(object): ] room_level_definitions = [ - "state", "events", "ephemeral" + "state", "timeline", "ephemeral" ] for key in top_level_definitions: @@ -135,6 +135,9 @@ class Filter(object): def __init__(self, filter_json): self.filter_json = filter_json + def timeline_limit(self): + return self.filter_json.get("room", {}).get("timeline", {}).get(limit, 10) + def filter_public_user_data(self, events): return self._filter_on_key(events, ["public_user_data"]) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9914ff6f9c..2a0e045430 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,21 +28,26 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", - "limit", - "gap", - "sort", - "backfill", "filter", ]) +class TimelineBatch(collections.namedtuple("TimelineBatch", [ + "prev_batch", + "events", + "limited", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.events) class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "room_id", - "limited", - "published", - "events", + "timeline", "state", - "prev_batch", "ephemeral", ])): __slots__ = [] @@ -51,13 +56,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.events or self.state or self.ephemeral) + return bool(self.timeline or self.state or self.ephemeral) class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync - "private_user_data", # List of private events for the user. - "public_user_data", # List of public events for all users. + "presence", # List of presence events for the user. "rooms", # RoomSyncResult for each room. ])): __slots__ = [] @@ -133,12 +137,6 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ - if sync_config.sort == "timeline,desc": - # TODO(mjark): Handle going through events in reverse order?. - # What does "most recent events" mean when applying the limits mean - # in this case? - raise NotImplementedError() - now_token = yield self.event_sources.get_current_token() presence_stream = self.event_sources.sources["presence"] @@ -155,20 +153,15 @@ class SyncHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - # TODO (mjark): Does public mean "published"? - published_rooms = yield self.store.get_rooms(is_public=True) - published_room_ids = set(r["room_id"] for r in published_rooms) - rooms = [] for event in room_list: room_sync = yield self.initial_sync_for_room( - event.room_id, sync_config, now_token, published_room_ids + event.room_id, sync_config, now_token, ) rooms.append(room_sync) defer.returnValue(SyncResult( - public_user_data=presence, - private_user_data=[], + presence=presence, rooms=rooms, next_batch=now_token, )) @@ -192,7 +185,6 @@ class SyncHandler(BaseHandler): defer.returnValue(RoomSyncResult( room_id=room_id, - published=room_id in published_room_ids, events=recents, prev_batch=prev_batch_token, state=current_state_events, @@ -219,7 +211,6 @@ class SyncHandler(BaseHandler): presence, presence_key = yield presence_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.presence_key, - limit=sync_config.limit, ) now_token = now_token.copy_and_replace("presence_key", presence_key) @@ -227,7 +218,6 @@ class SyncHandler(BaseHandler): typing, typing_key = yield typing_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.typing_key, - limit=sync_config.limit, ) now_token = now_token.copy_and_replace("typing_key", typing_key) @@ -252,16 +242,18 @@ class SyncHandler(BaseHandler): published_rooms = yield self.store.get_rooms(is_public=True) published_room_ids = set(r["room_id"] for r in published_rooms) + timeline_limit = sync_config.filter.timeline_limit() + room_events, _ = yield self.store.get_room_events_stream( sync_config.user.to_string(), from_key=since_token.room_key, to_key=now_token.room_key, room_id=None, - limit=sync_config.limit + 1, + limit=timeline_limit + 1, ) rooms = [] - if len(room_events) <= sync_config.limit: + if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. events_by_room_id = {} @@ -365,8 +357,9 @@ class SyncHandler(BaseHandler): max_repeat = 3 # Only try a few times per room, otherwise room_key = now_token.room_key end_key = room_key + timeline_limit = sync_config.filter.timeline_limit() - while limited and len(recents) < sync_config.limit and max_repeat: + while limited and len(recents) < timeline_limit and max_repeat: events, keys = yield self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, @@ -393,7 +386,9 @@ class SyncHandler(BaseHandler): "room_key", room_key ) - defer.returnValue((recents, prev_batch_token, limited)) + defer.returnValue(TimelineBatch( + events=recents, prev_batch=prev_batch_token, limited=limited + )) @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, @@ -408,7 +403,7 @@ class SyncHandler(BaseHandler): # TODO(mjark): Check for redactions we might have missed. - recents, prev_batch_token, limited = yield self.load_filtered_recents( + batch = yield self.load_filtered_recents( room_id, sync_config, now_token, since_token, ) @@ -437,11 +432,8 @@ class SyncHandler(BaseHandler): room_sync = RoomSyncResult( room_id=room_id, - published=room_id in published_room_ids, - events=recents, - prev_batch=prev_batch_token, + timeline=batch, state=state_events_delta, - limited=limited, ephemeral=typing_by_room.get(room_id, []) ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cac28b47b6..ea6600b1d5 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -36,47 +36,35 @@ class SyncRestServlet(RestServlet): GET parameters:: timeout(int): How long to wait for new events in milliseconds. - limit(int): Maxiumum number of events per room to return. - gap(bool): Create gaps the message history if limit is exceeded to - ensure that the client has the most recent messages. Defaults to - "true". - sort(str,str): tuple of sort key (e.g. "timeline") and direction - (e.g. "asc", "desc"). Defaults to "timeline,asc". since(batch_token): Batch token when asking for incremental deltas. set_presence(str): What state the device presence should be set to. default is "online". - backfill(bool): Should the HS request message history from other - servers. This may take a long time making it unsuitable for clients - expecting a prompt response. Defaults to "true". filter(filter_id): A filter to apply to the events returned. - filter_*: Filter override parameters. Response JSON:: { - "next_batch": // batch token for the next /sync - "private_user_data": // private events for this user. - "public_user_data": // public events for all users including the - // public events for this user. - "rooms": [{ // List of rooms with updates. - "room_id": // Id of the room being updated - "limited": // Was the per-room event limit exceeded? - "published": // Is the room published by our HS? - "event_map": // Map of EventID -> event JSON. - "events": { // The recent events in the room if gap is "true" - // otherwise the next events in the room. - "batch": [] // list of EventIDs in the "event_map". - "prev_batch": // back token for getting previous events. - } - "state": [] // list of EventIDs updating the current state to - // be what it should be at the end of the batch. - "ephemeral": [] + "next_batch": // batch token for the next /sync + "presence": // presence data for the user. + "rooms": { + "roomlist": [{ // List of rooms with updates. + "room_id": // Id of the room being updated + "event_map": // Map of EventID -> event JSON. + "timeline": { // The recent events in the room if gap is "true" + "limited": // Was the per-room event limit exceeded? + // otherwise the next events in the room. + "batch": [] // list of EventIDs in the "event_map". + "prev_batch": // back token for getting previous events. + } + "state": [] // list of EventIDs updating the current state to + // be what it should be at the end of the batch. + "ephemeral": [] }] + } } """ PATTERN = client_v2_pattern("/sync$") - ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) - ALLOWED_PRESENCE = set(["online", "offline", "idle"]) + ALLOWED_PRESENCE = set(["online", "offline"]) def __init__(self, hs): super(SyncRestServlet, self).__init__() @@ -90,45 +78,29 @@ class SyncRestServlet(RestServlet): user, token_id = yield self.auth.get_user_by_req(request) timeout = parse_integer(request, "timeout", default=0) - limit = parse_integer(request, "limit", required=True) - gap = parse_boolean(request, "gap", default=True) - sort = parse_string( - request, "sort", default="timeline,asc", - allowed_values=self.ALLOWED_SORT - ) since = parse_string(request, "since") set_presence = parse_string( request, "set_presence", default="online", allowed_values=self.ALLOWED_PRESENCE ) - backfill = parse_boolean(request, "backfill", default=False) filter_id = parse_string(request, "filter", default=None) logger.info( - "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r," - " set_presence=%r, backfill=%r, filter_id=%r" % ( - user, timeout, limit, gap, sort, since, set_presence, - backfill, filter_id + "/sync: user=%r, timeout=%r, since=%r," + " set_presence=%r, filter_id=%r" % ( + user, timeout, since, set_presence, filter_id ) ) - # TODO(mjark): Load filter and apply overrides. try: filter = yield self.filtering.get_user_filter( user.localpart, filter_id ) except: filter = Filter({}) - # filter = filter.apply_overrides(http_request) - # if filter.matches(event): - # # stuff sync_config = SyncConfig( user=user, - gap=gap, - limit=limit, - sort=sort, - backfill=backfill, filter=filter, ) @@ -144,11 +116,8 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() response_content = { - "public_user_data": self.encode_user_data( - sync_result.public_user_data, filter, time_now - ), - "private_user_data": self.encode_user_data( - sync_result.private_user_data, filter, time_now + "presence": self.encode_user_data( + sync_result.presence, filter, time_now ), "rooms": self.encode_rooms( sync_result.rooms, filter, time_now, token_id From 471555b3a815968c4d7e41a1b99390c6a7917a21 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 5 Oct 2015 16:39:22 +0100 Subject: [PATCH 02/17] Move the rooms out into a room_map mapping from room_id to room. --- synapse/api/filtering.py | 8 ++++- synapse/handlers/sync.py | 27 ++++++----------- synapse/rest/client/v2_alpha/sync.py | 44 ++++++++++++++++++++-------- 3 files changed, 47 insertions(+), 32 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index c066ce89ef..2d5431ba60 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -136,7 +136,13 @@ class Filter(object): self.filter_json = filter_json def timeline_limit(self): - return self.filter_json.get("room", {}).get("timeline", {}).get(limit, 10) + return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10) + + def presence_limit(self): + return self.filter_json.get("presence", {}).get("limit", 10) + + def ephemeral_limit(self): + return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10) def filter_public_user_data(self, events): return self._filter_on_key(events, ["public_user_data"]) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2a0e045430..9d488fa251 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -31,6 +31,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [ "filter", ]) + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -44,6 +45,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ """ return bool(self.events) + class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "room_id", "timeline", @@ -125,11 +127,7 @@ class SyncHandler(BaseHandler): if since_token is None: return self.initial_sync(sync_config) else: - if sync_config.gap: - return self.incremental_sync_with_gap(sync_config, since_token) - else: - # TODO(mjark): Handle gapless sync - raise NotImplementedError() + return self.incremental_sync_with_gap(sync_config, since_token) @defer.inlineCallbacks def initial_sync(self, sync_config): @@ -174,7 +172,7 @@ class SyncHandler(BaseHandler): A Deferred RoomSyncResult. """ - recents, prev_batch_token, limited = yield self.load_filtered_recents( + batch = yield self.load_filtered_recents( room_id, sync_config, now_token, ) @@ -185,10 +183,8 @@ class SyncHandler(BaseHandler): defer.returnValue(RoomSyncResult( room_id=room_id, - events=recents, - prev_batch=prev_batch_token, + timeline=batch, state=current_state_events, - limited=limited, ephemeral=[], )) @@ -199,18 +195,13 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ - if sync_config.sort == "timeline,desc": - # TODO(mjark): Handle going through events in reverse order?. - # What does "most recent events" mean when applying the limits mean - # in this case? - raise NotImplementedError() - now_token = yield self.event_sources.get_current_token() presence_source = self.event_sources.sources["presence"] presence, presence_key = yield presence_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.presence_key, + limit=sync_config.filter.presence_limit(), ) now_token = now_token.copy_and_replace("presence_key", presence_key) @@ -218,6 +209,7 @@ class SyncHandler(BaseHandler): typing, typing_key = yield typing_source.get_new_events_for_user( user=sync_config.user, from_key=since_token.typing_key, + limit=sync_config.filter.ephemeral_limit(), ) now_token = now_token.copy_and_replace("typing_key", typing_key) @@ -295,8 +287,7 @@ class SyncHandler(BaseHandler): rooms.append(room_sync) defer.returnValue(SyncResult( - public_user_data=presence, - private_user_data=[], + presence=presence, rooms=rooms, next_batch=now_token, )) @@ -407,7 +398,7 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token, ) - logging.debug("Recents %r", recents) + logging.debug("Recents %r", batch) # TODO(mjark): This seems racy since this isn't being passed a # token to indicate what point in the stream this is diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index ea6600b1d5..1f3824d924 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, parse_integer, parse_boolean + RestServlet, parse_string, parse_integer ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken @@ -46,8 +46,14 @@ class SyncRestServlet(RestServlet): "next_batch": // batch token for the next /sync "presence": // presence data for the user. "rooms": { - "roomlist": [{ // List of rooms with updates. - "room_id": // Id of the room being updated + "default": { + "invited": [], // Ids of invited rooms being updated. + "joined": [], // Ids of joined rooms being updated. + "archived": [] // Ids of archived rooms being updated. + } + } + "room_map": { + "${room_id}": { // Id of the room being updated "event_map": // Map of EventID -> event JSON. "timeline": { // The recent events in the room if gap is "true" "limited": // Was the per-room event limit exceeded? @@ -58,7 +64,7 @@ class SyncRestServlet(RestServlet): "state": [] // list of EventIDs updating the current state to // be what it should be at the end of the batch. "ephemeral": [] - }] + } } } """ @@ -115,13 +121,16 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() + room_map, rooms = self.encode_rooms( + sync_result.rooms, filter, time_now, token_id + ) + response_content = { "presence": self.encode_user_data( sync_result.presence, filter, time_now ), - "rooms": self.encode_rooms( - sync_result.rooms, filter, time_now, token_id - ), + "room_map": room_map, + "rooms": rooms, "next_batch": sync_result.next_batch.to_string(), } @@ -131,10 +140,21 @@ class SyncRestServlet(RestServlet): return events def encode_rooms(self, rooms, filter, time_now, token_id): - return [ - self.encode_room(room, filter, time_now, token_id) - for room in rooms - ] + room_map = {} + joined = [] + for room in rooms: + room_map[room.room_id] = self.encode_room( + room, filter, time_now, token_id + ) + joined.append(room.room_id) + + return room_map, { + "default": { + "joined": joined, + "invited": [], + "archived": [], + } + } @staticmethod def encode_room(room, filter, time_now, token_id): @@ -159,7 +179,6 @@ class SyncRestServlet(RestServlet): ) recent_event_ids.append(event.event_id) result = { - "room_id": room.room_id, "event_map": event_map, "events": { "batch": recent_event_ids, @@ -167,7 +186,6 @@ class SyncRestServlet(RestServlet): }, "state": state_event_ids, "limited": room.limited, - "published": room.published, "ephemeral": room.ephemeral, } return result From e3d3205cd953342ce84b8a148c4f469ce7b79b7a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Oct 2015 15:55:20 +0100 Subject: [PATCH 03/17] Update the sync response to match the latest spec --- synapse/rest/client/v2_alpha/sync.py | 46 +++++++++++++--------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1f3824d924..84011918af 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -45,26 +45,29 @@ class SyncRestServlet(RestServlet): { "next_batch": // batch token for the next /sync "presence": // presence data for the user. - "rooms": { - "default": { "invited": [], // Ids of invited rooms being updated. "joined": [], // Ids of joined rooms being updated. "archived": [] // Ids of archived rooms being updated. } } - "room_map": { - "${room_id}": { // Id of the room being updated - "event_map": // Map of EventID -> event JSON. - "timeline": { // The recent events in the room if gap is "true" + "rooms": { + "joined": { // Joined rooms being updated. + "${room_id}": { // Id of the room being updated + "event_map": // Map of EventID -> event JSON. + "timeline": { // The recent events in the room if gap is "true" "limited": // Was the per-room event limit exceeded? - // otherwise the next events in the room. - "batch": [] // list of EventIDs in the "event_map". + // otherwise the next events in the room. + "events": [] // list of EventIDs in the "event_map". "prev_batch": // back token for getting previous events. + } + "state": {"events": []} // list of EventIDs updating the + // current state to be what it should + // be at the end of the batch. + "ephemeral": {"events": []} // list of event objects } - "state": [] // list of EventIDs updating the current state to - // be what it should be at the end of the batch. - "ephemeral": [] - } + }, + "invited": {}, // Ids of invited rooms being updated. + "archived": {} // Ids of archived rooms being updated. } } """ @@ -121,7 +124,7 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() - room_map, rooms = self.encode_rooms( + rooms = self.encode_rooms( sync_result.rooms, filter, time_now, token_id ) @@ -129,7 +132,6 @@ class SyncRestServlet(RestServlet): "presence": self.encode_user_data( sync_result.presence, filter, time_now ), - "room_map": room_map, "rooms": rooms, "next_batch": sync_result.next_batch.to_string(), } @@ -140,20 +142,16 @@ class SyncRestServlet(RestServlet): return events def encode_rooms(self, rooms, filter, time_now, token_id): - room_map = {} - joined = [] + joined = {} for room in rooms: - room_map[room.room_id] = self.encode_room( + joined[room.room_id] = self.encode_room( room, filter, time_now, token_id ) - joined.append(room.room_id) - return room_map, { - "default": { - "joined": joined, - "invited": [], - "archived": [], - } + return { + "joined": joined, + "invited": {}, + "archived": {}, } @staticmethod From dfef2b41aa3202b130661c3c423b2cf7d0dbba97 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 8 Oct 2015 15:17:43 +0100 Subject: [PATCH 04/17] Update the v2 room sync format to match the current v2 spec --- synapse/handlers/sync.py | 25 +++++++++++-------------- synapse/rest/client/v2_alpha/sync.py | 14 +++++++------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9d488fa251..76cca7c621 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -165,8 +165,7 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def initial_sync_for_room(self, room_id, sync_config, now_token, - published_room_ids): + def initial_sync_for_room(self, room_id, sync_config, now_token): """Sync a room for a client which is starting without any state Returns: A Deferred RoomSyncResult. @@ -230,10 +229,6 @@ class SyncHandler(BaseHandler): sync_config.user ) - # TODO (mjark): Does public mean "published"? - published_rooms = yield self.store.get_rooms(is_public=True) - published_room_ids = set(r["room_id"] for r in published_rooms) - timeline_limit = sync_config.filter.timeline_limit() room_events, _ = yield self.store.get_room_events_stream( @@ -268,11 +263,12 @@ class SyncHandler(BaseHandler): room_sync = RoomSyncResult( room_id=room_id, - published=room_id in published_room_ids, - events=recents, - prev_batch=prev_batch, + timeline=TimelineBatch( + events=recents, + prev_batch=prev_batch, + limited=False, + ), state=state, - limited=False, ephemeral=typing_by_room.get(room_id, []) ) if room_sync: @@ -344,11 +340,11 @@ class SyncHandler(BaseHandler): limited = True recents = [] filtering_factor = 2 - load_limit = max(sync_config.limit * filtering_factor, 100) + timeline_limit = sync_config.filter.timeline_limit() + load_limit = max(timeline_limit * filtering_factor, 100) max_repeat = 3 # Only try a few times per room, otherwise room_key = now_token.room_key end_key = room_key - timeline_limit = sync_config.filter.timeline_limit() while limited and len(recents) < timeline_limit and max_repeat: events, keys = yield self.store.get_recent_events_for_room( @@ -369,8 +365,9 @@ class SyncHandler(BaseHandler): limited = False max_repeat -= 1 - if len(recents) > sync_config.limit: - recents = recents[-sync_config.limit:] + if len(recents) > timeline_limit: + limited = True + recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before prev_batch_token = now_token.copy_and_replace( diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 84011918af..97bf95acfb 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -158,7 +158,7 @@ class SyncRestServlet(RestServlet): def encode_room(room, filter, time_now, token_id): event_map = {} state_events = filter.filter_room_state(room.state) - recent_events = filter.filter_room_events(room.events) + recent_events = filter.filter_room_events(room.timeline.events) state_event_ids = [] recent_event_ids = [] for event in state_events: @@ -178,13 +178,13 @@ class SyncRestServlet(RestServlet): recent_event_ids.append(event.event_id) result = { "event_map": event_map, - "events": { - "batch": recent_event_ids, - "prev_batch": room.prev_batch.to_string(), + "timeline": { + "events": recent_event_ids, + "prev_batch": room.timeline.prev_batch.to_string(), + "limited": room.timeline.limited, }, - "state": state_event_ids, - "limited": room.limited, - "ephemeral": room.ephemeral, + "state": {"events": state_event_ids}, + "ephemeral": {"events": room.ephemeral}, } return result From c15cf6ac069386df3095b5c69af96f0c76ce5276 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Oct 2015 18:50:15 +0100 Subject: [PATCH 05/17] Format the presence events correctly for v2 --- synapse/rest/client/v2_alpha/sync.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 97bf95acfb..f20b830eda 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -26,6 +26,7 @@ from synapse.events.utils import ( from synapse.api.filtering import Filter from ._base import client_v2_pattern +import copy import logging logger = logging.getLogger(__name__) @@ -129,7 +130,7 @@ class SyncRestServlet(RestServlet): ) response_content = { - "presence": self.encode_user_data( + "presence": self.encode_presence( sync_result.presence, filter, time_now ), "rooms": rooms, @@ -138,8 +139,13 @@ class SyncRestServlet(RestServlet): defer.returnValue((200, response_content)) - def encode_user_data(self, events, filter, time_now): - return events + def encode_presence(self, events, filter, time_now): + formatted = [] + for event in events: + event = copy.deepcopy(event) + event['sender'] = event['content'].pop('user_id'); + formatted.append(event) + return {"events": formatted} def encode_rooms(self, rooms, filter, time_now, token_id): joined = {} From 0a96a9a02371bd36970db7dfdb2d4c6e98e0200e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 9 Oct 2015 19:57:50 +0100 Subject: [PATCH 06/17] Set the user as online if they start polling the v2 sync --- synapse/rest/client/v2_alpha/sync.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f20b830eda..3348b46c14 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -79,6 +79,7 @@ class SyncRestServlet(RestServlet): def __init__(self, hs): super(SyncRestServlet, self).__init__() self.auth = hs.get_auth() + self.event_stream_handler = hs.get_handlers().event_stream_handler self.sync_handler = hs.get_handlers().sync_handler self.clock = hs.get_clock() self.filtering = hs.get_filtering() @@ -119,9 +120,16 @@ class SyncRestServlet(RestServlet): else: since_token = None - sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout - ) + if set_presence == "online": + yield self.event_stream_handler.started_stream(user) + + try: + sync_result = yield self.sync_handler.wait_for_sync_for_user( + sync_config, since_token=since_token, timeout=timeout + ) + finally: + if set_presence == "online": + self.event_stream_handler.stopped_stream(user) time_now = self.clock.time_msec() From 586beb8318bd259581918a8b47f5981f0b90b7e9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 12 Oct 2015 16:54:58 +0100 Subject: [PATCH 07/17] Update the filters to match the latest spec. Apply the filter the 'timeline' and 'ephemeral' keys of rooms. Apply the filter to the 'presence' key of a sync response. --- synapse/api/filtering.py | 53 ++++++++++++++++++---------- synapse/handlers/sync.py | 6 ++-- synapse/rest/client/v2_alpha/sync.py | 26 ++++++-------- tests/api/test_filtering.py | 12 +++---- 4 files changed, 55 insertions(+), 42 deletions(-) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 2d5431ba60..e79e91e7eb 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -144,17 +144,14 @@ class Filter(object): def ephemeral_limit(self): return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10) - def filter_public_user_data(self, events): - return self._filter_on_key(events, ["public_user_data"]) - - def filter_private_user_data(self, events): - return self._filter_on_key(events, ["private_user_data"]) + def filter_presence(self, events): + return self._filter_on_key(events, ["presence"]) def filter_room_state(self, events): return self._filter_on_key(events, ["room", "state"]) - def filter_room_events(self, events): - return self._filter_on_key(events, ["room", "events"]) + def filter_room_timeline(self, events): + return self._filter_on_key(events, ["room", "timeline"]) def filter_room_ephemeral(self, events): return self._filter_on_key(events, ["room", "ephemeral"]) @@ -178,11 +175,34 @@ class Filter(object): return [e for e in events if self._passes_definition(definition, e)] def _passes_definition(self, definition, event): + """Check if the event passes the filter definition + Args: + definition(dict): The filter definition to check against + event(dict or Event): The event to check + Returns: + True if the event passes the filter in the definition + """ + if type(event) is dict: + room_id = event.get("room_id") + sender = event.get("sender") + event_type = event["type"] + else: + room_id = getattr(event, "room_id", None) + sender = getattr(event, "sender", None) + event_type = event.type + return self._event_passes_definition( + definition, room_id, sender, event_type + ) + + def _event_passes_definition(self, definition, room_id, sender, + event_type): """Check if the event passes through the given definition. Args: definition(dict): The definition to check against. - event(Event): The event to check. + room_id(str): The id of the room this event is in or None. + sender(str): The sender of the event + event_type(str): The type of the event. Returns: True if the event passes through the filter. """ @@ -194,8 +214,7 @@ class Filter(object): # and 'not_types' then it is treated as only being in 'not_types') # room checks - if hasattr(event, "room_id"): - room_id = event.room_id + if room_id is not None: allow_rooms = definition.get("rooms", None) reject_rooms = definition.get("not_rooms", None) if reject_rooms and room_id in reject_rooms: @@ -204,9 +223,7 @@ class Filter(object): return False # sender checks - if hasattr(event, "sender"): - # Should we be including event.state_key for some event types? - sender = event.sender + if sender is not None: allow_senders = definition.get("senders", None) reject_senders = definition.get("not_senders", None) if reject_senders and sender in reject_senders: @@ -217,12 +234,12 @@ class Filter(object): # type checks if "not_types" in definition: for def_type in definition["not_types"]: - if self._event_matches_type(event, def_type): + if self._event_matches_type(event_type, def_type): return False if "types" in definition: included = False for def_type in definition["types"]: - if self._event_matches_type(event, def_type): + if self._event_matches_type(event_type, def_type): included = True break if not included: @@ -230,9 +247,9 @@ class Filter(object): return True - def _event_matches_type(self, event, def_type): + def _event_matches_type(self, event_type, def_type): if def_type.endswith("*"): type_prefix = def_type[:-1] - return event.type.startswith(type_prefix) + return event_type.startswith(type_prefix) else: - return event.type == def_type + return event_type == def_type diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 76cca7c621..edc728ece1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -277,7 +277,7 @@ class SyncHandler(BaseHandler): for room_id in room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - published_room_ids, typing_by_room + typing_by_room ) if room_sync: rooms.append(room_sync) @@ -355,7 +355,7 @@ class SyncHandler(BaseHandler): ) (room_key, _) = keys end_key = "s" + room_key.split('-')[-1] - loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents = sync_config.filter.filter_room_timeline(events) loaded_recents = yield self._filter_events_for_client( sync_config.user.to_string(), room_id, loaded_recents, ) @@ -381,7 +381,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - published_room_ids, typing_by_room): + typing_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 3348b46c14..1223a4a7f6 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -46,11 +46,6 @@ class SyncRestServlet(RestServlet): { "next_batch": // batch token for the next /sync "presence": // presence data for the user. - "invited": [], // Ids of invited rooms being updated. - "joined": [], // Ids of joined rooms being updated. - "archived": [] // Ids of archived rooms being updated. - } - } "rooms": { "joined": { // Joined rooms being updated. "${room_id}": { // Id of the room being updated @@ -67,8 +62,8 @@ class SyncRestServlet(RestServlet): "ephemeral": {"events": []} // list of event objects } }, - "invited": {}, // Ids of invited rooms being updated. - "archived": {} // Ids of archived rooms being updated. + "invited": {}, // Invited rooms being updated. + "archived": {} // Archived rooms being updated. } } """ @@ -151,9 +146,9 @@ class SyncRestServlet(RestServlet): formatted = [] for event in events: event = copy.deepcopy(event) - event['sender'] = event['content'].pop('user_id'); + event['sender'] = event['content'].pop('user_id') formatted.append(event) - return {"events": formatted} + return {"events": filter.filter_presence(formatted)} def encode_rooms(self, rooms, filter, time_now, token_id): joined = {} @@ -172,9 +167,10 @@ class SyncRestServlet(RestServlet): def encode_room(room, filter, time_now, token_id): event_map = {} state_events = filter.filter_room_state(room.state) - recent_events = filter.filter_room_events(room.timeline.events) + timeline_events = filter.filter_room_timeline(room.timeline.events) + ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) state_event_ids = [] - recent_event_ids = [] + timeline_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -183,22 +179,22 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - for event in recent_events: + for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( event, time_now, token_id=token_id, event_format=format_event_for_client_v2_without_event_id, ) - recent_event_ids.append(event.event_id) + timeline_event_ids.append(event.event_id) result = { "event_map": event_map, "timeline": { - "events": recent_event_ids, + "events": timeline_event_ids, "prev_batch": room.timeline.prev_batch.to_string(), "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, - "ephemeral": {"events": room.ephemeral}, + "ephemeral": {"events": ephemeral_events}, } return result diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 65b2f590c8..6942cdac51 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -345,9 +345,9 @@ class FilteringTestCase(unittest.TestCase): ) @defer.inlineCallbacks - def test_filter_public_user_data_match(self): + def test_filter_presence_match(self): user_filter_json = { - "public_user_data": { + "presence": { "types": ["m.*"] } } @@ -368,13 +368,13 @@ class FilteringTestCase(unittest.TestCase): filter_id=filter_id, ) - results = user_filter.filter_public_user_data(events=events) + results = user_filter.filter_presence(events=events) self.assertEquals(events, results) @defer.inlineCallbacks - def test_filter_public_user_data_no_match(self): + def test_filter_presence_no_match(self): user_filter_json = { - "public_user_data": { + "presence": { "types": ["m.*"] } } @@ -395,7 +395,7 @@ class FilteringTestCase(unittest.TestCase): filter_id=filter_id, ) - results = user_filter.filter_public_user_data(events=events) + results = user_filter.filter_presence(events=events) self.assertEquals([], results) @defer.inlineCallbacks From 956509dfecccca944d89bc9e9f002e5039cf81fc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 10:24:51 +0100 Subject: [PATCH 08/17] Start spliting out the rooms into joined and invited in v2 sync --- synapse/handlers/sync.py | 58 +++++++++++++++++++--------- synapse/rest/client/v2_alpha/sync.py | 18 ++++----- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index edc728ece1..e693e7c80e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -46,7 +46,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ return bool(self.events) -class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ +class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "room_id", "timeline", "state", @@ -61,10 +61,24 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ return bool(self.timeline or self.state or self.ephemeral) +class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ + "room_id", + "invite_state", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.invite_state) + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. - "rooms", # RoomSyncResult for each room. + "joined", # JoinedSyncResult for each joined room. + "invited", # InvitedSyncResult for each invited room. ])): __slots__ = [] @@ -151,24 +165,31 @@ class SyncHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) - rooms = [] + joined = [] for event in room_list: - room_sync = yield self.initial_sync_for_room( - event.room_id, sync_config, now_token, - ) - rooms.append(room_sync) + if event.membership == Membership.JOIN: + room_sync = yield self.initial_sync_for_room( + event.room_id, sync_config, now_token, + ) + joined.append(room_sync) + elif event.membership == Membership.INVITE: + invited.append(InvitedSyncResult( + room_id=event.room_id, + invited_state=[event], + ) defer.returnValue(SyncResult( presence=presence, - rooms=rooms, + joined=joined, + invited=[], next_batch=now_token, )) @defer.inlineCallbacks - def initial_sync_for_room(self, room_id, sync_config, now_token): + def initial_sync_for_joined_room(self, room_id, sync_config, now_token): """Sync a room for a client which is starting without any state Returns: - A Deferred RoomSyncResult. + A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( @@ -180,7 +201,7 @@ class SyncHandler(BaseHandler): ) current_state_events = current_state.values() - defer.returnValue(RoomSyncResult( + defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, state=current_state_events, @@ -239,7 +260,7 @@ class SyncHandler(BaseHandler): limit=timeline_limit + 1, ) - rooms = [] + joined = [] if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. @@ -261,7 +282,7 @@ class SyncHandler(BaseHandler): sync_config, room_id, state ) - room_sync = RoomSyncResult( + room_sync = JoinedSyncResult( room_id=room_id, timeline=TimelineBatch( events=recents, @@ -272,7 +293,7 @@ class SyncHandler(BaseHandler): ephemeral=typing_by_room.get(room_id, []) ) if room_sync: - rooms.append(room_sync) + joined.append(room_sync) else: for room_id in room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( @@ -280,11 +301,12 @@ class SyncHandler(BaseHandler): typing_by_room ) if room_sync: - rooms.append(room_sync) + joined.append(room_sync) defer.returnValue(SyncResult( presence=presence, - rooms=rooms, + joined=joined, + invited=[], next_batch=now_token, )) @@ -386,7 +408,7 @@ class SyncHandler(BaseHandler): the room. Gives the client the most recent events and the changes to state. Returns: - A Deferred RoomSyncResult + A Deferred JoinedSyncResult """ # TODO(mjark): Check for redactions we might have missed. @@ -418,7 +440,7 @@ class SyncHandler(BaseHandler): sync_config, room_id, state_events_delta ) - room_sync = RoomSyncResult( + room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, state=state_events_delta, diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1223a4a7f6..9b87879f51 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -128,15 +128,19 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() - rooms = self.encode_rooms( - sync_result.rooms, filter, time_now, token_id + joined = self.encode_joined( + sync_result.joined, filter, time_now, token_id ) response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now ), - "rooms": rooms, + "rooms": { + "joined": joined, + "invited": {}, + "archived": {}, + }, "next_batch": sync_result.next_batch.to_string(), } @@ -150,18 +154,14 @@ class SyncRestServlet(RestServlet): formatted.append(event) return {"events": filter.filter_presence(formatted)} - def encode_rooms(self, rooms, filter, time_now, token_id): + def encode_joined(self, rooms, filter, time_now, token_id): joined = {} for room in rooms: joined[room.room_id] = self.encode_room( room, filter, time_now, token_id ) - return { - "joined": joined, - "invited": {}, - "archived": {}, - } + return joined @staticmethod def encode_room(room, filter, time_now, token_id): From ab9cf732585244781ba67f4bb4c235ded3d4661a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 11:03:48 +0100 Subject: [PATCH 09/17] Include invited rooms in the initial sync --- synapse/handlers/sync.py | 16 ++++++---------- synapse/rest/client/v2_alpha/sync.py | 21 ++++++++++++++++++++- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e693e7c80e..574412d6b5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -63,16 +63,10 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ "room_id", - "invite_state", + "invite", ])): __slots__ = [] - def __nonzero__(self): - """Make the result appear empty if there are no updates. This is used - to tell if room needs to be part of the sync result. - """ - return bool(self.invite_state) - class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync @@ -166,6 +160,7 @@ class SyncHandler(BaseHandler): ) joined = [] + invited = [] for event in room_list: if event.membership == Membership.JOIN: room_sync = yield self.initial_sync_for_room( @@ -173,15 +168,16 @@ class SyncHandler(BaseHandler): ) joined.append(room_sync) elif event.membership == Membership.INVITE: + invite = yield self.store.get_event(event.event_id) invited.append(InvitedSyncResult( room_id=event.room_id, - invited_state=[event], - ) + invite=invite, + )) defer.returnValue(SyncResult( presence=presence, joined=joined, - invited=[], + invited=invited, next_batch=now_token, )) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 9b87879f51..399df9e772 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -132,13 +132,17 @@ class SyncRestServlet(RestServlet): sync_result.joined, filter, time_now, token_id ) + invited = self.encode_invited( + sync_result.invited, filter, time_now, token_id + ) + response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now ), "rooms": { "joined": joined, - "invited": {}, + "invited": invited, "archived": {}, }, "next_batch": sync_result.next_batch.to_string(), @@ -163,6 +167,21 @@ class SyncRestServlet(RestServlet): return joined + def encode_invited(self, rooms, filter, time_now, token_id): + invited = {} + for room in rooms: + invite = serialize_event( + room.invite, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, + ) + invited_state = invite.get("unsigned", {}).pop("invite_room_state", []) + invited_state.append(invite) + invited[room.room_id] = { + "invite_state": { "events": invited_state } + } + + return invited + @staticmethod def encode_room(room, filter, time_now, token_id): event_map = {} From 54414221e4ced47e632144afa7d768a7e252214c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 11:43:12 +0100 Subject: [PATCH 10/17] Include invites in incremental sync --- synapse/handlers/sync.py | 31 ++++++++++++++++++++++------ synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 574412d6b5..d9e55d8a58 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -163,7 +163,7 @@ class SyncHandler(BaseHandler): invited = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.initial_sync_for_room( + room_sync = yield self.initial_sync_for_joined_room( event.room_id, sync_config, now_token, ) joined.append(room_sync) @@ -240,9 +240,9 @@ class SyncHandler(BaseHandler): ) if app_service: rooms = yield self.store.get_app_service_rooms(app_service) - room_ids = set(r.room_id for r in rooms) + joined_room_ids = set(r.room_id for r in rooms) else: - room_ids = yield rm_handler.get_joined_rooms_for_user( + joined_room_ids = yield rm_handler.get_joined_rooms_for_user( sync_config.user ) @@ -260,11 +260,17 @@ class SyncHandler(BaseHandler): if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. + invite_events = [] events_by_room_id = {} for event in room_events: events_by_room_id.setdefault(event.room_id, []).append(event) + if event.room_id not in joined_room_ids: + if (event.type == EventTypes.Member + and event.membership == Membership.INVITE + and event.state_key == sync_config.user.to_string()): + invite_events.append(event) - for room_id in room_ids: + for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) state = [event for event in recents if event.is_state()] if recents: @@ -291,7 +297,15 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) else: - for room_id in room_ids: + invites = yield self.store.get_rooms_for_user_where_membership_is( + user_id=sync_config.user.to_string(), + membership_list=[Membership.INVITE], + ) + invite_events = yield self.store.get_events( + [invite.event_id for invite in invites] + ) + + for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, typing_by_room @@ -299,10 +313,15 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) + invited = [ + InvitedSyncResult(room_id=event.room_id, invite=event) + for event in invite_events + ] + defer.returnValue(SyncResult( presence=presence, joined=joined, - invited=[], + invited=invited, next_batch=now_token, )) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 399df9e772..fffecb24f5 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -177,7 +177,7 @@ class SyncRestServlet(RestServlet): invited_state = invite.get("unsigned", {}).pop("invite_room_state", []) invited_state.append(invite) invited[room.room_id] = { - "invite_state": { "events": invited_state } + "invite_state": {"events": invited_state} } return invited From cacf0688c691517dab55c3cff294b6bac7f0d6e3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 14:08:38 +0100 Subject: [PATCH 11/17] Add a get_invites_for_user method to the storage to find out the rooms a user is invited to --- synapse/handlers/sync.py | 8 ++------ synapse/storage/roommember.py | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d9e55d8a58..380798b7ab 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -297,12 +297,8 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) else: - invites = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=[Membership.INVITE], - ) - invite_events = yield self.store.get_events( - [invite.event_id for invite in invites] + invite_events = yield self.store.get_invites_for_user( + sync_config.user.to_string() ) for room_id in joined_room_ids: diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8c40d9a8a6..dd98dcfda8 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -110,6 +110,20 @@ class RoomMemberStore(SQLBaseStore): membership=membership, ).addCallback(self._get_events) + def get_invites_for_user(self, user_id): + """ Get all the invite events for a user + Args: + user_id (str): The user ID. + Returns: + A deferred list of event objects. + """ + + return self.get_rooms_for_user_where_membership_is( + user_id, [Membership.INVITE] + ).addCallback(lambda invites: self._get_events([ + invites.event_id for invite in invites + ])) + def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user matches one in the membership list. From 7639c3d9e53cdb6222df6a8e1b12bc2a40612367 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 17:13:04 +0100 Subject: [PATCH 12/17] Bounce all deferreds through the reactor to make debugging easier. If all deferreds wait a reactor tick before resolving then there is always a chance to add an errback to the deferred so that stacktraces get reported, rather than being discarded. --- synapse/app/homeserver.py | 2 ++ synapse/util/debug.py | 68 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 synapse/util/debug.py diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index af53acb369..1c84242aa3 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,6 +33,8 @@ if __name__ == '__main__': sys.stderr.writelines(message) sys.exit(1) + from synapse.util.debug import debug_deferreds + debug_deferreds() from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain diff --git a/synapse/util/debug.py b/synapse/util/debug.py new file mode 100644 index 0000000000..66ac12c291 --- /dev/null +++ b/synapse/util/debug.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor +from functools import wraps +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext + +def with_logging_context(fn): + context = LoggingContext.current_context() + def restore_context_callback(x): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = context + return fn(x) + return restore_context_callback + +def debug_deferreds(): + """Cause all deferreds to wait for a reactor tick before running their + callbacks. This increases the chance of getting a stack trace out of + a defer.inlineCallback since the code waiting on the deferred will get + a chance to add an errback before the deferred runs.""" + + # We are going to modify the __init__ method of defer.Deferred so we + # need to get a copy of the old method so we can still call it. + old__init__ = defer.Deferred.__init__ + + # We need to create a deferred to bounce the callbacks through the reactor + # but we don't want to add a callback when we create that deferred so we + # we create a new type of deferred that uses the old __init__ method. + # This is safe as long as the old __init__ method doesn't invoke an + # __init__ using super. + class Bouncer(defer.Deferred): + __init__ = old__init__ + + # We'll add this as a callback to all Deferreds. Twisted will wait until + # the bouncer deferred resolves before calling the callbacks of the + # original deferred. + def bounce_callback(x): + bouncer = Bouncer() + reactor.callLater(0, with_logging_context(bouncer.callback), x) + return bouncer + + # We'll add this as an errback to all Deferreds. Twisted will wait until + # the bouncer deferred resolves before calling the errbacks of the + # original deferred. + def bounce_errback(x): + bouncer = Bouncer() + reactor.callLater(0, with_logging_context(bouncer.errback), x) + return bouncer + + @wraps(old__init__) + def new__init__(self, *args, **kargs): + old__init__(self, *args, **kargs) + self.addCallbacks(bounce_callback, bounce_errback) + + defer.Deferred.__init__ = new__init__ + From 32d66738b0229aa7f011d203d0cb7963f950bb95 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 17:18:29 +0100 Subject: [PATCH 13/17] Fix pep8 warnings. --- synapse/util/debug.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/util/debug.py b/synapse/util/debug.py index 66ac12c291..f6a5a841a4 100644 --- a/synapse/util/debug.py +++ b/synapse/util/debug.py @@ -17,13 +17,6 @@ from twisted.internet import defer, reactor from functools import wraps from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -def with_logging_context(fn): - context = LoggingContext.current_context() - def restore_context_callback(x): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = context - return fn(x) - return restore_context_callback def debug_deferreds(): """Cause all deferreds to wait for a reactor tick before running their @@ -31,6 +24,18 @@ def debug_deferreds(): a defer.inlineCallback since the code waiting on the deferred will get a chance to add an errback before the deferred runs.""" + # Helper method for retrieving and restoring the current logging context + # around a callback. + def with_logging_context(fn): + context = LoggingContext.current_context() + + def restore_context_callback(x): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = context + return fn(x) + + return restore_context_callback + # We are going to modify the __init__ method of defer.Deferred so we # need to get a copy of the old method so we can still call it. old__init__ = defer.Deferred.__init__ @@ -65,4 +70,3 @@ def debug_deferreds(): self.addCallbacks(bounce_callback, bounce_errback) defer.Deferred.__init__ = new__init__ - From 9020860479a9f70ae4d05ddcdc231d7e336474e3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 17:50:44 +0100 Subject: [PATCH 14/17] Only turn on the twisted deferred debugging if full_twisted_stacktraces is set in the config --- synapse/app/homeserver.py | 3 --- synapse/config/logger.py | 8 ++++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1c84242aa3..cf2fa221dc 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,9 +33,6 @@ if __name__ == '__main__': sys.stderr.writelines(message) sys.exit(1) - from synapse.util.debug import debug_deferreds - debug_deferreds() - from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain from synapse.storage.prepare_database import UpgradeDatabaseException diff --git a/synapse/config/logger.py b/synapse/config/logger.py index bd0c17c861..a13dc170c4 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -22,6 +22,7 @@ import yaml from string import Template import os import signal +from synapse.util.debug import debug_deferreds DEFAULT_LOG_CONFIG = Template(""" @@ -69,6 +70,8 @@ class LoggingConfig(Config): self.verbosity = config.get("verbose", 0) self.log_config = self.abspath(config.get("log_config")) self.log_file = self.abspath(config.get("log_file")) + if config.get("full_twisted_stacktraces"): + debug_deferreds() def default_config(self, config_dir_path, server_name, **kwargs): log_file = self.abspath("homeserver.log") @@ -84,6 +87,11 @@ class LoggingConfig(Config): # A yaml python logging config file log_config: "%(log_config)s" + + # Stop twisted from discarding the stack traces of exceptions in + # deferreds by waiting a reactor tick before running a deferred's + # callbacks. + # full_twisted_stacktraces: true """ % locals() def read_arguments(self, args): From 1941eb315d692c44b0e21fb3fbf1b95eed138d53 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 13 Oct 2015 18:00:02 +0100 Subject: [PATCH 15/17] Enable stack traces for the demo scripts --- demo/start.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/demo/start.sh b/demo/start.sh index a90561488d..d90115ec97 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -38,6 +38,9 @@ for port in 8080 8081 8082; do perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config + echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config + echo "report_stats: false" >> $DIR/etc/$port.config + python -m synapse.app.homeserver \ --config-path "$DIR/etc/$port.config" \ -D \ From 858634e1d0ca489bb546851d6ee052d548870b06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Oct 2015 09:29:08 +0100 Subject: [PATCH 16/17] Remove unused room_id arg --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 10 +++++----- synapse/handlers/sync.py | 2 +- synapse/storage/state.py | 10 +++++----- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3882ba79ed..a710bdcfdb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -242,7 +242,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), + frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, None), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b70258697b..dfeeae76db 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -164,7 +164,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): event_id_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), + frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id), @@ -290,7 +290,7 @@ class MessageHandler(BaseHandler): elif member_event.membership == Membership.LEAVE: key = (event_type, state_key) room_state = yield self.store.get_state_for_events( - room_id, [member_event.event_id], [key] + [member_event.event_id], [key] ) data = room_state[member_event.event_id].get(key) @@ -314,7 +314,7 @@ class MessageHandler(BaseHandler): room_state = yield self.state_handler.get_current_state(room_id) elif member_event.membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - room_id, [member_event.event_id], None + [member_event.event_id], None ) room_state = room_state[member_event.event_id] @@ -406,7 +406,7 @@ class MessageHandler(BaseHandler): elif event.membership == Membership.LEAVE: room_end_token = "s%d" % (event.stream_ordering,) deferred_room_state = self.store.get_state_for_events( - event.room_id, [event.event_id], None + [event.event_id], None ) deferred_room_state.addCallback( lambda states: states[event.event_id] @@ -499,7 +499,7 @@ class MessageHandler(BaseHandler): def _room_initial_sync_parted(self, user_id, room_id, pagin_config, member_event): room_state = yield self.store.get_state_for_events( - member_event.room_id, [member_event.event_id], None + [member_event.event_id], None ) room_state = room_state[member_event.event_id] diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9914ff6f9c..a8940de166 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -312,7 +312,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): event_id_to_state = yield self.store.get_state_for_events( - room_id, frozenset(e.event_id for e in events), + frozenset(e.event_id for e in events), types=( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id), diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e935b9443b..6f2a50d585 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -54,7 +54,7 @@ class StateStore(SQLBaseStore): defer.returnValue({}) event_to_groups = yield self._get_state_group_for_events( - room_id, event_ids, + event_ids, ) groups = set(event_to_groups.values()) @@ -208,7 +208,7 @@ class StateStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_state_for_events(self, room_id, event_ids, types): + def get_state_for_events(self, event_ids, types): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. @@ -225,7 +225,7 @@ class StateStore(SQLBaseStore): The dicts are mappings from (type, state_key) -> state_events """ event_to_groups = yield self._get_state_group_for_events( - room_id, event_ids, + event_ids, ) groups = set(event_to_groups.values()) @@ -251,8 +251,8 @@ class StateStore(SQLBaseStore): ) @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", - num_args=2) - def _get_state_group_for_events(self, room_id, event_ids): + num_args=1) + def _get_state_group_for_events(self, event_ids): """Returns mapping event_id -> state_group """ def f(txn): From c185c1c4139f9ea1ad5b586a2cfd9f658cffcbb3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 14 Oct 2015 13:16:53 +0100 Subject: [PATCH 17/17] Fix v2 sync polling --- synapse/handlers/sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 380798b7ab..7b2d6e3457 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -82,7 +82,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ events. """ return bool( - self.private_user_data or self.public_user_data or self.rooms + self.presence or self.joined or self.invited ) @@ -122,8 +122,8 @@ class SyncHandler(BaseHandler): ) result = yield self.notifier.wait_for_events( - sync_config.user, room_ids, - sync_config.filter, timeout, current_sync_callback + sync_config.user, room_ids, timeout, current_sync_callback, + from_token=since_token ) defer.returnValue(result)