Compare commits

...

6 Commits

Author SHA1 Message Date
Andrew Morgan f43c66d23b Merge branch 'develop' of github.com:matrix-org/synapse into anoa/info-mainline-no-check-password-reset 2020-09-29 14:21:41 +01:00
Andrew Morgan 12f0d18611
Add support for running Complement against the local checkout (#8317)
This PR adds a script that:

* Builds the local Synapse checkout using our existing `docker/Dockerfile` image.
* Downloads [Complement](https://github.com/matrix-org/complement/)'s source code.
* Builds the [Synapse.Dockerfile](https://github.com/matrix-org/complement/blob/master/dockerfiles/Synapse.Dockerfile) using the above dockerfile as a base.
* Builds and runs Complement against it.

This set up differs slightly from [that of the dendrite repo](https://github.com/matrix-org/dendrite/blob/master/build/scripts/complement.sh) (`complement.sh`, `Complement.Dockerfile`), which instead stores a separate, but slightly modified, dockerfile in Dendrite's repo rather than running the one stored in Complement's repo. That synapse equivalent to that dockerfile (`Synapse.Dockerfile`) in Complement's repo is just based on top of `matrixdotorg/synapse:latest`, which we opt to build here locally.

Thus copying over the files from Complement's repo wouldn't change any functionality, and would result in two instances of the same files. So just using the dockerfile in Complement's repo was decided upon instead.
2020-09-29 13:47:47 +01:00
Will Hunt 8676d8ab2e
Filter out appservices from mau count (#8404)
This is an attempt to fix #8403.
2020-09-29 13:11:02 +01:00
Andrew Morgan 1c6b8752b8
Only assert valid next_link params when provided (#8417)
Broken in https://github.com/matrix-org/synapse/pull/8275 and has yet to be put in a release. Fixes https://github.com/matrix-org/synapse/issues/8418.

`next_link` is an optional parameter. However, we were checking whether the `next_link` param was valid, even if it wasn't provided. In that case, `next_link` was `None`, which would clearly not be a valid URL.

This would prevent password reset and other operations if `next_link` was not provided, and the `next_link_domain_whitelist` config option was set.
2020-09-29 12:36:44 +01:00
Richard van der Hoff 866c84da8d
Add metrics to track success/otherwise of replication requests (#8406)
One hope is that this might provide some insights into #3365.
2020-09-29 11:06:11 +01:00
Richard van der Hoff 1c262431f9
Fix handling of connection timeouts in outgoing http requests (#8400)
* Remove `on_timeout_cancel` from `timeout_deferred`

The `on_timeout_cancel` param to `timeout_deferred` wasn't always called on a
timeout (in particular if the canceller raised an exception), so it was
unreliable. It was also only used in one place, and to be honest it's easier to
do what it does a different way.

* Fix handling of connection timeouts in outgoing http requests

Turns out that if we get a timeout during connection, then a different
exception is raised, which wasn't always handled correctly.

To fix it, catch the exception in SimpleHttpClient and turn it into a
RequestTimedOutError (which is already a documented exception).

Also add a description to RequestTimedOutError so that we can see which stage
it failed at.

* Fix incorrect handling of timeouts reading federation responses

This was trapping the wrong sort of TimeoutError, so was never being hit.

The effect was relatively minor, but we should fix this so that it does the
expected thing.

* Fix inconsistent handling of `timeout` param between methods

`get_json`, `put_json` and `delete_json` were applying a different timeout to
the response body to `post_json`; bring them in line and test.

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Co-authored-by: Erik Johnston <erik@matrix.org>
2020-09-29 10:29:21 +01:00
19 changed files with 404 additions and 118 deletions

1
changelog.d/8317.feature Normal file
View File

@ -0,0 +1 @@
Support testing the local Synapse checkout against the [Complement homeserver test suite](https://github.com/matrix-org/complement/).

1
changelog.d/8400.bugfix Normal file
View File

@ -0,0 +1 @@
Fix incorrect handling of timeouts on outgoing HTTP requests.

1
changelog.d/8404.misc Normal file
View File

@ -0,0 +1 @@
Do not include appservice users when calculating the total MAU for a server.

1
changelog.d/8406.feature Normal file
View File

@ -0,0 +1 @@
Add prometheus metrics for replication requests.

1
changelog.d/8417.feature Normal file
View File

@ -0,0 +1 @@
Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number.

22
scripts-dev/complement.sh Executable file
View File

@ -0,0 +1,22 @@
#! /bin/bash -eu
# This script is designed for developers who want to test their code
# against Complement.
#
# It makes a Synapse image which represents the current checkout,
# then downloads Complement and runs it with that image.
cd "$(dirname $0)/.."
# Build the base Synapse image from the local checkout
docker build -t matrixdotorg/synapse:latest -f docker/Dockerfile .
# Download Complement
wget -N https://github.com/matrix-org/complement/archive/master.tar.gz
tar -xzf master.tar.gz
cd complement-master
# Build the Synapse image from Complement, based on the above image we just built
docker build -t complement-synapse -f dockerfiles/Synapse.Dockerfile ./dockerfiles
# Run the tests on the resulting image!
COMPLEMENT_BASE_IMAGE=complement-synapse go test -v -count=1 ./tests

View File

@ -21,8 +21,6 @@ import logging
import urllib.parse
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
from twisted.internet.error import TimeoutError
from synapse.api.errors import (
CodeMessageException,
Codes,
@ -30,6 +28,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.http import RequestTimedOutError
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, Requester
from synapse.util import json_decoder
@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler):
try:
data = await self.http_client.get_json(url, query_params)
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
logger.info(
@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler):
if e.code != 404 or not use_v2:
logger.error("3PID bind failed with Matrix error: %r", e)
raise e.to_synapse_error()
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except CodeMessageException as e:
data = json_decoder.decode(e.msg) # XXX WAT?
@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler):
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(500, "Failed to contact identity server")
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
await self.store.remove_user_bound_threepid(
@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
async def requestMsisdnToken(
@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
assert self.hs.config.public_baseurl
@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler):
id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
body,
)
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler):
# require or validate it. See the following for context:
# https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
return data["mxid"]
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except IOError as e:
logger.warning("Error from v1 identity server lookup: %s" % (e,))
@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler):
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
{"access_token": id_access_token},
)
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
if not isinstance(hash_details, dict):
@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler):
},
headers=headers,
)
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except Exception as e:
logger.warning("Error when performing a v2 3pid lookup: %s", e)
@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler):
invite_config,
{"Authorization": create_id_access_token_header(id_access_token)},
)
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
if e.code != 404:
@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler):
data = await self.blacklisting_http_client.post_json_get_json(
url, invite_config
)
except TimeoutError:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
logger.warning(

View File

@ -16,8 +16,6 @@
import re
from twisted.internet import task
from twisted.internet.defer import CancelledError
from twisted.python import failure
from twisted.web.client import FileBodyProducer
from synapse.api.errors import SynapseError
@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError
class RequestTimedOutError(SynapseError):
"""Exception representing timeout of an outbound request"""
def __init__(self):
super().__init__(504, "Timed out")
def cancelled_to_request_timed_out_error(value, timeout):
"""Turns CancelledErrors into RequestTimedOutErrors.
For use with async.add_timeout_to_deferred
"""
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
def __init__(self, msg):
super().__init__(504, msg)
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")

View File

@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import urllib
from io import BytesIO
@ -38,7 +37,7 @@ from zope.interface import implementer, provider
from OpenSSL import SSL
from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, ssl
from twisted.internet import defer, error as twisted_error, protocol, ssl
from twisted.internet.interfaces import (
IReactorPluggableNameResolver,
IResolutionReceiver,
@ -46,17 +45,18 @@ from twisted.internet.interfaces import (
from twisted.internet.task import Cooperator
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool, readBody
from twisted.web.client import (
Agent,
HTTPConnectionPool,
ResponseNeverReceived,
readBody,
)
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import (
QuieterFileBodyProducer,
cancelled_to_request_timed_out_error,
redact_uri,
)
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
@ -332,8 +332,6 @@ class SimpleHttpClient:
RequestTimedOutError if the request times out before the headers are read
"""
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
@ -362,15 +360,17 @@ class SimpleHttpClient:
data=body_producer,
headers=headers,
**self._extra_treq_args
)
) # type: defer.Deferred
# we use our own timeout mechanism rather than treq's as a workaround
# for https://twistedmatrix.com/trac/ticket/9534.
request_deferred = timeout_deferred(
request_deferred,
60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
request_deferred, 60, self.hs.get_reactor(),
)
# turn timeouts into RequestTimedOutErrors
request_deferred.addErrback(_timeout_to_request_timed_out_error)
response = await make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
@ -410,7 +410,7 @@ class SimpleHttpClient:
parsed json
Raises:
RequestTimedOutException: if there is a timeout before the response headers
RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@ -461,7 +461,7 @@ class SimpleHttpClient:
parsed json
Raises:
RequestTimedOutException: if there is a timeout before the response headers
RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@ -506,7 +506,7 @@ class SimpleHttpClient:
Returns:
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
Raises:
RequestTimedOutException: if there is a timeout before the response headers
RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@ -538,7 +538,7 @@ class SimpleHttpClient:
Returns:
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
Raises:
RequestTimedOutException: if there is a timeout before the response headers
RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@ -586,7 +586,7 @@ class SimpleHttpClient:
Succeeds when we get a 2xx HTTP response, with the
HTTP body as bytes.
Raises:
RequestTimedOutException: if there is a timeout before the response headers
RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@ -631,7 +631,7 @@ class SimpleHttpClient:
headers, absolute URI of the response and HTTP response code.
Raises:
RequestTimedOutException: if there is a timeout before the response headers
RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@ -684,6 +684,18 @@ class SimpleHttpClient:
)
def _timeout_to_request_timed_out_error(f: Failure):
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
# The TCP connection has its own timeout (set by the 'connectTimeout' param
# on the Agent), which raises twisted_error.TimeoutError exception.
raise RequestTimedOutError("Timeout connecting to remote server")
elif f.check(defer.TimeoutError, ResponseNeverReceived):
# this one means that we hit our overall timeout on the request
raise RequestTimedOutError("Timeout waiting for response from remote server")
return f
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.

View File

@ -171,7 +171,7 @@ async def _handle_json_response(
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
body = await make_deferred_yieldable(d)
except TimeoutError as e:
except defer.TimeoutError as e:
logger.warning(
"{%s} [%s] Timed out reading response - %s %s",
request.txn_id,
@ -655,10 +655,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
timeout (int|None): number of milliseconds to wait for the response headers
(including connecting to the server), *for each attempt*.
timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
Note that we may make several attempts to send the request; this
timeout applies to the time spent waiting for response headers for
*each* attempt (including connection time) as well as the time spent
reading the response body after a 200 response.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): True if we should count a 404 response as
@ -704,8 +708,13 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
if timeout is not None:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
body = await _handle_json_response(
self.reactor, self.default_timeout, request, response, start_ms
self.reactor, _sec_timeout, request, response, start_ms
)
return body
@ -734,10 +743,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
timeout (int|None): number of milliseconds to wait for the response headers
(including connecting to the server), *for each attempt*.
timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
Note that we may make several attempts to send the request; this
timeout applies to the time spent waiting for response headers for
*each* attempt (including connection time) as well as the time spent
reading the response body after a 200 response.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
@ -801,10 +814,14 @@ class MatrixFederationHttpClient:
args (dict|None): A dictionary used to create query strings, defaults to
None.
timeout (int|None): number of milliseconds to wait for the response headers
(including connecting to the server), *for each attempt*.
timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
Note that we may make several attempts to send the request; this
timeout applies to the time spent waiting for response headers for
*each* attempt (including connection time) as well as the time spent
reading the response body after a 200 response.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
@ -840,8 +857,13 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
if timeout is not None:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
body = await _handle_json_response(
self.reactor, self.default_timeout, request, response, start_ms
self.reactor, _sec_timeout, request, response, start_ms
)
return body
@ -865,10 +887,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
timeout (int|None): number of milliseconds to wait for the response headers
(including connecting to the server), *for each attempt*.
timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
Note that we may make several attempts to send the request; this
timeout applies to the time spent waiting for response headers for
*each* attempt (including connection time) as well as the time spent
reading the response body after a 200 response.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
@ -900,8 +926,13 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
)
if timeout is not None:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
body = await _handle_json_response(
self.reactor, self.default_timeout, request, response, start_ms
self.reactor, _sec_timeout, request, response, start_ms
)
return body

View File

@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase):
`BrowserLikePolicyForHTTPS`, so unless you have special
requirements you can leave this as-is.
connectTimeout (float): The amount of time that this Agent will wait
for the peer to accept a connection.
connectTimeout (Optional[float]): The amount of time that this Agent will wait
for the peer to accept a connection, in seconds. If 'None',
HostnameEndpoint's default (30s) will be used.
This is used for connections to both proxies and destination servers.
bindAddress (bytes): The local address for client sockets to bind to.
@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase):
Returns:
Deferred[IResponse]: completes when the header of the response has
been received (regardless of the response status code).
Can fail with:
SchemeNotSupported: if the uri is not http or https
twisted.internet.error.TimeoutError if the server we are connecting
to (proxy or destination) does not accept a connection before
connectTimeout.
... other things too.
"""
uri = uri.strip()
if not _VALID_URI.match(uri):

View File

@ -20,18 +20,28 @@ import urllib
from inspect import signature
from typing import Dict, List, Tuple
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
RequestSendFailed,
SynapseError,
)
from prometheus_client import Counter, Gauge
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
from synapse.logging.opentracing import inject_active_span_byte_dict, trace
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
_pending_outgoing_requests = Gauge(
"synapse_pending_outgoing_replication_requests",
"Number of active outgoing replication requests, by replication method name",
["name"],
)
_outgoing_request_counter = Counter(
"synapse_outgoing_replication_requests",
"Number of outgoing replication requests, by replication method name and result",
["name", "code"],
)
class ReplicationEndpoint(metaclass=abc.ABCMeta):
"""Helper base class for defining new replication HTTP endpoints.
@ -138,7 +148,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_map = hs.config.worker.instance_map
outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
@trace(opname="outgoing_replication_request")
@outgoing_gauge.track_inprogress()
async def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
@ -193,23 +206,26 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
try:
result = await request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise
logger.warning("%s request timed out", cls.NAME)
logger.warning("%s request timed out; retrying", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except RequestSendFailed as e:
raise SynapseError(502, "Failed to talk to master") from e
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e
_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result
return send_request

View File

@ -96,8 +96,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
send_attempt = body["send_attempt"]
next_link = body.get("next_link") # Optional param
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
if next_link:
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
# The email will be sent to the stored address.
# This avoids a potential account hijack by requesting a password reset to
@ -372,8 +373,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
Codes.THREEPID_DENIED,
)
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
if next_link:
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
existing_user_id = await self.store.get_user_id_by_threepid("email", email)
@ -446,8 +448,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
Codes.THREEPID_DENIED,
)
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
if next_link:
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
existing_user_id = await self.store.get_user_id_by_threepid("msisdn", msisdn)

View File

@ -41,7 +41,14 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
"""
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
# Exclude app service users
sql = """
SELECT COALESCE(count(*), 0)
FROM monthly_active_users
LEFT JOIN users
ON monthly_active_users.user_id=users.name
WHERE (users.appservice_id IS NULL OR users.appservice_id = '');
"""
txn.execute(sql)
(count,) = txn.fetchone()
return count

View File

@ -449,18 +449,8 @@ class ReadWriteLock:
R = TypeVar("R")
def _cancelled_to_timed_out_error(value: R, timeout: float) -> R:
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise defer.TimeoutError(timeout, "Deferred")
return value
def timeout_deferred(
deferred: defer.Deferred,
timeout: float,
reactor: IReactorTime,
on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None,
deferred: defer.Deferred, timeout: float, reactor: IReactorTime,
) -> defer.Deferred:
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
@ -469,27 +459,21 @@ def timeout_deferred(
(See https://twistedmatrix.com/trac/ticket/9534)
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred.
NOTE: the TimeoutError raised by the resultant deferred is
twisted.internet.defer.TimeoutError, which is *different* to the built-in
TimeoutError, as well as various other TimeoutErrors you might have imported.
Args:
deferred: The Deferred to potentially timeout.
timeout: Timeout in seconds
reactor: The twisted reactor to use
on_timeout_cancel: A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
It takes an arbitrary value, which is the value of the deferred at
that exact point in time (probably a CancelledError Failure), and
the timeout.
The default callable (if none is provided) will translate a
CancelledError Failure into a defer.TimeoutError.
Returns:
A new Deferred.
A new Deferred, which will errback with defer.TimeoutError on timeout.
"""
new_d = defer.Deferred()
timed_out = [False]
@ -502,18 +486,23 @@ def timeout_deferred(
except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")
# the cancel() call should have set off a chain of errbacks which
# will have errbacked new_d, but in case it hasn't, errback it now.
if not new_d.called:
new_d.errback(defer.TimeoutError(timeout, "Deferred"))
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
def convert_cancelled(value: failure.Failure):
# if the orgininal deferred was cancelled, and our timeout has fired, then
# the reason it was cancelled was due to our timeout. Turn the CancelledError
# into a TimeoutError.
if timed_out[0] and value.check(CancelledError):
raise defer.TimeoutError("Timed out after %gs" % (timeout,))
return value
deferred.addBoth(convert_cancelled)
deferred.addErrback(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired

View File

@ -318,14 +318,14 @@ class FederationClientTests(HomeserverTestCase):
r = self.successResultOf(d)
self.assertEqual(r.code, 200)
def test_client_headers_no_body(self):
@parameterized.expand(["get_json", "post_json", "delete_json", "put_json"])
def test_timeout_reading_body(self, method_name: str):
"""
If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived.
timed out, it'll give a RequestSendFailed with can_retry.
"""
d = defer.ensureDeferred(
self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)
)
method = getattr(self.cl, method_name)
d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000))
self.pump()
@ -349,7 +349,9 @@ class FederationClientTests(HomeserverTestCase):
self.reactor.advance(10.5)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, TimeoutError)
self.assertIsInstance(f.value, RequestSendFailed)
self.assertTrue(f.value.can_retry)
self.assertIsInstance(f.value.inner_exception, defer.TimeoutError)
def test_client_requires_trailing_slashes(self):
"""

View File

@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from netaddr import IPSet
from twisted.internet import defer
from twisted.internet.error import DNSLookupError
from synapse.http import RequestTimedOutError
from synapse.http.client import SimpleHttpClient
from synapse.server import HomeServer
from tests.unittest import HomeserverTestCase
class SimpleHttpClientTests(HomeserverTestCase):
def prepare(self, reactor, clock, hs: "HomeServer"):
# Add a DNS entry for a test server
self.reactor.lookups["testserv"] = "1.2.3.4"
self.cl = hs.get_simple_http_client()
def test_dns_error(self):
"""
If the DNS lookup returns an error, it will bubble up.
"""
d = defer.ensureDeferred(self.cl.get_json("http://testserv2:8008/foo/bar"))
self.pump()
f = self.failureResultOf(d)
self.assertIsInstance(f.value, DNSLookupError)
def test_client_connection_refused(self):
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
self.pump()
# Nothing happened yet
self.assertNoResult(d)
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, factory, _timeout, _bindAddress) = clients[0]
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 8008)
e = Exception("go away")
factory.clientConnectionFailed(None, e)
self.pump(0.5)
f = self.failureResultOf(d)
self.assertIs(f.value, e)
def test_client_never_connect(self):
"""
If the HTTP request is not connected and is timed out, it'll give a
ConnectingCancelledError or TimeoutError.
"""
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
self.pump()
# Nothing happened yet
self.assertNoResult(d)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
self.assertEqual(clients[0][0], "1.2.3.4")
self.assertEqual(clients[0][1], 8008)
# Deferred is still without a result
self.assertNoResult(d)
# Push by enough to time it out
self.reactor.advance(120)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, RequestTimedOutError)
def test_client_connect_no_response(self):
"""
If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived.
"""
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
self.pump()
# Nothing happened yet
self.assertNoResult(d)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
self.assertEqual(clients[0][0], "1.2.3.4")
self.assertEqual(clients[0][1], 8008)
conn = Mock()
client = clients[0][2].buildProtocol(None)
client.makeConnection(conn)
# Deferred is still without a result
self.assertNoResult(d)
# Push by enough to time it out
self.reactor.advance(120)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, RequestTimedOutError)
def test_client_ip_range_blacklist(self):
"""Ensure that Synapse does not try to connect to blacklisted IPs"""
# Add some DNS entries we'll blacklist
self.reactor.lookups["internal"] = "127.0.0.1"
self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337"
ip_blacklist = IPSet(["127.0.0.0/8", "fe80::/64"])
cl = SimpleHttpClient(self.hs, ip_blacklist=ip_blacklist)
# Try making a GET request to a blacklisted IPv4 address
# ------------------------------------------------------
# Make the request
d = defer.ensureDeferred(cl.get_json("http://internal:8008/foo/bar"))
self.pump(1)
# Check that it was unable to resolve the address
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 0)
self.failureResultOf(d, DNSLookupError)
# Try making a POST request to a blacklisted IPv6 address
# -------------------------------------------------------
# Make the request
d = defer.ensureDeferred(
cl.post_json_get_json("http://internalv6:8008/foo/bar", {})
)
# Move the reactor forwards
self.pump(1)
# Check that it was unable to resolve the address
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 0)
# Check that it was due to a blacklisted DNS lookup
self.failureResultOf(d, DNSLookupError)
# Try making a GET request to a non-blacklisted IPv4 address
# ----------------------------------------------------------
# Make the request
d = defer.ensureDeferred(cl.get_json("http://testserv:8008/foo/bar"))
# Nothing has happened yet
self.assertNoResult(d)
# Move the reactor forwards
self.pump(1)
# Check that it was able to resolve the address
clients = self.reactor.tcpClients
self.assertNotEqual(len(clients), 0)
# Connection will still fail as this IP address does not resolve to anything
self.failureResultOf(d, RequestTimedOutError)

View File

@ -732,6 +732,12 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
@override_config({"next_link_domain_whitelist": ["example.com", "example.org"]})
def test_next_link_domain_whitelist(self):
"""Tests next_link parameters must fit the whitelist if provided"""
# Ensure not providing a next_link parameter still works
self._request_token(
"something@example.com", "some_secret", next_link=None, expect_code=200,
)
self._request_token(
"something@example.com",
"some_secret",

View File

@ -137,6 +137,21 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
count = self.get_success(self.store.get_monthly_active_count())
self.assertEqual(count, 1)
def test_appservice_user_not_counted_in_mau(self):
self.get_success(
self.store.register_user(
user_id="@appservice_user:server", appservice_id="wibble"
)
)
count = self.get_success(self.store.get_monthly_active_count())
self.assertEqual(count, 0)
d = self.store.upsert_monthly_active_user("@appservice_user:server")
self.get_success(d)
count = self.get_success(self.store.get_monthly_active_count())
self.assertEqual(count, 0)
def test_user_last_seen_monthly_active(self):
user_id1 = "@user1:server"
user_id2 = "@user2:server"
@ -383,7 +398,7 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
self.get_success(self.store.upsert_monthly_active_user(appservice2_user1))
count = self.get_success(self.store.get_monthly_active_count())
self.assertEqual(count, 4)
self.assertEqual(count, 1)
d = self.store.get_monthly_active_count_by_service()
result = self.get_success(d)