Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

pull/3979/head
Erik Johnston 2018-09-20 11:00:14 +01:00
commit b2fda9d20e
51 changed files with 793 additions and 460 deletions

.gitignore

@ -1,9 +1,11 @@
*.pyc
.*.swp
*~
*.lock
.DS_Store
_trial_temp/
_trial_temp*/
logs/
dbs/
*.egg

CONTRIBUTING.rst

@ -30,12 +30,28 @@ use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
We use `Jenkins <http://matrix.org/jenkins>`_ and
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an administrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
pull request for feedback.
We use `CircleCI <https://circleci.com/gh/matrix-org>`_ and `Travis CI
<https://travis-ci.org/matrix-org/synapse>`_ for continuous integration. All
pull requests to synapse get automatically tested by Travis and CircleCI.
If your change breaks the build, this will be shown in GitHub, so please
keep an eye on the pull request for feedback.
To run unit tests in a local development environment, you can use:
- ``tox -e py27`` (requires tox to be installed by ``pip install tox``) for
SQLite-backed Synapse on Python 2.7.
- ``tox -e py35`` for SQLite-backed Synapse on Python 3.5.
- ``tox -e py36`` for SQLite-backed Synapse on Python 3.6.
- ``tox -e py27-postgres`` for PostgreSQL-backed Synapse on Python 2.7
(requires a running local PostgreSQL with access to create databases).
- ``./test_postgresql.sh`` for PostgreSQL-backed Synapse on Python 2.7
(requires Docker). Entirely self-contained, recommended if you don't want to
set up PostgreSQL yourself.
Docker images are available for running the integration tests (SyTest) locally,
see the `documentation in the SyTest repo
<https://github.com/matrix-org/sytest/blob/develop/docker/README.md>`_ for more
information.
Code style
~~~~~~~~~~
@ -77,7 +93,8 @@ AUTHORS.rst file for the project in question. Please feel free to include a
change to AUTHORS.rst in your pull request to list yourself and a short
description of the area(s) you've worked on. Also, we sometimes have swag to
give away to contributors - if you feel that Matrix-branded apparel is missing
from your life, please mail us your shipping address to matrix at matrix.org and we'll try to fix it :)
from your life, please mail us your shipping address to matrix at matrix.org and
we'll try to fix it :)
Sign off
~~~~~~~~
@ -144,4 +161,9 @@ flag to ``git commit``, which uses the name and email set in your
Conclusion
~~~~~~~~~~
That's it! Matrix is a very open and collaborative project as you might expect given our obsession with open communication. If we're going to successfully matrix together all the fragmented communication technologies out there we are reliant on contributions and collaboration from the community to do so. So please get involved - and we hope you have as much fun hacking on Matrix as we do!
That's it! Matrix is a very open and collaborative project as you might expect
given our obsession with open communication. If we're going to successfully
matrix together all the fragmented communication technologies out there we are
reliant on contributions and collaboration from the community to do so. So
please get involved - and we hope you have as much fun hacking on Matrix as we
do!

MANIFEST.in

@ -28,6 +28,7 @@ exclude jenkins*.sh
exclude jenkins*
exclude Dockerfile
exclude .dockerignore
exclude test_postgresql.sh
recursive-exclude jenkins *.sh
include pyproject.toml

README.rst

@ -157,7 +157,7 @@ if you prefer.
In case of problems, please see the _`Troubleshooting` section below.
There is an official synapse image available at
https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with
the docker-compose file available at `contrib/docker <contrib/docker>`_. Further information on
this including configuration options is available in the README on
@ -459,37 +459,13 @@ https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/matrix-
Windows Install
---------------
Synapse can be installed on Cygwin. It requires the following Cygwin packages:
- gcc
- git
- libffi-devel
- openssl (and openssl-devel, python-openssl)
- python
- python-setuptools
The content repository requires additional packages and will be unable to process
uploads without them:
- libjpeg8
- libjpeg8-devel
- zlib
If you choose to install Synapse without these packages, you will need to reinstall
``pillow`` for changes to be applied, e.g. ``pip uninstall pillow`` ``pip install
pillow --user``
Troubleshooting:
- You may need to upgrade ``setuptools`` to get this to work correctly:
``pip install setuptools --upgrade``.
- You may encounter errors indicating that ``ffi.h`` is missing, even with
``libffi-devel`` installed. If you do, copy the ``.h`` files:
``cp /usr/lib/libffi-3.0.13/include/*.h /usr/include``
- You may need to install libsodium from source in order to install PyNacl. If
you do, you may need to create a symlink to ``libsodium.a`` so ``ld`` can find
it: ``ln -s /usr/local/lib/libsodium.a /usr/lib/libsodium.a``
If you wish to run or develop Synapse on Windows, the Windows Subsystem For
Linux provides a Linux environment on Windows 10 which is capable of using the
Debian, Fedora, or source installation methods. More information about WSL can
be found at https://docs.microsoft.com/en-us/windows/wsl/install-win10 for
Windows 10 and https://docs.microsoft.com/en-us/windows/wsl/install-on-server
for Windows Server.
Troubleshooting
===============
@ -908,7 +884,7 @@ to install using pip and a virtualenv::
virtualenv -p python2.7 env
source env/bin/activate
python synapse/python_dependencies.py | xargs pip install
python -m synapse.python_dependencies | xargs pip install
pip install lxml mock
This will run a process of downloading and installing all the needed
@ -963,5 +939,13 @@ variable. The default is 0.5, which can be decreased to reduce RAM usage
in memory-constrained environments, or increased if performance starts to
degrade.
Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant
improvement in overall memory usage, and especially in terms of giving back RAM
to the OS. To use it, the library must simply be put in the LD_PRELOAD
environment variable when launching Synapse. On Debian, this can be done
by installing the ``libjemalloc1`` package and adding this line to
``/etc/default/matrix-synapse``::
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
.. _`key_management`: https://matrix.org/docs/spec/server_server/unstable.html#retrieving-server-keys

changelog.d/3699.misc (new file)

@ -0,0 +1,2 @@
Unit tests can now be run under PostgreSQL in Docker using
``test_postgresql.sh``.

changelog.d/3873.misc (new file)

@ -0,0 +1,2 @@
Remove documentation regarding installation on Cygwin; the use of WSL is
recommended instead.

changelog.d/3877.misc (new file)

@ -0,0 +1 @@
Mention jemalloc in the README

changelog.d/3879.bugfix (new file)

@ -0,0 +1 @@
Don't ratelimit autojoins

changelog.d/3883.feature (new file)

@ -0,0 +1 @@
Add the ability to change the maximum upload size for the Docker container via the SYNAPSE_MAX_UPLOAD_SIZE variable.

changelog.d/3888.misc (new file)

@ -0,0 +1 @@
Remove unmaintained "nuke-room-from-db.sh" script

changelog.d/3889.bugfix (new file)

@ -0,0 +1 @@
Fix 500 error when deleting unknown room alias

changelog.d/3892.bugfix (new file)

@ -0,0 +1 @@
Fix some b'abcd' noise in logs and metrics

changelog.d/3894.feature (new file)

@ -0,0 +1 @@
Report "python_version" in the phone home stats

changelog.d/3895.bugfix (new file)

@ -0,0 +1 @@
Fix some b'abcd' noise in logs and metrics

changelog.d/3897.misc (new file)

@ -0,0 +1 @@
Fix typo in README, synaspse -> synapse

changelog.d/3899.bugfix (new file)

@ -0,0 +1 @@
When we join a room, always try the server we used for the alias lookup first, to avoid unresponsive and out-of-date servers.

changelog.d/3903.misc (new file)

@ -0,0 +1 @@
Increase the timeout when filling missing events in federation requests

changelog.d/3904.misc (new file)

@ -0,0 +1 @@
Improve the logging when handling a federation transaction

changelog.d/3906.misc (new file)

@ -0,0 +1 @@
Improve logging of outbound federation requests

changelog.d/3907.bugfix (new file)

@ -0,0 +1 @@
Fix incorrect server-name indication for outgoing federation requests

changelog.d/3909.misc (new file)

@ -0,0 +1 @@
Improve logging of outbound federation requests

changelog.d/3910.bugfix (new file)

@ -0,0 +1 @@
Fix bug where things occasionally were not being timed out correctly.

changelog.d/3912.misc (new file)

@ -0,0 +1 @@
Add a regression test for logging failed HTTP requests on Python 3.

changelog.d/3914.bugfix (new file)

@ -0,0 +1 @@
Fix bug where outbound federation would stop talking to some servers when using workers

docker/Dockerfile-pgtests (new file)

@ -0,0 +1,12 @@
# Use the Sytest image that comes with a lot of the build dependencies
# pre-installed
FROM matrixdotorg/sytest:latest
# The Sytest image doesn't come with python, so install that
RUN apt-get -qq install -y python python-dev python-pip
# We need tox to run the tests in run_pg_tests.sh
RUN pip install tox
ADD run_pg_tests.sh /pg_tests.sh
ENTRYPOINT /pg_tests.sh

docker/README.md

@ -88,6 +88,7 @@ variables are available for configuration:
* ``SYNAPSE_TURN_URIS``, set this variable to the comma-separated list of TURN
URIs to enable TURN for this homeserver.
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
* ``SYNAPSE_MAX_UPLOAD_SIZE``, set this variable to change the max upload size [default `10M`].
Shared secrets, which will be initialized to random values if not set:

docker/conf/homeserver.yaml

@ -85,7 +85,7 @@ federation_rc_concurrent: 3
media_store_path: "/data/media"
uploads_path: "/data/uploads"
max_upload_size: "10M"
max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"
max_image_pixels: "32M"
dynamic_thumbnails: false
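
The ``{{ ... or ... }}`` syntax above is Jinja2, which the Docker entrypoint
uses to render the configuration from environment variables. A minimal
standalone sketch of how the default resolves (the values are illustrative)::

    from jinja2 import Template

    # the same expression as in the config template above
    template = Template('max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"')

    # with the variable unset (or empty), the default applies
    print(template.render())  # max_upload_size: "10M"

    # with SYNAPSE_MAX_UPLOAD_SIZE set, it overrides the default
    print(template.render(SYNAPSE_MAX_UPLOAD_SIZE="50M"))  # max_upload_size: "50M"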

docker/run_pg_tests.sh (new executable file)

@ -0,0 +1,20 @@
#!/bin/bash
# This script runs the PostgreSQL tests inside a Docker container. It expects
# the relevant source files to be mounted into /src (done automatically by the
# caller script). It will set up the database, run it, and then use the tox
# configuration to run the tests.
set -e
# Set PGUSER so Synapse's tests know what user to connect to the database with
export PGUSER=postgres
# Initialise & start the database
su -c '/usr/lib/postgresql/9.6/bin/initdb -D /var/lib/postgresql/data -E "UTF-8" --lc-collate="en_US.UTF-8" --lc-ctype="en_US.UTF-8" --username=postgres' postgres
su -c '/usr/lib/postgresql/9.6/bin/pg_ctl -w -D /var/lib/postgresql/data start' postgres
# Run the tests
cd /src
export TRIAL_FLAGS="-j 4"
tox --workdir=/tmp -e py27-postgres

nuke-room-from-db.sh (deleted file)

@ -1,57 +0,0 @@
#!/bin/bash
## CAUTION:
## This script will remove (hopefully) all trace of the given room ID from
## your homeserver.db
## Do not run it lightly.
set -e
if [ "$1" == "-h" ] || [ "$1" == "" ]; then
echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
echo "or"
echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
exit
fi
ROOMID="$1"
cat <<EOF
DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
DELETE FROM event_edges WHERE room_id = '$ROOMID';
DELETE FROM room_depth WHERE room_id = '$ROOMID';
DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
DELETE FROM events WHERE room_id = '$ROOMID';
DELETE FROM event_json WHERE room_id = '$ROOMID';
DELETE FROM state_events WHERE room_id = '$ROOMID';
DELETE FROM current_state_events WHERE room_id = '$ROOMID';
DELETE FROM room_memberships WHERE room_id = '$ROOMID';
DELETE FROM feedback WHERE room_id = '$ROOMID';
DELETE FROM topics WHERE room_id = '$ROOMID';
DELETE FROM room_names WHERE room_id = '$ROOMID';
DELETE FROM rooms WHERE room_id = '$ROOMID';
DELETE FROM room_hosts WHERE room_id = '$ROOMID';
DELETE FROM room_aliases WHERE room_id = '$ROOMID';
DELETE FROM state_groups WHERE room_id = '$ROOMID';
DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
DELETE FROM event_search WHERE room_id = '$ROOMID';
DELETE FROM guest_access WHERE room_id = '$ROOMID';
DELETE FROM history_visibility WHERE room_id = '$ROOMID';
DELETE FROM room_tags WHERE room_id = '$ROOMID';
DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
DELETE FROM room_account_data WHERE room_id = '$ROOMID';
DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
DELETE FROM local_invites WHERE room_id = '$ROOMID';
DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
DELETE FROM event_reports WHERE room_id = '$ROOMID';
DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
DELETE FROM event_auth WHERE room_id = '$ROOMID';
DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
VACUUM;
EOF

synapse/app/homeserver.py

@ -457,6 +457,10 @@ def run(hs):
stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
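
For illustration, the version formatting on its own: ``sys.version_info`` is a
named tuple, so this yields strings like ``2.7.15`` or ``3.6.6``::

    import sys

    version = sys.version_info
    # same format string as the stats-reporting code above
    print("{}.{}.{}".format(version.major, version.minor, version.micro))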

synapse/handlers/directory.py

@ -20,7 +20,14 @@ import string
from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
from synapse.api.errors import (
AuthError,
CodeMessageException,
Codes,
NotFoundError,
StoreError,
SynapseError,
)
from synapse.types import RoomAlias, UserID, get_domain_from_id
from ._base import BaseHandler
@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler):
def delete_association(self, requester, user_id, room_alias):
# association deletion for human users
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown room alias")
raise
if not can_delete:
raise AuthError(
403, "You don't have permission to delete the alias.",
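
The shape of that translation, as a self-contained sketch. The exception
classes here are stand-ins for the real ones in ``synapse.api.errors``, and the
store lookup is faked::

    class StoreError(Exception):
        def __init__(self, code, msg):
            super(StoreError, self).__init__(msg)
            self.code = code

    class NotFoundError(Exception):
        pass

    def get_room_alias_creator(alias_exists):
        # stands in for the store lookup, which raises a 404 StoreError on a miss
        if not alias_exists:
            raise StoreError(404, "No row found")
        return "@creator:example.com"

    def delete_association(alias_exists):
        try:
            creator = get_room_alias_creator(alias_exists)
        except StoreError as e:
            if e.code == 404:
                # surface a clean 404 to the client instead of a 500
                raise NotFoundError("Unknown room alias")
            raise
        return creator

    try:
        delete_association(alias_exists=False)
    except NotFoundError as e:
        print("client sees: %s" % e)  # client sees: Unknown room alias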
@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler):
def _user_can_delete_alias(self, alias, user_id):
creator = yield self.store.get_room_alias_creator(alias.to_string())
if creator and creator == user_id:
if creator is not None and creator == user_id:
defer.returnValue(True)
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))

synapse/handlers/federation.py

@ -69,6 +69,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
def shortstr(iterable, maxitems=5):
"""If iterable has maxitems or fewer, return the stringification of a list
containing those items.
Otherwise, return the stringification of a list with the first maxitems items,
followed by "...".
Args:
iterable (Iterable): iterable to truncate
maxitems (int): number of items to return before truncating
Returns:
unicode
"""
items = list(itertools.islice(iterable, maxitems + 1))
if len(items) <= maxitems:
return str(items)
return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
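
A quick illustration of the truncation behaviour, assuming ``shortstr`` as
defined above::

    print(shortstr(["a", "b", "c"]))         # ['a', 'b', 'c']  (5 or fewer items)
    print(shortstr(range(100), maxitems=3))  # [0, 1, 2, ...]   (truncated)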
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@ -114,7 +135,6 @@ class FederationHandler(BaseHandler):
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
@log_function
def on_receive_pdu(
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
):
@ -130,9 +150,17 @@ class FederationHandler(BaseHandler):
Returns (Deferred): completes with None
"""
room_id = pdu.room_id
event_id = pdu.event_id
logger.info(
"[%s %s] handling received PDU: %s",
room_id, event_id, pdu,
)
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
pdu.event_id,
event_id,
allow_none=True,
allow_rejected=True,
)
@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
)
)
if already_seen:
logger.debug("Already seen pdu %s", pdu.event_id)
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
# do some initial sanity-checking of the event. In particular, make
@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
raise FederationError(
"ERROR",
err.code,
@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
logger.info("Ignoring PDU %s for room %s from %s for now; join "
"in progress", pdu.event_id, pdu.room_id, origin)
self.room_queues[pdu.room_id].append((pdu, origin))
if room_id in self.room_queues:
logger.info(
"[%s %s] Queuing PDU from %s for now: join in progress",
room_id, event_id, origin,
)
self.room_queues[room_id].append((pdu, origin))
return
# If we're no longer in the room just ditch the event entirely. This
@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
# we should check if we *are* in fact in the room. If we are then we
# can magically rejoin the room.
is_in_room = yield self.auth.check_host_in_room(
pdu.room_id,
room_id,
self.server_name
)
if not is_in_room:
@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
)
if was_in_room:
logger.info(
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
"[%s %s] Ignoring PDU from %s as we've left the room",
room_id, event_id, origin,
)
defer.returnValue(None)
@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
)
logger.debug(
"_handle_new_pdu min_depth for %s: %d",
pdu.room_id, min_depth
"[%s %s] min_depth: %d",
room_id, event_id, min_depth,
)
prevs = {e_id for e_id, _ in pdu.prev_events}
@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
if get_missing and prevs - seen:
missing_prevs = prevs - seen
if get_missing and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
"Acquiring lock for room %r to fetch %d missing events: %r...",
pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
"[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
"Acquired lock for room %r to fetch %d missing events",
pdu.room_id, len(prevs - seen),
"[%s %s] Acquired room lock to fetch %d missing prev_events",
room_id, event_id, len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
@ -241,19 +273,23 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
logger.info(
"Found all missing prev events for %s", pdu.event_id
"[%s %s] Found all missing prev_events",
room_id, event_id,
)
elif prevs - seen:
elif missing_prevs:
logger.info(
"Not fetching %d missing events for room %r,event %s: %r...",
len(prevs - seen), pdu.room_id, pdu.event_id,
list(prevs - seen)[:5],
"[%s %s] Not recursively fetching %d missing prev_events: %s",
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
if sent_to_us_directly and prevs - seen:
# If they have sent it to us directly, and the server
# isn't telling us about the auth events that it's
# made a message referencing, we explode
logger.warn(
"[%s %s] Failed to fetch %d prev events: rejecting",
room_id, event_id, len(prevs - seen),
)
raise FederationError(
"ERROR",
403,
@ -270,15 +306,19 @@ class FederationHandler(BaseHandler):
auth_chains = set()
try:
# Get the state of the events we know about
ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
ours = yield self.store.get_state_groups(room_id, list(seen))
state_groups.append(ours)
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
logger.info(
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, pdu.room_id, p
origin, room_id, p,
)
)
auth_chains.update(got_auth_chain)
@ -291,19 +331,24 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
room_version = yield self.store.get_room_version(pdu.room_id)
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
room_version, state_groups, {pdu.event_id: pdu}, fetch
room_version, state_groups, {event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
logger.warn(
"[%s %s] Error attempting to resolve state at missing "
"prev_events",
room_id, event_id, exc_info=True,
)
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
affected=pdu.event_id,
affected=event_id,
)
yield self._process_received_pdu(
@ -322,15 +367,16 @@ class FederationHandler(BaseHandler):
prevs (set(str)): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
room_id = pdu.room_id
event_id = pdu.event_id
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)
latest = yield self.store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@ -338,8 +384,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
"Missing %d events for room %r pdu %s: %r...",
len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
"[%s %s]: Requesting %d prev_events: %s",
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
# XXX: we set timeout to 10s to help workaround
@ -360,49 +406,87 @@ class FederationHandler(BaseHandler):
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744
#
# ----
#
# Update richvdh 2018/09/18: There are a number of problems with timing this
# request out aggressively on the client side:
#
# - it plays badly with the server-side rate-limiter, which starts tarpitting you
# if you send too many requests at once, so you end up with the server carefully
# working through the backlog of your requests, which you have already timed
# out.
#
# - for this request in particular, we now (as of
# https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
# server can't produce a plausible-looking set of prev_events - so we become
# much more likely to reject the event.
#
# - contrary to what it says above, we do *not* fall back to fetching fresh state
# for the room if get_missing_events times out. Rather, we give up processing
# the PDU whose prevs we are missing, which then makes it much more likely that
# we'll end up back here for the *next* PDU in the list, which exacerbates the
# problem.
#
# - the aggressive 10s timeout was introduced to deal with incoming federation
# requests taking 8 hours to process. It's not entirely clear why that was going
# on; certainly there were other issues causing traffic storms which are now
# resolved, and I think in any case we may be more sensible about our locking
# now. We're *certainly* more sensible about our logging.
#
# All that said: Let's try increasing the timeout to 60s and see what happens.
missing_events = yield self.federation_client.get_missing_events(
origin,
pdu.room_id,
room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
timeout=10000,
timeout=60000,
)
logger.info(
"Got %d events: %r...",
len(missing_events), [e.event_id for e in missing_events[:5]]
"[%s %s]: Got %d prev_events: %s",
room_id, event_id, len(missing_events), shortstr(missing_events),
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
for ev in missing_events:
logger.info(
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
try:
yield self.on_receive_pdu(
origin,
e,
ev,
get_missing=False
)
except FederationError as e:
if e.code == 403:
logger.warn("Event %s failed history check.")
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
)
else:
raise
@log_function
@defer.inlineCallbacks
def _process_received_pdu(self, origin, pdu, state, auth_chain):
def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
event = pdu
room_id = event.room_id
event_id = event.event_id
logger.debug("Processing event: %s", event)
logger.debug(
"[%s %s] Processing event: %s",
room_id, event_id, event,
)
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@ -411,15 +495,16 @@ class FederationHandler(BaseHandler):
# event.
if state and auth_chain and not event.internal_metadata.is_outlier():
is_in_room = yield self.auth.check_host_in_room(
event.room_id,
room_id,
self.server_name
)
else:
is_in_room = True
if not is_in_room:
logger.info(
"Got event for room we're not in: %r %r",
event.room_id, event.event_id
"[%s %s] Got event for room we're not in",
room_id, event_id,
)
try:
@ -431,7 +516,7 @@ class FederationHandler(BaseHandler):
"ERROR",
e.code,
e.msg,
affected=event.event_id,
affected=event_id,
)
else:
@ -480,12 +565,12 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
room = yield self.store.get_room(event.room_id)
room = yield self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
room_id=event.room_id,
room_id=room_id,
room_creator_user_id="",
is_public=False,
)
@ -513,7 +598,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
yield self.user_joined_room(user, event.room_id)
yield self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
@ -1430,12 +1515,10 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue(None)
@log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@defer.inlineCallbacks
@log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event(
@ -1635,8 +1718,8 @@ class FederationHandler(BaseHandler):
)
except AuthError as e:
logger.warn(
"Rejecting %s because %s",
event.event_id, e.msg
"[%s %s] Rejecting: %s",
event.room_id, event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR

synapse/handlers/register.py

@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler):
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
ratelimit=False,
)

synapse/handlers/room_member.py

@ -603,6 +603,11 @@ class RoomMemberHandler(object):
room_id = mapping["room_id"]
servers = mapping["servers"]
# put the server which owns the alias at the front of the server list.
if room_alias.domain in servers:
servers.remove(room_alias.domain)
servers.insert(0, room_alias.domain)
defer.returnValue((RoomID.from_string(room_id), servers))
@defer.inlineCallbacks
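
The reordering itself is plain list surgery; a standalone sketch with made-up
hostnames::

    servers = ["other.example.com", "alias-owner.example.com", "third.example.com"]
    alias_domain = "alias-owner.example.com"  # the domain of the room alias

    if alias_domain in servers:
        servers.remove(alias_domain)
        servers.insert(0, alias_domain)

    # the alias owner is now tried first when joining
    print(servers)
    # ['alias-owner.example.com', 'other.example.com', 'third.example.com']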

synapse/http/client.py

@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@ -99,7 +99,7 @@ class SimpleHttpClient(object):
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
add_timeout_to_deferred(
request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)

synapse/http/endpoint.py

@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
Args:
reactor: Twisted reactor.
destination (bytes): The name of the server to connect to.
destination (unicode): The name of the server to connect to.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
# the SNI string should be the same as the Host header, minus the port.
# as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
# the Host header and SNI should therefore be the server_name of the remote
# server.
tls_options = tls_client_options_factory.get_options(domain)
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
tls_client_options_factory.get_options(host),
HostnameEndpoint(reactor, host, port, timeout=timeout))
tls_options,
HostnameEndpoint(reactor, host, port, timeout=timeout),
)
default_port = 8448
if port is None:

synapse/http/matrixfederationclient.py

@ -17,10 +17,12 @@ import cgi
import logging
import random
import sys
from io import BytesIO
from six import PY3, string_types
from six.moves import urllib
import attr
import treq
from canonicaljson import encode_canonical_json
from prometheus_client import Counter
@ -28,8 +30,9 @@ from signedjson.sign import sign_json
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@ -41,13 +44,11 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async_helpers import timeout_no_seriously
from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object):
)
_next_id = 1
@attr.s
class MatrixFederationRequest(object):
method = attr.ib()
"""HTTP method
:type: str
"""
path = attr.ib()
"""HTTP path
:type: str
"""
destination = attr.ib()
"""The remote server to send the HTTP request to.
:type: str"""
json = attr.ib(default=None)
"""JSON to send in the body.
:type: dict|None
"""
json_callback = attr.ib(default=None)
"""A callback to generate the JSON.
:type: func|None
"""
query = attr.ib(default=None)
"""Query arguments.
:type: dict|None
"""
txn_id = attr.ib(default=None)
"""Unique ID for this request (for logging)
:type: str|None
"""
def __attrs_post_init__(self):
global _next_id
self.txn_id = "%s-O-%s" % (self.method, _next_id)
_next_id = (_next_id + 1) % (MAXINT - 1)
def get_json(self):
if self.json_callback:
return self.json_callback()
return self.json
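
Constructing one of these by hand (the hostname and path here are made up)
shows the derived transaction ID and the body accessor::

    request = MatrixFederationRequest(
        method="GET",
        destination="remote.example.com:8448",
        path="/_matrix/federation/v1/version",
    )
    print(request.txn_id)      # e.g. GET-O-1: the method plus a process-wide counter
    print(request.get_json())  # None: no JSON body or callback was attached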
@defer.inlineCallbacks
def _handle_json_response(reactor, timeout_sec, request, response):
"""
Reads the JSON body of a response, with a timeout
Args:
reactor (IReactor): twisted reactor, for the timeout
timeout_sec (float): number of seconds to wait for response to complete
request (MatrixFederationRequest): the request that triggered the response
response (IResponse): response to the request
Returns:
dict: parsed JSON response
"""
try:
check_content_type_is_json(response.headers)
d = treq.json_content(response)
d = timeout_deferred(
d,
timeout=timeout_sec,
reactor=reactor,
)
body = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
"{%s} [%s] Error reading response: %s",
request.txn_id,
request.destination,
e,
)
raise
logger.info(
"{%s} [%s] Completed: %d %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
defer.returnValue(body)
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation
protocol. Sends client certificates and signs requests.
@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
self.default_timeout = 60
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
return urllib.parse.urlunparse(
(b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
)
def schedule(x):
reactor.callLater(_EPSILON, x)
self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks
def _request(self, destination, method, path,
json=None, json_callback=None,
param_bytes=b"",
query=None, retry_on_dns_fail=True,
timeout=None, long_retries=False,
ignore_backoff=False,
backoff_on_404=False):
def _send_request(
self,
request,
retry_on_dns_fail=True,
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False
):
"""
Creates and sends a request to the given server.
Sends a request to the given server.
Args:
destination (str): The remote server to send the HTTP request to.
method (str): HTTP method
path (str): The HTTP path
json (dict or None): JSON to send in the body.
json_callback (func or None): A callback to generate the JSON.
query (dict or None): Query arguments.
request (MatrixFederationRequest): details of request to be sent
timeout (int|None): number of milliseconds to wait for the response headers
(including connecting to the server). 60s by default.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): Back off if we get a 404
Returns:
@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object):
if (
self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist
request.destination not in self.hs.config.federation_domain_whitelist
):
raise FederationDeniedError(destination)
raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
destination,
request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
headers_dict = {}
path_bytes = path.encode("ascii")
if query:
query_bytes = encode_query_args(query)
method = request.method
destination = request.destination
path_bytes = request.path.encode("ascii")
if request.query:
query_bytes = encode_query_args(request.query)
else:
query_bytes = b""
headers_dict = {
"User-Agent": [self.version_string],
"Host": [destination],
"Host": [request.destination],
}
with limiter:
url = self._create_url(
destination.encode("ascii"), path_bytes, param_bytes, query_bytes
).decode('ascii')
txn_id = "%s-O-%s" % (method, self._next_id)
self._next_id = (self._next_id + 1) % (MAXINT - 1)
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
http_url = urllib.parse.urlunparse(
(b"", b"", path_bytes, param_bytes, query_bytes, b"")
).decode('ascii')
url = urllib.parse.urlunparse((
b"matrix", destination.encode("ascii"),
path_bytes, None, query_bytes, b"",
)).decode('ascii')
http_url = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
)).decode('ascii')
log_result = None
while True:
try:
if json_callback:
json = json_callback()
json = request.get_json()
if json:
data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"]
@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object):
data = None
self.sign_request(destination, method, http_url, headers_dict)
outbound_logger.info(
logger.info(
"{%s} [%s] Sending request: %s %s",
txn_id, destination, method, url
request.txn_id, destination, method, url
)
if data:
producer = FileBodyProducer(
BytesIO(data),
cooperator=self._cooperator
)
else:
producer = None
request_deferred = treq.request(
method,
url,
headers=Headers(headers_dict),
data=data,
data=producer,
agent=self.agent,
reactor=self.hs.get_reactor(),
unbuffered=True
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
# Sometimes the timeout above doesn't work, so let's hack yet
# another layer of timeouts in, in the vain hope that at some
# point the world made sense and this really really really
# should work.
request_deferred = timeout_no_seriously(
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout * 2,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
@ -244,30 +339,19 @@ class MatrixFederationHttpClient(object):
request_deferred,
)
log_result = "%d %s" % (response.code, response.phrase,)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
logger.warn(
"{%s} Sending request failed to %s: %s %s: %s",
txn_id,
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
destination,
method,
url,
_flatten_response_never_received(e),
)
log_result = _flatten_response_never_received(e)
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
raise
if retries_left and not timeout:
if long_retries:
@ -280,33 +364,37 @@ class MatrixFederationHttpClient(object):
delay *= random.uniform(0.8, 1.4)
logger.debug(
"{%s} Waiting %s before sending to %s...",
txn_id,
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
destination,
delay,
destination
)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
finally:
outbound_logger.info(
"{%s} [%s] Result: %s",
txn_id,
destination,
log_result,
)
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
with logcontext.PreserveLoggingContext():
d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
d = treq.content(response)
d = timeout_deferred(
d,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@ -400,29 +488,26 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist
"""
if not json_data_callback:
json_data_callback = lambda: data
response = yield self._request(
destination,
"PUT",
path,
json_callback=json_data_callback,
request = MatrixFederationRequest(
method="PUT",
destination=destination,
path=path,
query=args,
json_callback=json_data_callback,
json=data,
)
response = yield self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -456,31 +541,30 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
response = yield self._request(
destination,
"POST",
path,
request = MatrixFederationRequest(
method="POST",
destination=destination,
path=path,
query=args,
json=data,
)
response = yield self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
body = yield _handle_json_response(
self.hs.get_reactor(), _sec_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -516,25 +600,23 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
response = yield self._request(
destination,
"GET",
path,
request = MatrixFederationRequest(
method="GET",
destination=destination,
path=path,
query=args,
)
response = yield self._send_request(
request,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -565,25 +647,23 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
response = yield self._request(
destination,
"DELETE",
path,
request = MatrixFederationRequest(
method="DELETE",
destination=destination,
path=path,
query=args,
)
response = yield self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -611,11 +691,15 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
response = yield self._request(
destination,
"GET",
path,
request = MatrixFederationRequest(
method="GET",
destination=destination,
path=path,
query=args,
)
response = yield self._send_request(
request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@ -623,14 +707,25 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
with logcontext.PreserveLoggingContext():
d = _readBodyToFile(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
length = yield make_deferred_yieldable(d)
except Exception:
logger.exception("Failed to download body")
d = _readBodyToFile(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
length = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
"{%s} [%s] Error reading response: %s",
request.txn_id,
request.destination,
e,
)
raise
logger.info(
"{%s} [%s] Completed: %d %s [%d bytes]",
request.txn_id,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
length,
)
defer.returnValue((length, headers))

synapse/http/request_metrics.py

@ -162,7 +162,7 @@ class RequestMetrics(object):
with _in_flight_requests_lock:
_in_flight_requests.add(self)
def stop(self, time_sec, request):
def stop(self, time_sec, response_code, sent_bytes):
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
@ -179,35 +179,35 @@ class RequestMetrics(object):
)
return
response_code = str(request.code)
response_code = str(response_code)
outgoing_responses_counter.labels(request.method, response_code).inc()
outgoing_responses_counter.labels(self.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
response_count.labels(self.method, self.name, tag).inc()
response_timer.labels(request.method, self.name, tag, response_code).observe(
response_timer.labels(self.method, self.name, tag, response_code).observe(
time_sec - self.start
)
resource_usage = context.get_resource_usage()
response_ru_utime.labels(request.method, self.name, tag).inc(
response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime,
)
response_ru_stime.labels(request.method, self.name, tag).inc(
response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime,
)
response_db_txn_count.labels(request.method, self.name, tag).inc(
response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
)
response_db_txn_duration.labels(request.method, self.name, tag).inc(
response_db_txn_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_duration_sec
)
response_db_sched_duration.labels(request.method, self.name, tag).inc(
response_db_sched_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_sched_duration_sec
)
response_size.labels(request.method, self.name, tag).inc(request.sentLength)
response_size.labels(self.method, self.name, tag).inc(sent_bytes)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in

synapse/http/site.py

@ -75,14 +75,14 @@ class SynapseRequest(Request):
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
self.method,
self.method.decode('ascii', errors='replace'),
self.get_redacted_uri(),
self.clientproto,
self.clientproto.decode('ascii', errors='replace'),
self.site.site_tag,
)
def get_request_id(self):
return "%s-%i" % (self.method, self.request_seq)
return "%s-%i" % (self.method.decode('ascii'), self.request_seq)
def get_redacted_uri(self):
uri = self.uri
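
The ``decode`` calls matter because, on Python 3, Twisted supplies these fields
as ``bytes``; interpolating them directly produces the ``b'abcd'`` noise the
changelog mentions::

    method = b"GET"  # what Twisted provides on Python 3

    print("%s-%i" % (method, 1))                  # b'GET'-1  <- the noise
    print("%s-%i" % (method.decode('ascii'), 1))  # GET-1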
@ -119,7 +119,7 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.method,
requests_counter.labels(self.method.decode('ascii'),
self.request_metrics.name).inc()
@contextlib.contextmanager
@ -280,15 +280,15 @@ class SynapseRequest(Request):
int(usage.db_txn_count),
self.sentLength,
code,
self.method,
self.method.decode('ascii'),
self.get_redacted_uri(),
self.clientproto,
self.clientproto.decode('ascii', errors='replace'),
user_agent,
usage.evt_db_fetch_count,
)
try:
self.request_metrics.stop(self.finish_time, self)
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)

synapse/notifier.py

@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
from synapse.util.async_helpers import (
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@ -337,7 +333,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred(
listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
@ -354,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break
@ -559,15 +555,16 @@ class Notifier(object):
if end_time <= now:
break
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
listener.deferred = timeout_deferred(
listener.deferred,
timeout=(end_time - now) / 1000.,
reactor=self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
yield listener.deferred
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break

synapse/storage/directory.py

@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore):
},
retcol="creator",
desc="get_room_alias_creator",
allow_none=True
)
@cached(max_entries=5000)

synapse/storage/transactions.py

@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore):
"""
pass
@cached(max_entries=10000)
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
# XXX: we could choose to not bother persisting this if our cache thinks
# this is a NOOP
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore):
retry_last_ts, retry_interval):
self.database_engine.lock_table(txn, "destinations")
self._invalidate_cache_and_stream(
txn, self.get_destination_retry_timings, (destination,)
)
# We need to be careful here as the data may have changed from under us
# due to a worker setting the timings.

synapse/util/async_helpers.py

@ -374,29 +374,25 @@ class ReadWriteLock(object):
defer.returnValue(_ctx_manager())
class DeferredTimeoutError(Exception):
"""
This error is raised by default when a L{Deferred} times out.
"""
def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise defer.TimeoutError(timeout, "Deferred")
return value
def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.
def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""The built-in Twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
the case where the given deferred's canceller throws.
This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
reactor (twisted.internet.reactor): the Twisted reactor to use
deferred (Deferred)
timeout (float): Timeout in seconds
reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.
The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]
CancelledError Failure into a defer.TimeoutError.
def time_it_out():
timed_out[0] = True
deferred.cancel()
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result
deferred.addBoth(cancel_timeout)
def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value
def timeout_no_seriously(deferred, timeout, reactor):
"""The built-in twisted deferred addTimeout (and the method above)
completely fail to time things out under some unknown circumstances.
Let's try a different way of timing things out and maybe that will make
things work?!
TODO: Kill this with fire.
Returns:
Deferred
"""
new_d = defer.Deferred()
@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out():
timed_out[0] = True
if not new_d.called:
new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
try:
deferred.cancel()
except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")
deferred.cancel()
if not new_d.called:
new_d.errback(defer.TimeoutError(timeout, "Deferred"))
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
return _cancelled_to_timed_out_error(value, timeout)
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
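
A usage sketch: note that, per the docstring, it is the *returned* deferred
that must be used from then on. This assumes a running reactor::

    from twisted.internet import defer, reactor

    d = defer.Deferred()  # a deferred that will never fire on its own
    d = timeout_deferred(d, timeout=5.0, reactor=reactor)

    def on_timeout(failure):
        failure.trap(defer.TimeoutError)
        print("timed out as expected")

    d.addErrback(on_timeout)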

synapse/util/retryutils.py

@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
else:
self.retry_interval = self.min_retry_interval
logger.debug(
logger.info(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
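
The retry interval grows multiplicatively on repeated failures; an illustrative
loop (the constants are stand-ins, not synapse's actual values)::

    MIN_RETRY_INTERVAL = 10 * 60 * 1000       # 10 minutes, in milliseconds
    RETRY_MULTIPLIER = 5
    MAX_RETRY_INTERVAL = 24 * 60 * 60 * 1000  # cap the backoff at one day

    retry_interval = 0
    for attempt in range(4):
        if retry_interval:
            retry_interval = min(retry_interval * RETRY_MULTIPLIER, MAX_RETRY_INTERVAL)
        else:
            retry_interval = MIN_RETRY_INTERVAL
        print("backoff now %i" % retry_interval)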

test_postgresql.sh (new executable file)

@ -0,0 +1,12 @@
#!/bin/bash
# This script builds the Docker image to run the PostgreSQL tests, and then runs
# the tests.
set -e
# Build, and tag
docker build docker/ -f docker/Dockerfile-pgtests -t synapsepgtests
# Run, mounting the current directory into /src
docker run --rm -it -v "$(pwd):/src" synapsepgtests

tests/http/test_fedclient.py

@ -18,9 +18,14 @@ from mock import Mock
from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.web.client import ResponseNeverReceived
from twisted.web.http import HTTPChannel
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.http.matrixfederationclient import (
MatrixFederationHttpClient,
MatrixFederationRequest,
)
from tests.server import FakeTransport
from tests.unittest import HomeserverTestCase
@ -40,7 +45,7 @@ class FederationClientTests(HomeserverTestCase):
"""
If the DNS raising returns an error, it will bubble up.
"""
d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000)
d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
self.pump()
f = self.failureResultOf(d)
@ -51,7 +56,7 @@ class FederationClientTests(HomeserverTestCase):
If the HTTP request is not connected and is timed out, it'll give a
ConnectingCancelledError.
"""
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump()
@ -78,7 +83,7 @@ class FederationClientTests(HomeserverTestCase):
If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived.
"""
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump()
@@ -108,7 +113,12 @@ class FederationClientTests(HomeserverTestCase):
         """
         Once the client gets the headers, _request returns successfully.
         """
-        d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+        request = MatrixFederationRequest(
+            method="GET",
+            destination="testserv:8008",
+            path="foo/bar",
+        )
+        d = self.cl._send_request(request, timeout=10000)
 
         self.pump()
@@ -155,3 +165,26 @@ class FederationClientTests(HomeserverTestCase):
 
         f = self.failureResultOf(d)
         self.assertIsInstance(f.value, TimeoutError)
+
+    def test_client_sends_body(self):
+        self.cl.post_json(
+            "testserv:8008", "foo/bar", timeout=10000,
+            data={"a": "b"}
+        )
+
+        self.pump()
+
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        client = clients[0][2].buildProtocol(None)
+        server = HTTPChannel()
+
+        client.makeConnection(FakeTransport(server, self.reactor))
+        server.makeConnection(FakeTransport(client, self.reactor))
+
+        self.pump(0.1)
+
+        self.assertEqual(len(server.requests), 1)
+        request = server.requests[0]
+        content = request.content.read()
+        self.assertEqual(content, b'{"a":"b"}')

tests/replication/slave/storage/_base.py

@@ -15,8 +15,6 @@
 
 from mock import Mock, NonCallableMock
 
-import attr
-
 from synapse.replication.tcp.client import (
     ReplicationClientFactory,
     ReplicationClientHandler,
@@ -24,6 +22,7 @@ from synapse.replication.tcp.client import (
 )
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 
 from tests import unittest
+from tests.server import FakeTransport
 
 
 class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
@@ -56,36 +55,8 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
         server = server_factory.buildProtocol(None)
         client = client_factory.buildProtocol(None)
 
-        @attr.s
-        class FakeTransport(object):
-            other = attr.ib()
-            disconnecting = False
-            buffer = attr.ib(default=b'')
-
-            def registerProducer(self, producer, streaming):
-                self.producer = producer
-
-                def _produce():
-                    self.producer.resumeProducing()
-                    reactor.callLater(0.1, _produce)
-
-                reactor.callLater(0.0, _produce)
-
-            def write(self, byt):
-                self.buffer = self.buffer + byt
-
-                if getattr(self.other, "transport") is not None:
-                    self.other.dataReceived(self.buffer)
-                    self.buffer = b""
-
-            def writeSequence(self, seq):
-                for x in seq:
-                    self.write(x)
-
-        client.makeConnection(FakeTransport(server))
-        server.makeConnection(FakeTransport(client))
+        client.makeConnection(FakeTransport(server, reactor))
+        server.makeConnection(FakeTransport(client, reactor))
 
     def replicate(self):
         """Tell the master side of replication that something has happened, and then

tests/server.py

@@ -280,3 +280,84 @@ def get_clock():
     clock = ThreadedMemoryReactorClock()
     hs_clock = Clock(clock)
     return (clock, hs_clock)
+
+
+@attr.s
+class FakeTransport(object):
+    """
+    A twisted.internet.interfaces.ITransport implementation which sends all its data
+    straight into an IProtocol object: it exists to connect two IProtocols together.
+
+    To use it, instantiate it with the receiving IProtocol, and then pass it to the
+    sending IProtocol's makeConnection method:
+
+        server = HTTPChannel()
+        client.makeConnection(FakeTransport(server, self.reactor))
+
+    If you want bidirectional communication, you'll need two instances.
+    """
+
+    other = attr.ib()
+    """The Protocol object which will receive any data written to this transport.
+
+    :type: twisted.internet.interfaces.IProtocol
+    """
+
+    _reactor = attr.ib()
+    """Test reactor
+
+    :type: twisted.internet.interfaces.IReactorTime
+    """
+
+    disconnecting = False
+    buffer = attr.ib(default=b'')
+    producer = attr.ib(default=None)
+
+    def getPeer(self):
+        return None
+
+    def getHost(self):
+        return None
+
+    def loseConnection(self):
+        self.disconnecting = True
+
+    def abortConnection(self):
+        self.disconnecting = True
+
+    def pauseProducing(self):
+        self.producer.pauseProducing()
+
+    def unregisterProducer(self):
+        if not self.producer:
+            return
+
+        self.producer = None
+
+    def registerProducer(self, producer, streaming):
+        self.producer = producer
+        self.producerStreaming = streaming
+
+        def _produce():
+            d = self.producer.resumeProducing()
+            d.addCallback(lambda x: self._reactor.callLater(0.1, _produce))
+
+        if not streaming:
+            self._reactor.callLater(0.0, _produce)
+
+    def write(self, byt):
+        self.buffer = self.buffer + byt
+
+        def _write():
+            if getattr(self.other, "transport") is not None:
+                self.other.dataReceived(self.buffer)
+                self.buffer = b""
+                return
+
+            self._reactor.callLater(0.0, _write)
+
+        _write()
+
+    def writeSequence(self, seq):
+        for x in seq:
+            self.write(x)
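
Following the docstring, a minimal sketch of wiring two protocols together with
two FakeTransport instances (names illustrative; this mirrors how the tests
elsewhere in this diff use the class):

    from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
    from twisted.web.http import HTTPChannel

    from tests.server import FakeTransport

    reactor = MemoryReactorClock()

    client = AccumulatingProtocol()
    server = HTTPChannel()

    # One instance per direction: each side's writes are fed straight into
    # the other protocol's dataReceived.
    client.makeConnection(FakeTransport(server, reactor))
    server.makeConnection(FakeTransport(client, reactor))

    # Bytes written by the client now land in the server-side HTTPChannel.
    client.transport.write(b"GET / HTTP/1.0\r\n\r\n")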

tests/test_server.py

@@ -1,14 +1,35 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
 import re
 
+from six import StringIO
+
 from twisted.internet.defer import Deferred
-from twisted.test.proto_helpers import MemoryReactorClock
+from twisted.python.failure import Failure
+from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
 
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import JsonResource
+from synapse.http.site import SynapseSite, logger
 from synapse.util import Clock
 
 from tests import unittest
-from tests.server import make_request, render, setup_test_homeserver
+from tests.server import FakeTransport, make_request, render, setup_test_homeserver
 
 
 class JsonResourceTests(unittest.TestCase):
@@ -121,3 +142,52 @@ class JsonResourceTests(unittest.TestCase):
         self.assertEqual(channel.result["code"], b'400')
         self.assertEqual(channel.json_body["error"], "Unrecognized request")
         self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")
+
+
+class SiteTestCase(unittest.HomeserverTestCase):
+    def test_lose_connection(self):
+        """
+        We log the URI correctly redacted when we lose the connection.
+        """
+
+        class HangingResource(Resource):
+            """
+            A Resource that strategically hangs, as if it were processing an
+            answer.
+            """
+
+            def render(self, request):
+                return NOT_DONE_YET
+
+        # Set up a logging handler that we can inspect afterwards
+        output = StringIO()
+        handler = logging.StreamHandler(output)
+        logger.addHandler(handler)
+        old_level = logger.level
+        logger.setLevel(10)
+        self.addCleanup(logger.setLevel, old_level)
+        self.addCleanup(logger.removeHandler, handler)
+
+        # Make a resource and a Site, the resource will hang and allow us to
+        # time out the request while it's 'processing'
+        base_resource = Resource()
+        base_resource.putChild(b'', HangingResource())
+        site = SynapseSite("test", "site_tag", {}, base_resource, "1.0")
+
+        server = site.buildProtocol(None)
+        client = AccumulatingProtocol()
+        client.makeConnection(FakeTransport(server, self.reactor))
+        server.makeConnection(FakeTransport(client, self.reactor))
+
+        # Send a request with an access token that will get redacted
+        server.dataReceived(b"GET /?access_token=bar HTTP/1.0\r\n\r\n")
+        self.pump()
+
+        # Lose the connection
+        e = Failure(Exception("Failed123"))
+        server.connectionLost(e)
+        handler.flush()
+
+        # Our access token is redacted and the failure reason is logged.
+        self.assertIn("/?access_token=<redacted>", output.getvalue())
+        self.assertIn("Failed123", output.getvalue())

tests/unittest.py

@@ -219,7 +219,8 @@ class HomeserverTestCase(TestCase):
 
         Function to be overridden in subclasses.
         """
-        raise NotImplementedError()
+        hs = self.setup_test_homeserver()
+        return hs
 
     def prepare(self, reactor, clock, homeserver):
         """

tox.ini

@@ -4,7 +4,7 @@ envlist = packaging, py27, py36, pep8, check_isort
 
 [base]
 deps =
     coverage
-    Twisted>=15.1
+    Twisted>=17.1
     mock
     python-subunit
     junitxml