Merge branch 'develop' of github.com:matrix-org/synapse into erikj/search

pull/307/head
Erik Johnston 2015-10-15 16:37:32 +01:00
commit bcfb653816
7 changed files with 295 additions and 172 deletions

View File

@ -38,6 +38,9 @@ for port in 8080 8081 8082; do
perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config
echo "report_stats: false" >> $DIR/etc/$port.config
python -m synapse.app.homeserver \
--config-path "$DIR/etc/$port.config" \
-D \

View File

@ -33,7 +33,6 @@ if __name__ == '__main__':
sys.stderr.writelines(message)
sys.exit(1)
from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
from synapse.storage import are_all_users_on_domain
from synapse.storage.prepare_database import UpgradeDatabaseException

View File

@ -22,6 +22,7 @@ import yaml
from string import Template
import os
import signal
from synapse.util.debug import debug_deferreds
DEFAULT_LOG_CONFIG = Template("""
@ -69,6 +70,8 @@ class LoggingConfig(Config):
self.verbosity = config.get("verbose", 0)
self.log_config = self.abspath(config.get("log_config"))
self.log_file = self.abspath(config.get("log_file"))
if config.get("full_twisted_stacktraces"):
debug_deferreds()
def default_config(self, config_dir_path, server_name, **kwargs):
log_file = self.abspath("homeserver.log")
@ -84,6 +87,11 @@ class LoggingConfig(Config):
# A yaml python logging config file
log_config: "%(log_config)s"
# Stop twisted from discarding the stack traces of exceptions in
# deferreds by waiting a reactor tick before running a deferred's
# callbacks.
# full_twisted_stacktraces: true
""" % locals()
def read_arguments(self, args):

View File

@ -28,21 +28,28 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"limit",
"gap",
"sort",
"backfill",
"filter",
])
class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"room_id",
"limited",
"published",
"events",
"state",
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
"limited",
])):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id",
"timeline",
"state",
"ephemeral",
])):
__slots__ = []
@ -51,14 +58,21 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events or self.state or self.ephemeral)
return bool(self.timeline or self.state or self.ephemeral)
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id",
"invite",
])):
__slots__ = []
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"private_user_data", # List of private events for the user.
"public_user_data", # List of public events for all users.
"rooms", # RoomSyncResult for each room.
"presence", # List of presence events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
])):
__slots__ = []
@ -68,7 +82,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
events.
"""
return bool(
self.private_user_data or self.public_user_data or self.rooms
self.presence or self.joined or self.invited
)
@ -108,8 +122,8 @@ class SyncHandler(BaseHandler):
)
result = yield self.notifier.wait_for_events(
sync_config.user, room_ids,
sync_config.filter, timeout, current_sync_callback
sync_config.user, room_ids, timeout, current_sync_callback,
from_token=since_token
)
defer.returnValue(result)
@ -121,11 +135,7 @@ class SyncHandler(BaseHandler):
if since_token is None:
return self.initial_sync(sync_config)
else:
if sync_config.gap:
return self.incremental_sync_with_gap(sync_config, since_token)
else:
# TODO(mjark): Handle gapless sync
raise NotImplementedError()
return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks
def initial_sync(self, sync_config):
@ -133,12 +143,6 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token()
presence_stream = self.event_sources.sources["presence"]
@ -155,33 +159,36 @@ class SyncHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
# TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
rooms = []
joined = []
invited = []
for event in room_list:
room_sync = yield self.initial_sync_for_room(
event.room_id, sync_config, now_token, published_room_ids
)
rooms.append(room_sync)
if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room(
event.room_id, sync_config, now_token,
)
joined.append(room_sync)
elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
room_id=event.room_id,
invite=invite,
))
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
rooms=rooms,
presence=presence,
joined=joined,
invited=invited,
next_batch=now_token,
))
@defer.inlineCallbacks
def initial_sync_for_room(self, room_id, sync_config, now_token,
published_room_ids):
def initial_sync_for_joined_room(self, room_id, sync_config, now_token):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred RoomSyncResult.
A Deferred JoinedSyncResult.
"""
recents, prev_batch_token, limited = yield self.load_filtered_recents(
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token,
)
@ -190,13 +197,10 @@ class SyncHandler(BaseHandler):
)
current_state_events = current_state.values()
defer.returnValue(RoomSyncResult(
defer.returnValue(JoinedSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch_token,
timeline=batch,
state=current_state_events,
limited=limited,
ephemeral=[],
))
@ -207,19 +211,13 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token()
presence_source = self.event_sources.sources["presence"]
presence, presence_key = yield presence_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.presence_key,
limit=sync_config.limit,
limit=sync_config.filter.presence_limit(),
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
@ -227,7 +225,7 @@ class SyncHandler(BaseHandler):
typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.typing_key,
limit=sync_config.limit,
limit=sync_config.filter.ephemeral_limit(),
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
@ -242,33 +240,37 @@ class SyncHandler(BaseHandler):
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
room_ids = set(r.room_id for r in rooms)
joined_room_ids = set(r.room_id for r in rooms)
else:
room_ids = yield rm_handler.get_joined_rooms_for_user(
joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user
)
# TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
timeline_limit = sync_config.filter.timeline_limit()
room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(),
from_key=since_token.room_key,
to_key=now_token.room_key,
room_id=None,
limit=sync_config.limit + 1,
limit=timeline_limit + 1,
)
rooms = []
if len(room_events) <= sync_config.limit:
joined = []
if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
invite_events = []
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member
and event.membership == Membership.INVITE
and event.state_key == sync_config.user.to_string()):
invite_events.append(event)
for room_id in room_ids:
for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, [])
state = [event for event in recents if event.is_state()]
if recents:
@ -282,30 +284,40 @@ class SyncHandler(BaseHandler):
sync_config, room_id, state
)
room_sync = RoomSyncResult(
room_sync = JoinedSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch,
timeline=TimelineBatch(
events=recents,
prev_batch=prev_batch,
limited=False,
),
state=state,
limited=False,
ephemeral=typing_by_room.get(room_id, [])
)
if room_sync:
rooms.append(room_sync)
joined.append(room_sync)
else:
for room_id in room_ids:
invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string()
)
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
published_room_ids, typing_by_room
typing_by_room
)
if room_sync:
rooms.append(room_sync)
joined.append(room_sync)
invited = [
InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events
]
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
rooms=rooms,
presence=presence,
joined=joined,
invited=invited,
next_batch=now_token,
))
@ -361,12 +373,13 @@ class SyncHandler(BaseHandler):
limited = True
recents = []
filtering_factor = 2
load_limit = max(sync_config.limit * filtering_factor, 100)
timeline_limit = sync_config.filter.timeline_limit()
load_limit = max(timeline_limit * filtering_factor, 100)
max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
while limited and len(recents) < sync_config.limit and max_repeat:
while limited and len(recents) < timeline_limit and max_repeat:
events, keys = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
@ -375,7 +388,7 @@ class SyncHandler(BaseHandler):
)
(room_key, _) = keys
end_key = "s" + room_key.split('-')[-1]
loaded_recents = sync_config.filter.filter_room_events(events)
loaded_recents = sync_config.filter.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
sync_config.user.to_string(), room_id, loaded_recents,
)
@ -385,34 +398,37 @@ class SyncHandler(BaseHandler):
limited = False
max_repeat -= 1
if len(recents) > sync_config.limit:
recents = recents[-sync_config.limit:]
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(
"room_key", room_key
)
defer.returnValue((recents, prev_batch_token, limited))
defer.returnValue(TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=limited
))
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
published_room_ids, typing_by_room):
typing_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
Returns:
A Deferred RoomSyncResult
A Deferred JoinedSyncResult
"""
# TODO(mjark): Check for redactions we might have missed.
recents, prev_batch_token, limited = yield self.load_filtered_recents(
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token,
)
logging.debug("Recents %r", recents)
logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a
# token to indicate what point in the stream this is
@ -435,13 +451,10 @@ class SyncHandler(BaseHandler):
sync_config, room_id, state_events_delta
)
room_sync = RoomSyncResult(
room_sync = JoinedSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch_token,
timeline=batch,
state=state_events_delta,
limited=limited,
ephemeral=typing_by_room.get(room_id, [])
)

View File

@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
RestServlet, parse_string, parse_integer
)
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
@ -26,6 +26,7 @@ from synapse.events.utils import (
from synapse.api.filtering import Filter
from ._base import client_v2_pattern
import copy
import logging
logger = logging.getLogger(__name__)
@ -36,51 +37,44 @@ class SyncRestServlet(RestServlet):
GET parameters::
timeout(int): How long to wait for new events in milliseconds.
limit(int): Maxiumum number of events per room to return.
gap(bool): Create gaps the message history if limit is exceeded to
ensure that the client has the most recent messages. Defaults to
"true".
sort(str,str): tuple of sort key (e.g. "timeline") and direction
(e.g. "asc", "desc"). Defaults to "timeline,asc".
since(batch_token): Batch token when asking for incremental deltas.
set_presence(str): What state the device presence should be set to.
default is "online".
backfill(bool): Should the HS request message history from other
servers. This may take a long time making it unsuitable for clients
expecting a prompt response. Defaults to "true".
filter(filter_id): A filter to apply to the events returned.
filter_*: Filter override parameters.
Response JSON::
{
"next_batch": // batch token for the next /sync
"private_user_data": // private events for this user.
"public_user_data": // public events for all users including the
// public events for this user.
"rooms": [{ // List of rooms with updates.
"room_id": // Id of the room being updated
"limited": // Was the per-room event limit exceeded?
"published": // Is the room published by our HS?
"next_batch": // batch token for the next /sync
"presence": // presence data for the user.
"rooms": {
"joined": { // Joined rooms being updated.
"${room_id}": { // Id of the room being updated
"event_map": // Map of EventID -> event JSON.
"events": { // The recent events in the room if gap is "true"
// otherwise the next events in the room.
"batch": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
"timeline": { // The recent events in the room if gap is "true"
"limited": // Was the per-room event limit exceeded?
// otherwise the next events in the room.
"events": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
}
"state": [] // list of EventIDs updating the current state to
// be what it should be at the end of the batch.
"ephemeral": []
}]
"state": {"events": []} // list of EventIDs updating the
// current state to be what it should
// be at the end of the batch.
"ephemeral": {"events": []} // list of event objects
}
},
"invited": {}, // Invited rooms being updated.
"archived": {} // Archived rooms being updated.
}
}
"""
PATTERN = client_v2_pattern("/sync$")
ALLOWED_SORT = set(["timeline,asc", "timeline,desc"])
ALLOWED_PRESENCE = set(["online", "offline", "idle"])
ALLOWED_PRESENCE = set(["online", "offline"])
def __init__(self, hs):
super(SyncRestServlet, self).__init__()
self.auth = hs.get_auth()
self.event_stream_handler = hs.get_handlers().event_stream_handler
self.sync_handler = hs.get_handlers().sync_handler
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
@ -90,45 +84,29 @@ class SyncRestServlet(RestServlet):
user, token_id = yield self.auth.get_user_by_req(request)
timeout = parse_integer(request, "timeout", default=0)
limit = parse_integer(request, "limit", required=True)
gap = parse_boolean(request, "gap", default=True)
sort = parse_string(
request, "sort", default="timeline,asc",
allowed_values=self.ALLOWED_SORT
)
since = parse_string(request, "since")
set_presence = parse_string(
request, "set_presence", default="online",
allowed_values=self.ALLOWED_PRESENCE
)
backfill = parse_boolean(request, "backfill", default=False)
filter_id = parse_string(request, "filter", default=None)
logger.info(
"/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r,"
" set_presence=%r, backfill=%r, filter_id=%r" % (
user, timeout, limit, gap, sort, since, set_presence,
backfill, filter_id
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, filter_id=%r" % (
user, timeout, since, set_presence, filter_id
)
)
# TODO(mjark): Load filter and apply overrides.
try:
filter = yield self.filtering.get_user_filter(
user.localpart, filter_id
)
except:
filter = Filter({})
# filter = filter.apply_overrides(http_request)
# if filter.matches(event):
# # stuff
sync_config = SyncConfig(
user=user,
gap=gap,
limit=limit,
sort=sort,
backfill=backfill,
filter=filter,
)
@ -137,43 +115,81 @@ class SyncRestServlet(RestServlet):
else:
since_token = None
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
)
if set_presence == "online":
yield self.event_stream_handler.started_stream(user)
try:
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout
)
finally:
if set_presence == "online":
self.event_stream_handler.stopped_stream(user)
time_now = self.clock.time_msec()
joined = self.encode_joined(
sync_result.joined, filter, time_now, token_id
)
invited = self.encode_invited(
sync_result.invited, filter, time_now, token_id
)
response_content = {
"public_user_data": self.encode_user_data(
sync_result.public_user_data, filter, time_now
),
"private_user_data": self.encode_user_data(
sync_result.private_user_data, filter, time_now
),
"rooms": self.encode_rooms(
sync_result.rooms, filter, time_now, token_id
"presence": self.encode_presence(
sync_result.presence, filter, time_now
),
"rooms": {
"joined": joined,
"invited": invited,
"archived": {},
},
"next_batch": sync_result.next_batch.to_string(),
}
defer.returnValue((200, response_content))
def encode_user_data(self, events, filter, time_now):
return events
def encode_presence(self, events, filter, time_now):
formatted = []
for event in events:
event = copy.deepcopy(event)
event['sender'] = event['content'].pop('user_id')
formatted.append(event)
return {"events": filter.filter_presence(formatted)}
def encode_rooms(self, rooms, filter, time_now, token_id):
return [
self.encode_room(room, filter, time_now, token_id)
for room in rooms
]
def encode_joined(self, rooms, filter, time_now, token_id):
joined = {}
for room in rooms:
joined[room.room_id] = self.encode_room(
room, filter, time_now, token_id
)
return joined
def encode_invited(self, rooms, filter, time_now, token_id):
invited = {}
for room in rooms:
invite = serialize_event(
room.invite, time_now, token_id=token_id,
event_format=format_event_for_client_v2_without_event_id,
)
invited_state = invite.get("unsigned", {}).pop("invite_room_state", [])
invited_state.append(invite)
invited[room.room_id] = {
"invite_state": {"events": invited_state}
}
return invited
@staticmethod
def encode_room(room, filter, time_now, token_id):
event_map = {}
state_events = filter.filter_room_state(room.state)
recent_events = filter.filter_room_events(room.events)
timeline_events = filter.filter_room_timeline(room.timeline.events)
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
state_event_ids = []
recent_event_ids = []
timeline_event_ids = []
for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
@ -182,24 +198,22 @@ class SyncRestServlet(RestServlet):
)
state_event_ids.append(event.event_id)
for event in recent_events:
for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
event, time_now, token_id=token_id,
event_format=format_event_for_client_v2_without_event_id,
)
recent_event_ids.append(event.event_id)
timeline_event_ids.append(event.event_id)
result = {
"room_id": room.room_id,
"event_map": event_map,
"events": {
"batch": recent_event_ids,
"prev_batch": room.prev_batch.to_string(),
"timeline": {
"events": timeline_event_ids,
"prev_batch": room.timeline.prev_batch.to_string(),
"limited": room.timeline.limited,
},
"state": state_event_ids,
"limited": room.limited,
"published": room.published,
"ephemeral": room.ephemeral,
"state": {"events": state_event_ids},
"ephemeral": {"events": ephemeral_events},
}
return result

View File

@ -110,6 +110,20 @@ class RoomMemberStore(SQLBaseStore):
membership=membership,
).addCallback(self._get_events)
def get_invites_for_user(self, user_id):
""" Get all the invite events for a user
Args:
user_id (str): The user ID.
Returns:
A deferred list of event objects.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE]
).addCallback(lambda invites: self._get_events([
invites.event_id for invite in invites
]))
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.

72
synapse/util/debug.py Normal file
View File

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from functools import wraps
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
def debug_deferreds():
"""Cause all deferreds to wait for a reactor tick before running their
callbacks. This increases the chance of getting a stack trace out of
a defer.inlineCallback since the code waiting on the deferred will get
a chance to add an errback before the deferred runs."""
# Helper method for retrieving and restoring the current logging context
# around a callback.
def with_logging_context(fn):
context = LoggingContext.current_context()
def restore_context_callback(x):
with PreserveLoggingContext():
LoggingContext.thread_local.current_context = context
return fn(x)
return restore_context_callback
# We are going to modify the __init__ method of defer.Deferred so we
# need to get a copy of the old method so we can still call it.
old__init__ = defer.Deferred.__init__
# We need to create a deferred to bounce the callbacks through the reactor
# but we don't want to add a callback when we create that deferred so we
# we create a new type of deferred that uses the old __init__ method.
# This is safe as long as the old __init__ method doesn't invoke an
# __init__ using super.
class Bouncer(defer.Deferred):
__init__ = old__init__
# We'll add this as a callback to all Deferreds. Twisted will wait until
# the bouncer deferred resolves before calling the callbacks of the
# original deferred.
def bounce_callback(x):
bouncer = Bouncer()
reactor.callLater(0, with_logging_context(bouncer.callback), x)
return bouncer
# We'll add this as an errback to all Deferreds. Twisted will wait until
# the bouncer deferred resolves before calling the errbacks of the
# original deferred.
def bounce_errback(x):
bouncer = Bouncer()
reactor.callLater(0, with_logging_context(bouncer.errback), x)
return bouncer
@wraps(old__init__)
def new__init__(self, *args, **kargs):
old__init__(self, *args, **kargs)
self.addCallbacks(bounce_callback, bounce_errback)
defer.Deferred.__init__ = new__init__