Merge branch 'develop' into matrix-org-hotfixes

matrix-org/fix_event_sig_checks
Richard van der Hoff 2018-07-19 16:40:28 +01:00
commit 9e56e1ab30
47 changed files with 256 additions and 231 deletions

View File

@ -1,3 +1,41 @@
Synapse 0.33.0 (2018-07-19)
===========================
Bugfixes
--------
- Disable a noisy warning about logcontexts. (`#3561 <https://github.com/matrix-org/synapse/issues/3561>`_)
Synapse 0.33.0rc1 (2018-07-18)
==============================
Features
--------
- Enforce the specified API for report_event. (`#3316 <https://github.com/matrix-org/synapse/issues/3316>`_)
- Include CPU time from database threads in request/block metrics. (`#3496 <https://github.com/matrix-org/synapse/issues/3496>`_, `#3501 <https://github.com/matrix-org/synapse/issues/3501>`_)
- Add CPU metrics for _fetch_event_list. (`#3497 <https://github.com/matrix-org/synapse/issues/3497>`_)
- Optimisation to make handling incoming federation requests more efficient. (`#3541 <https://github.com/matrix-org/synapse/issues/3541>`_)
Bugfixes
--------
- Fix a significant performance regression in /sync. (`#3505 <https://github.com/matrix-org/synapse/issues/3505>`_, `#3521 <https://github.com/matrix-org/synapse/issues/3521>`_, `#3530 <https://github.com/matrix-org/synapse/issues/3530>`_, `#3544 <https://github.com/matrix-org/synapse/issues/3544>`_)
- Use more portable syntax in our use of the attrs package, widening the supported versions. (`#3498 <https://github.com/matrix-org/synapse/issues/3498>`_)
- Fix queued federation requests being processed in the wrong order. (`#3533 <https://github.com/matrix-org/synapse/issues/3533>`_)
- Ensure that erasure requests are correctly honoured for publicly accessible rooms when accessed over federation. (`#3546 <https://github.com/matrix-org/synapse/issues/3546>`_)
Misc
----
- Refactoring to improve testability. (`#3351 <https://github.com/matrix-org/synapse/issues/3351>`_, `#3499 <https://github.com/matrix-org/synapse/issues/3499>`_)
- Use ``isort`` to sort imports. (`#3463 <https://github.com/matrix-org/synapse/issues/3463>`_, `#3464 <https://github.com/matrix-org/synapse/issues/3464>`_, `#3540 <https://github.com/matrix-org/synapse/issues/3540>`_)
- Use parse and asserts from http.servlet. (`#3534 <https://github.com/matrix-org/synapse/issues/3534>`_, `#3535 <https://github.com/matrix-org/synapse/issues/3535>`_).
Synapse 0.32.2 (2018-07-07) Synapse 0.32.2 (2018-07-07)
=========================== ===========================

View File

@ -71,7 +71,7 @@ We'd like to invite you to join #matrix:matrix.org (via
https://matrix.org/docs/projects/try-matrix-now.html), run a homeserver, take a look https://matrix.org/docs/projects/try-matrix-now.html), run a homeserver, take a look
at the `Matrix spec <https://matrix.org/docs/spec>`_, and experiment with the at the `Matrix spec <https://matrix.org/docs/spec>`_, and experiment with the
`APIs <https://matrix.org/docs/api>`_ and `Client SDKs `APIs <https://matrix.org/docs/api>`_ and `Client SDKs
<http://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_. <https://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
Thanks for using Matrix! Thanks for using Matrix!
@ -283,7 +283,7 @@ Connecting to Synapse from a client
The easiest way to try out your new Synapse installation is by connecting to it The easiest way to try out your new Synapse installation is by connecting to it
from a web client. The easiest option is probably the one at from a web client. The easiest option is probably the one at
http://riot.im/app. You will need to specify a "Custom server" when you log on https://riot.im/app. You will need to specify a "Custom server" when you log on
or register: set this to ``https://domain.tld`` if you setup a reverse proxy or register: set this to ``https://domain.tld`` if you setup a reverse proxy
following the recommended setup, or ``https://localhost:8448`` - remember to specify the following the recommended setup, or ``https://localhost:8448`` - remember to specify the
port (``:8448``) if not ``:443`` unless you changed the configuration. (Leave the identity port (``:8448``) if not ``:443`` unless you changed the configuration. (Leave the identity
@ -329,7 +329,7 @@ Security Note
============= =============
Matrix serves raw user generated data in some APIs - specifically the `content Matrix serves raw user generated data in some APIs - specifically the `content
repository endpoints <http://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_. repository endpoints <https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
Whilst we have tried to mitigate against possible XSS attacks (e.g. Whilst we have tried to mitigate against possible XSS attacks (e.g.
https://github.com/matrix-org/synapse/pull/1021) we recommend running https://github.com/matrix-org/synapse/pull/1021) we recommend running
@ -348,7 +348,7 @@ Platform-Specific Instructions
Debian Debian
------ ------
Matrix provides official Debian packages via apt from http://matrix.org/packages/debian/. Matrix provides official Debian packages via apt from https://matrix.org/packages/debian/.
Note that these packages do not include a client - choose one from Note that these packages do not include a client - choose one from
https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one of our SDKs :) https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one of our SDKs :)
@ -524,7 +524,7 @@ Troubleshooting Running
----------------------- -----------------------
If synapse fails with ``missing "sodium.h"`` crypto errors, you may need If synapse fails with ``missing "sodium.h"`` crypto errors, you may need
to manually upgrade PyNaCL, as synapse uses NaCl (http://nacl.cr.yp.to/) for to manually upgrade PyNaCL, as synapse uses NaCl (https://nacl.cr.yp.to/) for
encryption and digital signatures. encryption and digital signatures.
Unfortunately PyNACL currently has a few issues Unfortunately PyNACL currently has a few issues
(https://github.com/pyca/pynacl/issues/53) and (https://github.com/pyca/pynacl/issues/53) and
@ -672,8 +672,8 @@ useful just for development purposes. See `<demo/README>`_.
Using PostgreSQL Using PostgreSQL
================ ================
As of Synapse 0.9, `PostgreSQL <http://www.postgresql.org>`_ is supported as an As of Synapse 0.9, `PostgreSQL <https://www.postgresql.org>`_ is supported as an
alternative to the `SQLite <http://sqlite.org/>`_ database that Synapse has alternative to the `SQLite <https://sqlite.org/>`_ database that Synapse has
traditionally used for convenience and simplicity. traditionally used for convenience and simplicity.
The advantages of Postgres include: The advantages of Postgres include:
@ -697,7 +697,7 @@ Using a reverse proxy with Synapse
It is recommended to put a reverse proxy such as It is recommended to put a reverse proxy such as
`nginx <https://nginx.org/en/docs/http/ngx_http_proxy_module.html>`_, `nginx <https://nginx.org/en/docs/http/ngx_http_proxy_module.html>`_,
`Apache <https://httpd.apache.org/docs/current/mod/mod_proxy_http.html>`_ or `Apache <https://httpd.apache.org/docs/current/mod/mod_proxy_http.html>`_ or
`HAProxy <http://www.haproxy.org/>`_ in front of Synapse. One advantage of `HAProxy <https://www.haproxy.org/>`_ in front of Synapse. One advantage of
doing so is that it means that you can expose the default https port (443) to doing so is that it means that you can expose the default https port (443) to
Matrix clients without needing to run Synapse with root privileges. Matrix clients without needing to run Synapse with root privileges.

View File

@ -1 +0,0 @@
Enforce the specified API for report_event

View File

View File

View File

@ -1 +0,0 @@
Include CPU time from database threads in request/block metrics.

View File

@ -1 +0,0 @@
Add CPU metrics for _fetch_event_list

View File

View File

View File

View File

@ -1 +0,0 @@
Reduce database consumption when processing large numbers of receipts

View File

@ -1 +0,0 @@
Cache optimisation for /sync requests

View File

View File

@ -1 +0,0 @@
Fix queued federation requests being processed in the wrong order

View File

@ -1 +0,0 @@
refactor: use parse_{string,integer} and assert's from http.servlet for deduplication

View File

View File

@ -1 +0,0 @@
check isort for each PR

View File

@ -1 +0,0 @@
Optimisation to make handling incoming federation requests more efficient.

View File

View File

@ -1 +0,0 @@
Ensure that erasure requests are correctly honoured for publicly accessible rooms when accessed over federation.

1
changelog.d/3548.bugfix Normal file
View File

@ -0,0 +1 @@
Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis.

1
changelog.d/3556.feature Normal file
View File

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

View File

@ -36,3 +36,4 @@ known_compat = mock,six
known_twisted=twisted,OpenSSL known_twisted=twisted,OpenSSL
multi_line_output=3 multi_line_output=3
include_trailing_comma=true include_trailing_comma=true
combine_as_imports=true

View File

@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server. """ This is a reference implementation of a Matrix home server.
""" """
__version__ = "0.32.2" __version__ = "0.33.0"

View File

@ -168,7 +168,7 @@ class TransactionQueue(object):
# fire off a processing loop in the background # fire off a processing loop in the background
run_as_background_process( run_as_background_process(
"process_transaction_queue", "process_event_queue_for_federation",
self._process_event_queue_loop, self._process_event_queue_loop,
) )
@ -435,14 +435,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Starting transaction loop", destination) logger.debug("TX [%s] Starting transaction loop", destination)
# Drop the logcontext before starting the transaction. It doesn't run_as_background_process(
# really make sense to log all the outbound transactions against "federation_transaction_transmission_loop",
# whatever path led us to this point: that's pretty arbitrary really. self._transaction_transmission_loop,
# destination,
# (this also means we can fire off _perform_transaction without )
# yielding)
with logcontext.PreserveLoggingContext():
self._transaction_transmission_loop(destination)
@defer.inlineCallbacks @defer.inlineCallbacks
def _transaction_transmission_loop(self, destination): def _transaction_transmission_loop(self, destination):

View File

@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, reactor, ssl, task from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
from twisted.web.client import ( from twisted.web.client import (
Agent,
BrowserLikeRedirectAgent,
ContentDecoderAgent,
FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder, GzipDecoder,
HTTPConnectionPool, HTTPConnectionPool,
PartialDownloadError, PartialDownloadError,

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -13,13 +14,24 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from six import PY3
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.rest.client import versions from synapse.rest.client import versions
from synapse.rest.client.v1 import admin, directory, events, initial_sync from synapse.rest.client.v1 import (
from synapse.rest.client.v1 import login as v1_login admin,
from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher directory,
from synapse.rest.client.v1 import register as v1_register events,
from synapse.rest.client.v1 import room, voip initial_sync,
login as v1_login,
logout,
presence,
profile,
push_rule,
pusher,
room,
voip,
)
from synapse.rest.client.v2_alpha import ( from synapse.rest.client.v2_alpha import (
account, account,
account_data, account_data,
@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
user_directory, user_directory,
) )
if not PY3:
from synapse.rest.client.v1_only import (
register as v1_register,
)
class ClientRestResource(JsonResource): class ClientRestResource(JsonResource):
"""A resource for version 1 of the matrix client API.""" """A resource for version 1 of the matrix client API."""
@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
def register_servlets(client_resource, hs): def register_servlets(client_resource, hs):
versions.register_servlets(client_resource) versions.register_servlets(client_resource)
# "v1" if not PY3:
room.register_servlets(hs, client_resource) # "v1" (Python 2 only)
events.register_servlets(hs, client_resource)
v1_register.register_servlets(hs, client_resource) v1_register.register_servlets(hs, client_resource)
# Deprecated in r0
initial_sync.register_servlets(hs, client_resource)
room.register_deprecated_servlets(hs, client_resource)
# Partially deprecated in r0
events.register_servlets(hs, client_resource)
# "v1" + "r0"
room.register_servlets(hs, client_resource)
v1_login.register_servlets(hs, client_resource) v1_login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource) profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource) presence.register_servlets(hs, client_resource)
initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource) directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource) voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource) admin.register_servlets(hs, client_resource)

View File

@ -832,10 +832,13 @@ def register_servlets(hs, http_server):
RoomSendEventRestServlet(hs).register(http_server) RoomSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server)
RoomInitialSyncRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server) JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server) RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server) RoomEventContextServlet(hs).register(http_server)
def register_deprecated_servlets(hs, http_server):
RoomInitialSyncRestServlet(hs).register(http_server)

View File

@ -0,0 +1,3 @@
"""
REST APIs that are only used in v1 (the legacy API).
"""

View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains base REST classes for constructing client v1 servlets.
"""
import re
from synapse.api.urls import CLIENT_PREFIX
def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
"""Creates a regex compiled client path with the correct client path
prefix.
Args:
path_regex (str): The regex string to match. This should NOT have a ^
as this will be prefixed.
Returns:
list of SRE_Pattern
"""
patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
if include_in_unstable:
unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
patterns.append(re.compile("^" + unstable_prefix + path_regex))
return patterns

View File

@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester from synapse.types import create_requester
from .base import ClientV1RestServlet, client_path_patterns from .base import v1_only_client_path_patterns
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
handler doesn't have a concept of multi-stages or sessions. handler doesn't have a concept of multi-stages or sessions.
""" """
PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False) PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
def __init__(self, hs): def __init__(self, hs):
""" """
@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
"""Handles user creation via a server-to-server interface """Handles user creation via a server-to-server interface
""" """
PATTERNS = client_path_patterns("/createUser$", releases=()) PATTERNS = v1_only_client_path_patterns("/createUser$")
def __init__(self, hs): def __init__(self, hs):
super(CreateUserRestServlet, self).__init__(hs) super(CreateUserRestServlet, self).__init__(hs)

View File

@ -344,7 +344,7 @@ class SQLBaseStore(object):
parent_context = LoggingContext.current_context() parent_context = LoggingContext.current_context()
if parent_context == LoggingContext.sentinel: if parent_context == LoggingContext.sentinel:
logger.warn( logger.warn(
"Running db txn from sentinel context: metrics will be lost", "Starting db connection from sentinel context: metrics will be lost",
) )
parent_context = None parent_context = None

View File

@ -19,6 +19,8 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from . import engines from . import engines
from ._base import SQLBaseStore from ._base import SQLBaseStore
@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers = {} self._background_update_handlers = {}
self._all_done = False self._all_done = False
@defer.inlineCallbacks
def start_doing_background_updates(self): def start_doing_background_updates(self):
logger.info("Starting background schema updates") run_as_background_process(
"background_updates", self._run_background_updates,
)
@defer.inlineCallbacks
def _run_background_updates(self):
logger.info("Starting background schema updates")
while True: while True:
yield self.hs.get_clock().sleep( yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)

View File

@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches import CACHE_SIZE_FACTOR
from . import background_updates from . import background_updates
@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now) self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self): def _update_client_ips_batch(self):
def update():
to_update = self._batch_row_update to_update = self._batch_row_update
self._batch_row_update = {} self._batch_row_update = {}
return self.runInteraction( return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update "_update_client_ips_batch", self._update_client_ips_batch_txn,
to_update,
)
run_as_background_process(
"update_client_ips", update,
) )
def _update_client_ips_batch_txn(self, txn, to_update): def _update_client_ips_batch_txn(self, txn, to_update):

View File

@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError
# these are only included to make the type annotations work # these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401 from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -155,11 +156,8 @@ class _EventPeristenceQueue(object):
self._event_persist_queues[room_id] = queue self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id) self._currently_persisting_rooms.discard(room_id)
# set handle_queue_loop off on the background. We don't want to # set handle_queue_loop off in the background
# attribute work done in it to the current request, so we drop the run_as_background_process("persist_events", handle_queue_loop)
# logcontext altogether.
with PreserveLoggingContext():
handle_queue_loop()
def _get_drainining_queue(self, room_id): def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque()) queue = self._event_persist_queues.setdefault(room_id, deque())

View File

@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import ( from synapse.util.logcontext import (
LoggingContext, LoggingContext,
PreserveLoggingContext, PreserveLoggingContext,
@ -322,9 +323,10 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False should_start = False
if should_start: if should_start:
with PreserveLoggingContext(): run_as_background_process(
self.runWithConnection( "fetch_events",
self._do_fetch self.runWithConnection,
self._do_fetch,
) )
logger.debug("Loading %d events", len(events)) logger.debug("Loading %d events", len(events))

View File

@ -16,6 +16,7 @@
import logging import logging
from collections import OrderedDict from collections import OrderedDict
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache from synapse.util.caches import register_cache
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -63,7 +64,10 @@ class ExpiringCache(object):
return return
def f(): def f():
self._prune_cache() run_as_background_process(
"prune_cache_%s" % self._cache_name,
self._prune_cache,
)
self._clock.looping_call(f, self._expiry_ms / 2) self._clock.looping_call(f, self._expiry_ms / 2)

View File

@ -17,19 +17,17 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.util import unwrapFirstError from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id): def user_left_room(distributor, user, room_id):
with PreserveLoggingContext():
distributor.fire("user_left_room", user=user, room_id=room_id) distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id): def user_joined_room(distributor, user, room_id):
with PreserveLoggingContext():
distributor.fire("user_joined_room", user=user, room_id=room_id) distributor.fire("user_joined_room", user=user, room_id=room_id)
@ -44,9 +42,7 @@ class Distributor(object):
model will do for today. model will do for today.
""" """
def __init__(self, suppress_failures=True): def __init__(self):
self.suppress_failures = suppress_failures
self.signals = {} self.signals = {}
self.pre_registration = {} self.pre_registration = {}
@ -56,7 +52,6 @@ class Distributor(object):
self.signals[name] = Signal( self.signals[name] = Signal(
name, name,
suppress_failures=self.suppress_failures,
) )
if name in self.pre_registration: if name in self.pre_registration:
@ -75,10 +70,18 @@ class Distributor(object):
self.pre_registration[name].append(observer) self.pre_registration[name].append(observer)
def fire(self, name, *args, **kwargs): def fire(self, name, *args, **kwargs):
"""Dispatches the given signal to the registered observers.
Runs the observers as a background process. Does not return a deferred.
"""
if name not in self.signals: if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name)) raise KeyError("%r does not have a signal named %s" % (self, name))
return self.signals[name].fire(*args, **kwargs) run_as_background_process(
name,
self.signals[name].fire,
*args, **kwargs
)
class Signal(object): class Signal(object):
@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers. method into all of the observers.
""" """
def __init__(self, name, suppress_failures): def __init__(self, name):
self.name = name self.name = name
self.suppress_failures = suppress_failures
self.observers = [] self.observers = []
def observe(self, observer): def observe(self, observer):
@ -103,7 +105,6 @@ class Signal(object):
Each observer callable may return a Deferred.""" Each observer callable may return a Deferred."""
self.observers.append(observer) self.observers.append(observer)
@defer.inlineCallbacks
def fire(self, *args, **kwargs): def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and """Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is kwargs. Exceptions thrown by observers are logged but ignored. It is
@ -121,22 +122,17 @@ class Signal(object):
failure.type, failure.type,
failure.value, failure.value,
failure.getTracebackObject())) failure.getTracebackObject()))
if not self.suppress_failures:
return failure
return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
with PreserveLoggingContext():
deferreds = [ deferreds = [
do(observer) run_in_background(do, o)
for observer in self.observers for o in self.observers
] ]
res = yield defer.gatherResults( return make_deferred_yieldable(defer.gatherResults(
deferreds, consumeErrors=True deferreds, consumeErrors=True,
).addErrback(unwrapFirstError) ))
defer.returnValue(res)
def __repr__(self): def __repr__(self):
return "<Signal name=%r>" % (self.name,) return "<Signal name=%r>" % (self.name,)

View File

@ -99,6 +99,17 @@ class ContextResourceUsage(object):
self.db_sched_duration_sec = 0 self.db_sched_duration_sec = 0
self.evt_db_fetch_count = 0 self.evt_db_fetch_count = 0
def __repr__(self):
return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
"db_txn_count='%r', db_txn_duration_sec='%r', "
"db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
self.ru_stime,
self.ru_utime,
self.db_txn_count,
self.db_txn_duration_sec,
self.db_sched_duration_sec,
self.evt_db_fetch_count,)
def __iadd__(self, other): def __iadd__(self, other):
"""Add another ContextResourceUsage's stats to this one's. """Add another ContextResourceUsage's stats to this one's.

View File

@ -104,12 +104,19 @@ class Measure(object):
logger.warn("Expected context. (%r)", self.name) logger.warn("Expected context. (%r)", self.name)
return return
usage = context.get_resource_usage() - self.start_usage current = context.get_resource_usage()
usage = current - self.start_usage
try:
block_ru_utime.labels(self.name).inc(usage.ru_utime) block_ru_utime.labels(self.name).inc(usage.ru_utime)
block_ru_stime.labels(self.name).inc(usage.ru_stime) block_ru_stime.labels(self.name).inc(usage.ru_stime)
block_db_txn_count.labels(self.name).inc(usage.db_txn_count) block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
except ValueError:
logger.warn(
"Failed to save metrics! OLD: %r, NEW: %r",
self.start_usage, current
)
if self.created_context: if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb) self.start_context.__exit__(exc_type, exc_val, exc_tb)

View File

@ -14,100 +14,30 @@
# limitations under the License. # limitations under the License.
""" Tests REST events for /events paths.""" """ Tests REST events for /events paths."""
from mock import Mock, NonCallableMock from mock import Mock, NonCallableMock
from six import PY3
# twisted imports
from twisted.internet import defer from twisted.internet import defer
import synapse.rest.client.v1.events
import synapse.rest.client.v1.register
import synapse.rest.client.v1.room
from tests import unittest
from ....utils import MockHttpResource, setup_test_homeserver from ....utils import MockHttpResource, setup_test_homeserver
from .utils import RestTestCase from .utils import RestTestCase
PATH_PREFIX = "/_matrix/client/api/v1" PATH_PREFIX = "/_matrix/client/api/v1"
class EventStreamPaginationApiTestCase(unittest.TestCase):
""" Tests event streaming query parameters and start/end keys used in the
Pagination stream API. """
user_id = "sid1"
def setUp(self):
# configure stream and inject items
pass
def tearDown(self):
pass
def TODO_test_long_poll(self):
# stream from 'end' key, send (self+other) message, expect message.
# stream from 'END', send (self+other) message, expect message.
# stream from 'end' key, send (self+other) topic, expect topic.
# stream from 'END', send (self+other) topic, expect topic.
# stream from 'end' key, send (self+other) invite, expect invite.
# stream from 'END', send (self+other) invite, expect invite.
pass
def TODO_test_stream_forward(self):
# stream from START, expect injected items
# stream from 'start' key, expect same content
# stream from 'end' key, expect nothing
# stream from 'END', expect nothing
# The following is needed for cases where content is removed e.g. you
# left a room, so the token you're streaming from is > the one that
# would be returned naturally from START>END.
# stream from very new token (higher than end key), expect same token
# returned as end key
pass
def TODO_test_limits(self):
# stream from a key, expect limit_num items
# stream from START, expect limit_num items
pass
def TODO_test_range(self):
# stream from key to key, expect X items
# stream from key to END, expect X items
# stream from START to key, expect X items
# stream from START to END, expect all items
pass
def TODO_test_direction(self):
# stream from END to START and fwds, expect newest first
# stream from END to START and bwds, expect oldest first
# stream from START to END and fwds, expect oldest first
# stream from START to END and bwds, expect newest first
pass
class EventStreamPermissionsTestCase(RestTestCase): class EventStreamPermissionsTestCase(RestTestCase):
""" Tests event streaming (GET /events). """ """ Tests event streaming (GET /events). """
if PY3:
skip = "Skip on Py3 until ported to use not V1 only register."
@defer.inlineCallbacks @defer.inlineCallbacks
def setUp(self): def setUp(self):
import synapse.rest.client.v1.events
import synapse.rest.client.v1_only.register
import synapse.rest.client.v1.room
self.mock_resource = MockHttpResource(prefix=PATH_PREFIX) self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
hs = yield setup_test_homeserver( hs = yield setup_test_homeserver(
@ -125,7 +55,7 @@ class EventStreamPermissionsTestCase(RestTestCase):
hs.get_handlers().federation_handler = Mock() hs.get_handlers().federation_handler = Mock()
synapse.rest.client.v1.register.register_servlets(hs, self.mock_resource) synapse.rest.client.v1_only.register.register_servlets(hs, self.mock_resource)
synapse.rest.client.v1.events.register_servlets(hs, self.mock_resource) synapse.rest.client.v1.events.register_servlets(hs, self.mock_resource)
synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource) synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)

View File

@ -16,11 +16,12 @@
import json import json
from mock import Mock from mock import Mock
from six import PY3
from twisted.test.proto_helpers import MemoryReactorClock from twisted.test.proto_helpers import MemoryReactorClock
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.rest.client.v1.register import register_servlets from synapse.rest.client.v1_only.register import register_servlets
from synapse.util import Clock from synapse.util import Clock
from tests import unittest from tests import unittest
@ -31,6 +32,8 @@ class CreateUserServletTestCase(unittest.TestCase):
""" """
Tests for CreateUserRestServlet. Tests for CreateUserRestServlet.
""" """
if PY3:
skip = "Not ported to Python 3."
def setUp(self): def setUp(self):
self.registration_handler = Mock() self.registration_handler = Mock()

View File

@ -20,7 +20,6 @@ import json
from mock import Mock, NonCallableMock from mock import Mock, NonCallableMock
from six.moves.urllib import parse as urlparse from six.moves.urllib import parse as urlparse
# twisted imports
from twisted.internet import defer from twisted.internet import defer
import synapse.rest.client.v1.room import synapse.rest.client.v1.room
@ -86,6 +85,7 @@ class RoomBase(unittest.TestCase):
self.resource = JsonResource(self.hs) self.resource = JsonResource(self.hs)
synapse.rest.client.v1.room.register_servlets(self.hs, self.resource) synapse.rest.client.v1.room.register_servlets(self.hs, self.resource)
synapse.rest.client.v1.room.register_deprecated_servlets(self.hs, self.resource)
self.helper = RestHelper(self.hs, self.resource, self.user_id) self.helper = RestHelper(self.hs, self.resource, self.user_id)

View File

@ -21,8 +21,12 @@ from synapse.types import UserID
from synapse.util import Clock from synapse.util import Clock
from tests import unittest from tests import unittest
from tests.server import ThreadedMemoryReactorClock as MemoryReactorClock from tests.server import (
from tests.server import make_request, setup_test_homeserver, wait_until_result ThreadedMemoryReactorClock as MemoryReactorClock,
make_request,
setup_test_homeserver,
wait_until_result,
)
PATH_PREFIX = "/_matrix/client/v2_alpha" PATH_PREFIX = "/_matrix/client/v2_alpha"

View File

@ -20,8 +20,12 @@ from synapse.types import UserID
from synapse.util import Clock from synapse.util import Clock
from tests import unittest from tests import unittest
from tests.server import ThreadedMemoryReactorClock as MemoryReactorClock from tests.server import (
from tests.server import make_request, setup_test_homeserver, wait_until_result ThreadedMemoryReactorClock as MemoryReactorClock,
make_request,
setup_test_homeserver,
wait_until_result,
)
PATH_PREFIX = "/_matrix/client/v2_alpha" PATH_PREFIX = "/_matrix/client/v2_alpha"

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -15,8 +16,6 @@
from mock import Mock, patch from mock import Mock, patch
from twisted.internet import defer
from synapse.util.distributor import Distributor from synapse.util.distributor import Distributor
from . import unittest from . import unittest
@ -27,38 +26,15 @@ class DistributorTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.dist = Distributor() self.dist = Distributor()
@defer.inlineCallbacks
def test_signal_dispatch(self): def test_signal_dispatch(self):
self.dist.declare("alert") self.dist.declare("alert")
observer = Mock() observer = Mock()
self.dist.observe("alert", observer) self.dist.observe("alert", observer)
d = self.dist.fire("alert", 1, 2, 3) self.dist.fire("alert", 1, 2, 3)
yield d
self.assertTrue(d.called)
observer.assert_called_with(1, 2, 3) observer.assert_called_with(1, 2, 3)
@defer.inlineCallbacks
def test_signal_dispatch_deferred(self):
self.dist.declare("whine")
d_inner = defer.Deferred()
def observer():
return d_inner
self.dist.observe("whine", observer)
d_outer = self.dist.fire("whine")
self.assertFalse(d_outer.called)
d_inner.callback(None)
yield d_outer
self.assertTrue(d_outer.called)
@defer.inlineCallbacks
def test_signal_catch(self): def test_signal_catch(self):
self.dist.declare("alarm") self.dist.declare("alarm")
@ -71,9 +47,7 @@ class DistributorTestCase(unittest.TestCase):
with patch( with patch(
"synapse.util.distributor.logger", spec=["warning"] "synapse.util.distributor.logger", spec=["warning"]
) as mock_logger: ) as mock_logger:
d = self.dist.fire("alarm", "Go") self.dist.fire("alarm", "Go")
yield d
self.assertTrue(d.called)
observers[0].assert_called_once_with("Go") observers[0].assert_called_once_with("Go")
observers[1].assert_called_once_with("Go") observers[1].assert_called_once_with("Go")
@ -83,34 +57,12 @@ class DistributorTestCase(unittest.TestCase):
mock_logger.warning.call_args[0][0], str mock_logger.warning.call_args[0][0], str
) )
@defer.inlineCallbacks
def test_signal_catch_no_suppress(self):
# Gut-wrenching
self.dist.suppress_failures = False
self.dist.declare("whail")
class MyException(Exception):
pass
@defer.inlineCallbacks
def observer():
raise MyException("Oopsie")
self.dist.observe("whail", observer)
d = self.dist.fire("whail")
yield self.assertFailure(d, MyException)
self.dist.suppress_failures = True
@defer.inlineCallbacks
def test_signal_prereg(self): def test_signal_prereg(self):
observer = Mock() observer = Mock()
self.dist.observe("flare", observer) self.dist.observe("flare", observer)
self.dist.declare("flare") self.dist.declare("flare")
yield self.dist.fire("flare", 4, 5) self.dist.fire("flare", 4, 5)
observer.assert_called_with(4, 5) observer.assert_called_with(4, 5)