Move the rooms out into a room_map mapping from room_id to room.

pull/305/head
Mark Haines 2015-10-05 16:39:22 +01:00
parent f31014b18f
commit 471555b3a8
3 changed files with 47 additions and 32 deletions

View File

@ -136,7 +136,13 @@ class Filter(object):
self.filter_json = filter_json
def timeline_limit(self):
return self.filter_json.get("room", {}).get("timeline", {}).get(limit, 10)
return self.filter_json.get("room", {}).get("timeline", {}).get("limit", 10)
def presence_limit(self):
return self.filter_json.get("presence", {}).get("limit", 10)
def ephemeral_limit(self):
return self.filter_json.get("room", {}).get("ephemeral", {}).get("limit", 10)
def filter_public_user_data(self, events):
return self._filter_on_key(events, ["public_user_data"])

View File

@ -31,6 +31,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter",
])
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
@ -44,6 +45,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
"""
return bool(self.events)
class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"room_id",
"timeline",
@ -125,11 +127,7 @@ class SyncHandler(BaseHandler):
if since_token is None:
return self.initial_sync(sync_config)
else:
if sync_config.gap:
return self.incremental_sync_with_gap(sync_config, since_token)
else:
# TODO(mjark): Handle gapless sync
raise NotImplementedError()
return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks
def initial_sync(self, sync_config):
@ -174,7 +172,7 @@ class SyncHandler(BaseHandler):
A Deferred RoomSyncResult.
"""
recents, prev_batch_token, limited = yield self.load_filtered_recents(
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token,
)
@ -185,10 +183,8 @@ class SyncHandler(BaseHandler):
defer.returnValue(RoomSyncResult(
room_id=room_id,
events=recents,
prev_batch=prev_batch_token,
timeline=batch,
state=current_state_events,
limited=limited,
ephemeral=[],
))
@ -199,18 +195,13 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token()
presence_source = self.event_sources.sources["presence"]
presence, presence_key = yield presence_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.presence_key,
limit=sync_config.filter.presence_limit(),
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
@ -218,6 +209,7 @@ class SyncHandler(BaseHandler):
typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.typing_key,
limit=sync_config.filter.ephemeral_limit(),
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
@ -295,8 +287,7 @@ class SyncHandler(BaseHandler):
rooms.append(room_sync)
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
presence=presence,
rooms=rooms,
next_batch=now_token,
))
@ -407,7 +398,7 @@ class SyncHandler(BaseHandler):
room_id, sync_config, now_token, since_token,
)
logging.debug("Recents %r", recents)
logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a
# token to indicate what point in the stream this is

View File

@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
RestServlet, parse_string, parse_integer
)
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
@ -46,8 +46,14 @@ class SyncRestServlet(RestServlet):
"next_batch": // batch token for the next /sync
"presence": // presence data for the user.
"rooms": {
"roomlist": [{ // List of rooms with updates.
"room_id": // Id of the room being updated
"default": {
"invited": [], // Ids of invited rooms being updated.
"joined": [], // Ids of joined rooms being updated.
"archived": [] // Ids of archived rooms being updated.
}
}
"room_map": {
"${room_id}": { // Id of the room being updated
"event_map": // Map of EventID -> event JSON.
"timeline": { // The recent events in the room if gap is "true"
"limited": // Was the per-room event limit exceeded?
@ -58,7 +64,7 @@ class SyncRestServlet(RestServlet):
"state": [] // list of EventIDs updating the current state to
// be what it should be at the end of the batch.
"ephemeral": []
}]
}
}
}
"""
@ -115,13 +121,16 @@ class SyncRestServlet(RestServlet):
time_now = self.clock.time_msec()
room_map, rooms = self.encode_rooms(
sync_result.rooms, filter, time_now, token_id
)
response_content = {
"presence": self.encode_user_data(
sync_result.presence, filter, time_now
),
"rooms": self.encode_rooms(
sync_result.rooms, filter, time_now, token_id
),
"room_map": room_map,
"rooms": rooms,
"next_batch": sync_result.next_batch.to_string(),
}
@ -131,10 +140,21 @@ class SyncRestServlet(RestServlet):
return events
def encode_rooms(self, rooms, filter, time_now, token_id):
return [
self.encode_room(room, filter, time_now, token_id)
for room in rooms
]
room_map = {}
joined = []
for room in rooms:
room_map[room.room_id] = self.encode_room(
room, filter, time_now, token_id
)
joined.append(room.room_id)
return room_map, {
"default": {
"joined": joined,
"invited": [],
"archived": [],
}
}
@staticmethod
def encode_room(room, filter, time_now, token_id):
@ -159,7 +179,6 @@ class SyncRestServlet(RestServlet):
)
recent_event_ids.append(event.event_id)
result = {
"room_id": room.room_id,
"event_map": event_map,
"events": {
"batch": recent_event_ids,
@ -167,7 +186,6 @@ class SyncRestServlet(RestServlet):
},
"state": state_event_ids,
"limited": room.limited,
"published": room.published,
"ephemeral": room.ephemeral,
}
return result