Merge pull request #305 from matrix-org/markjh/v2_sync_api

Update the v2 sync API to work as specified in the current spec.
pull/308/head
Mark Haines 2015-10-14 13:56:23 +01:00
commit a059760954
3 changed files with 212 additions and 171 deletions

View File

@ -28,21 +28,28 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"limit",
"gap",
"sort",
"backfill",
"filter",
])
class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"room_id",
"limited",
"published",
"events",
"state",
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
"limited",
])):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id",
"timeline",
"state",
"ephemeral",
])):
__slots__ = []
@ -51,14 +58,21 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events or self.state or self.ephemeral)
return bool(self.timeline or self.state or self.ephemeral)
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id",
"invite",
])):
__slots__ = []
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"private_user_data", # List of private events for the user.
"public_user_data", # List of public events for all users.
"rooms", # RoomSyncResult for each room.
"presence", # List of presence events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
])):
__slots__ = []
@ -68,7 +82,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
events.
"""
return bool(
self.private_user_data or self.public_user_data or self.rooms
self.presence or self.joined or self.invited
)
@ -108,8 +122,8 @@ class SyncHandler(BaseHandler):
)
result = yield self.notifier.wait_for_events(
sync_config.user, room_ids,
sync_config.filter, timeout, current_sync_callback
sync_config.user, room_ids, timeout, current_sync_callback,
from_token=since_token
)
defer.returnValue(result)
@ -121,11 +135,7 @@ class SyncHandler(BaseHandler):
if since_token is None:
return self.initial_sync(sync_config)
else:
if sync_config.gap:
return self.incremental_sync_with_gap(sync_config, since_token)
else:
# TODO(mjark): Handle gapless sync
raise NotImplementedError()
return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks
def initial_sync(self, sync_config):
@ -133,12 +143,6 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token()
presence_stream = self.event_sources.sources["presence"]
@ -155,33 +159,36 @@ class SyncHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
# TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
rooms = []
joined = []
invited = []
for event in room_list:
room_sync = yield self.initial_sync_for_room(
event.room_id, sync_config, now_token, published_room_ids
)
rooms.append(room_sync)
if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room(
event.room_id, sync_config, now_token,
)
joined.append(room_sync)
elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
room_id=event.room_id,
invite=invite,
))
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
rooms=rooms,
presence=presence,
joined=joined,
invited=invited,
next_batch=now_token,
))
@defer.inlineCallbacks
def initial_sync_for_room(self, room_id, sync_config, now_token,
published_room_ids):
def initial_sync_for_joined_room(self, room_id, sync_config, now_token):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred RoomSyncResult.
A Deferred JoinedSyncResult.
"""
recents, prev_batch_token, limited = yield self.load_filtered_recents(
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token,
)
@ -190,13 +197,10 @@ class SyncHandler(BaseHandler):
)
current_state_events = current_state.values()
defer.returnValue(RoomSyncResult(
defer.returnValue(JoinedSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch_token,
timeline=batch,
state=current_state_events,
limited=limited,
ephemeral=[],
))
@ -207,19 +211,13 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token()
presence_source = self.event_sources.sources["presence"]
presence, presence_key = yield presence_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.presence_key,
limit=sync_config.limit,
limit=sync_config.filter.presence_limit(),
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
@ -227,7 +225,7 @@ class SyncHandler(BaseHandler):
typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.typing_key,
limit=sync_config.limit,
limit=sync_config.filter.ephemeral_limit(),
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
@ -242,33 +240,37 @@ class SyncHandler(BaseHandler):
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
room_ids = set(r.room_id for r in rooms)
joined_room_ids = set(r.room_id for r in rooms)
else:
room_ids = yield rm_handler.get_joined_rooms_for_user(
joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user
)
# TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
timeline_limit = sync_config.filter.timeline_limit()
room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(),
from_key=since_token.room_key,
to_key=now_token.room_key,
room_id=None,
limit=sync_config.limit + 1,
limit=timeline_limit + 1,
)
rooms = []
if len(room_events) <= sync_config.limit:
joined = []
if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
invite_events = []
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member
and event.membership == Membership.INVITE
and event.state_key == sync_config.user.to_string()):
invite_events.append(event)
for room_id in room_ids:
for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, [])
state = [event for event in recents if event.is_state()]
if recents:
@ -282,30 +284,40 @@ class SyncHandler(BaseHandler):
sync_config, room_id, state
)
room_sync = RoomSyncResult(
room_sync = JoinedSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch,
timeline=TimelineBatch(
events=recents,
prev_batch=prev_batch,
limited=False,
),
state=state,
limited=False,
ephemeral=typing_by_room.get(room_id, [])
)
if room_sync:
rooms.append(room_sync)
joined.append(room_sync)
else:
for room_id in room_ids:
invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string()
)
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
published_room_ids, typing_by_room
typing_by_room
)
if room_sync:
rooms.append(room_sync)
joined.append(room_sync)
invited = [
InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events
]
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
rooms=rooms,
presence=presence,
joined=joined,
invited=invited,
next_batch=now_token,
))
@ -361,12 +373,13 @@ class SyncHandler(BaseHandler):
limited = True
recents = []
filtering_factor = 2
load_limit = max(sync_config.limit * filtering_factor, 100)
timeline_limit = sync_config.filter.timeline_limit()
load_limit = max(timeline_limit * filtering_factor, 100)
max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
while limited and len(recents) < sync_config.limit and max_repeat:
while limited and len(recents) < timeline_limit and max_repeat:
events, keys = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
@ -375,7 +388,7 @@ class SyncHandler(BaseHandler):
)
(room_key, _) = keys
end_key = "s" + room_key.split('-')[-1]
loaded_recents = sync_config.filter.filter_room_events(events)
loaded_recents = sync_config.filter.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
sync_config.user.to_string(), room_id, loaded_recents,
)
@ -385,34 +398,37 @@ class SyncHandler(BaseHandler):
limited = False
max_repeat -= 1
if len(recents) > sync_config.limit:
recents = recents[-sync_config.limit:]
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(
"room_key", room_key
)
defer.returnValue((recents, prev_batch_token, limited))
defer.returnValue(TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=limited
))
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
published_room_ids, typing_by_room):
typing_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
Returns:
A Deferred RoomSyncResult
A Deferred JoinedSyncResult
"""
# TODO(mjark): Check for redactions we might have missed.
recents, prev_batch_token, limited = yield self.load_filtered_recents(
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token,
)
logging.debug("Recents %r", recents)
logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a
# token to indicate what point in the stream this is
@ -435,13 +451,10 @@ class SyncHandler(BaseHandler):
sync_config, room_id, state_events_delta
)
room_sync = RoomSyncResult(
room_sync = JoinedSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch_token,
timeline=batch,
state=state_events_delta,
limited=limited,
ephemeral=typing_by_room.get(room_id, [])
)

View File

@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
RestServlet, parse_string, parse_integer
)
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
@ -26,6 +26,7 @@ from synapse.events.utils import (
from synapse.api.filtering import Filter
from ._base import client_v2_pattern
import copy
import logging
logger = logging.getLogger(__name__)
@ -36,51 +37,44 @@ class SyncRestServlet(RestServlet):
GET parameters::
timeout(int): How long to wait for new events in milliseconds.
limit(int): Maxiumum number of events per room to return.
gap(bool): Create gaps the message history if limit is exceeded to
ensure that the client has the most recent messages. Defaults to
"true".
sort(str,str): tuple of sort key (e.g. "timeline") and direction
(e.g. "asc", "desc"). Defaults to "timeline,asc".
since(batch_token): Batch token when asking for incremental deltas.
set_presence(str): What state the device presence should be set to.
default is "online".
backfill(bool): Should the HS request message history from other
servers. This may take a long time making it unsuitable for clients
expecting a prompt response. Defaults to "true".
filter(filter_id): A filter to apply to the events returned.
filter_*: Filter override parameters.
Response JSON::
{
"next_batch": // batch token for the next /sync
"private_user_data": // private events for this user.
"public_user_data": // public events for all users including the
// public events for this user.
"rooms": [{ // List of rooms with updates.
"room_id": // Id of the room being updated
"limited": // Was the per-room event limit exceeded?
"published": // Is the room published by our HS?
"next_batch": // batch token for the next /sync
"presence": // presence data for the user.
"rooms": {
"joined": { // Joined rooms being updated.
"${room_id}": { // Id of the room being updated
"event_map": // Map of EventID -> event JSON.
"events": { // The recent events in the room if gap is "true"
// otherwise the next events in the room.
"batch": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
"timeline": { // The recent events in the room if gap is "true"
"limited": // Was the per-room event limit exceeded?
// otherwise the next events in the room.
"events": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
}
"state": [] // list of EventIDs updating the current state to
// be what it should be at the end of the batch.
"ephemeral": []
}]
"state": {"events": []} // list of EventIDs updating the
// current state to be what it should
// be at the end of the batch.
"ephemeral": {"events": []} // list of event objects
}
},
"invited": {}, // Invited rooms being updated.
"archived": {} // Archived rooms being updated.
}
}
"""
PATTERN = client_v2_pattern("/sync$")
ALLOWED_SORT = set(["timeline,asc", "timeline,desc"])
ALLOWED_PRESENCE = set(["online", "offline", "idle"])
ALLOWED_PRESENCE = set(["online", "offline"])
def __init__(self, hs):
super(SyncRestServlet, self).__init__()
self.auth = hs.get_auth()
self.event_stream_handler = hs.get_handlers().event_stream_handler
self.sync_handler = hs.get_handlers().sync_handler
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
@ -90,45 +84,29 @@ class SyncRestServlet(RestServlet):
user, token_id = yield self.auth.get_user_by_req(request)
timeout = parse_integer(request, "timeout", default=0)
limit = parse_integer(request, "limit", required=True)
gap = parse_boolean(request, "gap", default=True)
sort = parse_string(
request, "sort", default="timeline,asc",
allowed_values=self.ALLOWED_SORT
)
since = parse_string(request, "since")
set_presence = parse_string(
request, "set_presence", default="online",
allowed_values=self.ALLOWED_PRESENCE
)
backfill = parse_boolean(request, "backfill", default=False)
filter_id = parse_string(request, "filter", default=None)
logger.info(
"/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r,"
" set_presence=%r, backfill=%r, filter_id=%r" % (
user, timeout, limit, gap, sort, since, set_presence,
backfill, filter_id
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, filter_id=%r" % (
user, timeout, since, set_presence, filter_id
)
)
# TODO(mjark): Load filter and apply overrides.
try:
filter = yield self.filtering.get_user_filter(
user.localpart, filter_id
)
except:
filter = Filter({})
# filter = filter.apply_overrides(http_request)
# if filter.matches(event):
# # stuff
sync_config = SyncConfig(
user=user,
gap=gap,
limit=limit,
sort=sort,
backfill=backfill,
filter=filter,
)
@ -137,43 +115,81 @@ class SyncRestServlet(RestServlet):
else:
since_token = None
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
)
if set_presence == "online":
yield self.event_stream_handler.started_stream(user)
try:
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
)
finally:
if set_presence == "online":
self.event_stream_handler.stopped_stream(user)
time_now = self.clock.time_msec()
joined = self.encode_joined(
sync_result.joined, filter, time_now, token_id
)
invited = self.encode_invited(
sync_result.invited, filter, time_now, token_id
)
response_content = {
"public_user_data": self.encode_user_data(
sync_result.public_user_data, filter, time_now
),
"private_user_data": self.encode_user_data(
sync_result.private_user_data, filter, time_now
),
"rooms": self.encode_rooms(
sync_result.rooms, filter, time_now, token_id
"presence": self.encode_presence(
sync_result.presence, filter, time_now
),
"rooms": {
"joined": joined,
"invited": invited,
"archived": {},
},
"next_batch": sync_result.next_batch.to_string(),
}
defer.returnValue((200, response_content))
def encode_user_data(self, events, filter, time_now):
return events
def encode_presence(self, events, filter, time_now):
formatted = []
for event in events:
event = copy.deepcopy(event)
event['sender'] = event['content'].pop('user_id')
formatted.append(event)
return {"events": filter.filter_presence(formatted)}
def encode_rooms(self, rooms, filter, time_now, token_id):
return [
self.encode_room(room, filter, time_now, token_id)
for room in rooms
]
def encode_joined(self, rooms, filter, time_now, token_id):
joined = {}
for room in rooms:
joined[room.room_id] = self.encode_room(
room, filter, time_now, token_id
)
return joined
def encode_invited(self, rooms, filter, time_now, token_id):
invited = {}
for room in rooms:
invite = serialize_event(
room.invite, time_now, token_id=token_id,
event_format=format_event_for_client_v2_without_event_id,
)
invited_state = invite.get("unsigned", {}).pop("invite_room_state", [])
invited_state.append(invite)
invited[room.room_id] = {
"invite_state": {"events": invited_state}
}
return invited
@staticmethod
def encode_room(room, filter, time_now, token_id):
event_map = {}
state_events = filter.filter_room_state(room.state)
recent_events = filter.filter_room_events(room.events)
timeline_events = filter.filter_room_timeline(room.timeline.events)
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
state_event_ids = []
recent_event_ids = []
timeline_event_ids = []
for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
@ -182,24 +198,22 @@ class SyncRestServlet(RestServlet):
)
state_event_ids.append(event.event_id)
for event in recent_events:
for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
event, time_now, token_id=token_id,
event_format=format_event_for_client_v2_without_event_id,
)
recent_event_ids.append(event.event_id)
timeline_event_ids.append(event.event_id)
result = {
"room_id": room.room_id,
"event_map": event_map,
"events": {
"batch": recent_event_ids,
"prev_batch": room.prev_batch.to_string(),
"timeline": {
"events": timeline_event_ids,
"prev_batch": room.timeline.prev_batch.to_string(),
"limited": room.timeline.limited,
},
"state": state_event_ids,
"limited": room.limited,
"published": room.published,
"ephemeral": room.ephemeral,
"state": {"events": state_event_ids},
"ephemeral": {"events": ephemeral_events},
}
return result

View File

@ -110,6 +110,20 @@ class RoomMemberStore(SQLBaseStore):
membership=membership,
).addCallback(self._get_events)
def get_invites_for_user(self, user_id):
""" Get all the invite events for a user
Args:
user_id (str): The user ID.
Returns:
A deferred list of event objects.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE]
).addCallback(lambda invites: self._get_events([
invites.event_id for invite in invites
]))
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.