Compare commits
6 Commits
fe443acaee
...
f43c66d23b
Author | SHA1 | Date |
---|---|---|
Andrew Morgan | f43c66d23b | |
Andrew Morgan | 12f0d18611 | |
Will Hunt | 8676d8ab2e | |
Andrew Morgan | 1c6b8752b8 | |
Richard van der Hoff | 866c84da8d | |
Richard van der Hoff | 1c262431f9 |
|
@ -0,0 +1 @@
|
|||
Support testing the local Synapse checkout against the [Complement homeserver test suite](https://github.com/matrix-org/complement/).
|
|
@ -0,0 +1 @@
|
|||
Fix incorrect handling of timeouts on outgoing HTTP requests.
|
|
@ -0,0 +1 @@
|
|||
Do not include appservice users when calculating the total MAU for a server.
|
|
@ -0,0 +1 @@
|
|||
Add prometheus metrics for replication requests.
|
|
@ -0,0 +1 @@
|
|||
Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number.
|
|
@ -0,0 +1,22 @@
|
|||
#! /bin/bash -eu
|
||||
# This script is designed for developers who want to test their code
|
||||
# against Complement.
|
||||
#
|
||||
# It makes a Synapse image which represents the current checkout,
|
||||
# then downloads Complement and runs it with that image.
|
||||
|
||||
cd "$(dirname $0)/.."
|
||||
|
||||
# Build the base Synapse image from the local checkout
|
||||
docker build -t matrixdotorg/synapse:latest -f docker/Dockerfile .
|
||||
|
||||
# Download Complement
|
||||
wget -N https://github.com/matrix-org/complement/archive/master.tar.gz
|
||||
tar -xzf master.tar.gz
|
||||
cd complement-master
|
||||
|
||||
# Build the Synapse image from Complement, based on the above image we just built
|
||||
docker build -t complement-synapse -f dockerfiles/Synapse.Dockerfile ./dockerfiles
|
||||
|
||||
# Run the tests on the resulting image!
|
||||
COMPLEMENT_BASE_IMAGE=complement-synapse go test -v -count=1 ./tests
|
|
@ -21,8 +21,6 @@ import logging
|
|||
import urllib.parse
|
||||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from twisted.internet.error import TimeoutError
|
||||
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
Codes,
|
||||
|
@ -30,6 +28,7 @@ from synapse.api.errors import (
|
|||
SynapseError,
|
||||
)
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.types import JsonDict, Requester
|
||||
from synapse.util import json_decoder
|
||||
|
@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler):
|
|||
|
||||
try:
|
||||
data = await self.http_client.get_json(url, query_params)
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except HttpResponseException as e:
|
||||
logger.info(
|
||||
|
@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler):
|
|||
if e.code != 404 or not use_v2:
|
||||
logger.error("3PID bind failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except CodeMessageException as e:
|
||||
data = json_decoder.decode(e.msg) # XXX WAT?
|
||||
|
@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler):
|
|||
else:
|
||||
logger.error("Failed to unbind threepid on identity server: %s", e)
|
||||
raise SynapseError(500, "Failed to contact identity server")
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
await self.store.remove_user_bound_threepid(
|
||||
|
@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler):
|
|||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
async def requestMsisdnToken(
|
||||
|
@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler):
|
|||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
assert self.hs.config.public_baseurl
|
||||
|
@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler):
|
|||
id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
|
||||
body,
|
||||
)
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except HttpResponseException as e:
|
||||
logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
|
||||
|
@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler):
|
|||
# require or validate it. See the following for context:
|
||||
# https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
|
||||
return data["mxid"]
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except IOError as e:
|
||||
logger.warning("Error from v1 identity server lookup: %s" % (e,))
|
||||
|
@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler):
|
|||
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
|
||||
{"access_token": id_access_token},
|
||||
)
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
if not isinstance(hash_details, dict):
|
||||
|
@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler):
|
|||
},
|
||||
headers=headers,
|
||||
)
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except Exception as e:
|
||||
logger.warning("Error when performing a v2 3pid lookup: %s", e)
|
||||
|
@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler):
|
|||
invite_config,
|
||||
{"Authorization": create_id_access_token_header(id_access_token)},
|
||||
)
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except HttpResponseException as e:
|
||||
if e.code != 404:
|
||||
|
@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler):
|
|||
data = await self.blacklisting_http_client.post_json_get_json(
|
||||
url, invite_config
|
||||
)
|
||||
except TimeoutError:
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except HttpResponseException as e:
|
||||
logger.warning(
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
import re
|
||||
|
||||
from twisted.internet import task
|
||||
from twisted.internet.defer import CancelledError
|
||||
from twisted.python import failure
|
||||
from twisted.web.client import FileBodyProducer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
|
@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError
|
|||
class RequestTimedOutError(SynapseError):
|
||||
"""Exception representing timeout of an outbound request"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(504, "Timed out")
|
||||
|
||||
|
||||
def cancelled_to_request_timed_out_error(value, timeout):
|
||||
"""Turns CancelledErrors into RequestTimedOutErrors.
|
||||
|
||||
For use with async.add_timeout_to_deferred
|
||||
"""
|
||||
if isinstance(value, failure.Failure):
|
||||
value.trap(CancelledError)
|
||||
raise RequestTimedOutError()
|
||||
return value
|
||||
def __init__(self, msg):
|
||||
super().__init__(504, msg)
|
||||
|
||||
|
||||
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
from io import BytesIO
|
||||
|
@ -38,7 +37,7 @@ from zope.interface import implementer, provider
|
|||
|
||||
from OpenSSL import SSL
|
||||
from OpenSSL.SSL import VERIFY_NONE
|
||||
from twisted.internet import defer, protocol, ssl
|
||||
from twisted.internet import defer, error as twisted_error, protocol, ssl
|
||||
from twisted.internet.interfaces import (
|
||||
IReactorPluggableNameResolver,
|
||||
IResolutionReceiver,
|
||||
|
@ -46,17 +45,18 @@ from twisted.internet.interfaces import (
|
|||
from twisted.internet.task import Cooperator
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.web._newclient import ResponseDone
|
||||
from twisted.web.client import Agent, HTTPConnectionPool, readBody
|
||||
from twisted.web.client import (
|
||||
Agent,
|
||||
HTTPConnectionPool,
|
||||
ResponseNeverReceived,
|
||||
readBody,
|
||||
)
|
||||
from twisted.web.http import PotentialDataLoss
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import IResponse
|
||||
|
||||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
||||
from synapse.http import (
|
||||
QuieterFileBodyProducer,
|
||||
cancelled_to_request_timed_out_error,
|
||||
redact_uri,
|
||||
)
|
||||
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
|
||||
from synapse.http.proxyagent import ProxyAgent
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||
|
@ -332,8 +332,6 @@ class SimpleHttpClient:
|
|||
RequestTimedOutError if the request times out before the headers are read
|
||||
|
||||
"""
|
||||
# A small wrapper around self.agent.request() so we can easily attach
|
||||
# counters to it
|
||||
outgoing_requests_counter.labels(method).inc()
|
||||
|
||||
# log request but strip `access_token` (AS requests for example include this)
|
||||
|
@ -362,15 +360,17 @@ class SimpleHttpClient:
|
|||
data=body_producer,
|
||||
headers=headers,
|
||||
**self._extra_treq_args
|
||||
)
|
||||
) # type: defer.Deferred
|
||||
|
||||
# we use our own timeout mechanism rather than treq's as a workaround
|
||||
# for https://twistedmatrix.com/trac/ticket/9534.
|
||||
request_deferred = timeout_deferred(
|
||||
request_deferred,
|
||||
60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
request_deferred, 60, self.hs.get_reactor(),
|
||||
)
|
||||
|
||||
# turn timeouts into RequestTimedOutErrors
|
||||
request_deferred.addErrback(_timeout_to_request_timed_out_error)
|
||||
|
||||
response = await make_deferred_yieldable(request_deferred)
|
||||
|
||||
incoming_responses_counter.labels(method, response.code).inc()
|
||||
|
@ -410,7 +410,7 @@ class SimpleHttpClient:
|
|||
parsed json
|
||||
|
||||
Raises:
|
||||
RequestTimedOutException: if there is a timeout before the response headers
|
||||
RequestTimedOutError: if there is a timeout before the response headers
|
||||
are received. Note there is currently no timeout on reading the response
|
||||
body.
|
||||
|
||||
|
@ -461,7 +461,7 @@ class SimpleHttpClient:
|
|||
parsed json
|
||||
|
||||
Raises:
|
||||
RequestTimedOutException: if there is a timeout before the response headers
|
||||
RequestTimedOutError: if there is a timeout before the response headers
|
||||
are received. Note there is currently no timeout on reading the response
|
||||
body.
|
||||
|
||||
|
@ -506,7 +506,7 @@ class SimpleHttpClient:
|
|||
Returns:
|
||||
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
|
||||
Raises:
|
||||
RequestTimedOutException: if there is a timeout before the response headers
|
||||
RequestTimedOutError: if there is a timeout before the response headers
|
||||
are received. Note there is currently no timeout on reading the response
|
||||
body.
|
||||
|
||||
|
@ -538,7 +538,7 @@ class SimpleHttpClient:
|
|||
Returns:
|
||||
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
|
||||
Raises:
|
||||
RequestTimedOutException: if there is a timeout before the response headers
|
||||
RequestTimedOutError: if there is a timeout before the response headers
|
||||
are received. Note there is currently no timeout on reading the response
|
||||
body.
|
||||
|
||||
|
@ -586,7 +586,7 @@ class SimpleHttpClient:
|
|||
Succeeds when we get a 2xx HTTP response, with the
|
||||
HTTP body as bytes.
|
||||
Raises:
|
||||
RequestTimedOutException: if there is a timeout before the response headers
|
||||
RequestTimedOutError: if there is a timeout before the response headers
|
||||
are received. Note there is currently no timeout on reading the response
|
||||
body.
|
||||
|
||||
|
@ -631,7 +631,7 @@ class SimpleHttpClient:
|
|||
headers, absolute URI of the response and HTTP response code.
|
||||
|
||||
Raises:
|
||||
RequestTimedOutException: if there is a timeout before the response headers
|
||||
RequestTimedOutError: if there is a timeout before the response headers
|
||||
are received. Note there is currently no timeout on reading the response
|
||||
body.
|
||||
|
||||
|
@ -684,6 +684,18 @@ class SimpleHttpClient:
|
|||
)
|
||||
|
||||
|
||||
def _timeout_to_request_timed_out_error(f: Failure):
|
||||
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
|
||||
# The TCP connection has its own timeout (set by the 'connectTimeout' param
|
||||
# on the Agent), which raises twisted_error.TimeoutError exception.
|
||||
raise RequestTimedOutError("Timeout connecting to remote server")
|
||||
elif f.check(defer.TimeoutError, ResponseNeverReceived):
|
||||
# this one means that we hit our overall timeout on the request
|
||||
raise RequestTimedOutError("Timeout waiting for response from remote server")
|
||||
|
||||
return f
|
||||
|
||||
|
||||
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
|
||||
# The two should be factored out.
|
||||
|
||||
|
|
|
@ -171,7 +171,7 @@ async def _handle_json_response(
|
|||
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
|
||||
|
||||
body = await make_deferred_yieldable(d)
|
||||
except TimeoutError as e:
|
||||
except defer.TimeoutError as e:
|
||||
logger.warning(
|
||||
"{%s} [%s] Timed out reading response - %s %s",
|
||||
request.txn_id,
|
||||
|
@ -655,10 +655,14 @@ class MatrixFederationHttpClient:
|
|||
long_retries (bool): whether to use the long retry algorithm. See
|
||||
docs on _send_request for details.
|
||||
|
||||
timeout (int|None): number of milliseconds to wait for the response headers
|
||||
(including connecting to the server), *for each attempt*.
|
||||
timeout (int|None): number of milliseconds to wait for the response.
|
||||
self._default_timeout (60s) by default.
|
||||
|
||||
Note that we may make several attempts to send the request; this
|
||||
timeout applies to the time spent waiting for response headers for
|
||||
*each* attempt (including connection time) as well as the time spent
|
||||
reading the response body after a 200 response.
|
||||
|
||||
ignore_backoff (bool): true to ignore the historical backoff data
|
||||
and try the request anyway.
|
||||
backoff_on_404 (bool): True if we should count a 404 response as
|
||||
|
@ -704,8 +708,13 @@ class MatrixFederationHttpClient:
|
|||
timeout=timeout,
|
||||
)
|
||||
|
||||
if timeout is not None:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
body = await _handle_json_response(
|
||||
self.reactor, self.default_timeout, request, response, start_ms
|
||||
self.reactor, _sec_timeout, request, response, start_ms
|
||||
)
|
||||
|
||||
return body
|
||||
|
@ -734,10 +743,14 @@ class MatrixFederationHttpClient:
|
|||
long_retries (bool): whether to use the long retry algorithm. See
|
||||
docs on _send_request for details.
|
||||
|
||||
timeout (int|None): number of milliseconds to wait for the response headers
|
||||
(including connecting to the server), *for each attempt*.
|
||||
timeout (int|None): number of milliseconds to wait for the response.
|
||||
self._default_timeout (60s) by default.
|
||||
|
||||
Note that we may make several attempts to send the request; this
|
||||
timeout applies to the time spent waiting for response headers for
|
||||
*each* attempt (including connection time) as well as the time spent
|
||||
reading the response body after a 200 response.
|
||||
|
||||
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||
try the request anyway.
|
||||
|
||||
|
@ -801,10 +814,14 @@ class MatrixFederationHttpClient:
|
|||
args (dict|None): A dictionary used to create query strings, defaults to
|
||||
None.
|
||||
|
||||
timeout (int|None): number of milliseconds to wait for the response headers
|
||||
(including connecting to the server), *for each attempt*.
|
||||
timeout (int|None): number of milliseconds to wait for the response.
|
||||
self._default_timeout (60s) by default.
|
||||
|
||||
Note that we may make several attempts to send the request; this
|
||||
timeout applies to the time spent waiting for response headers for
|
||||
*each* attempt (including connection time) as well as the time spent
|
||||
reading the response body after a 200 response.
|
||||
|
||||
ignore_backoff (bool): true to ignore the historical backoff data
|
||||
and try the request anyway.
|
||||
|
||||
|
@ -840,8 +857,13 @@ class MatrixFederationHttpClient:
|
|||
timeout=timeout,
|
||||
)
|
||||
|
||||
if timeout is not None:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
body = await _handle_json_response(
|
||||
self.reactor, self.default_timeout, request, response, start_ms
|
||||
self.reactor, _sec_timeout, request, response, start_ms
|
||||
)
|
||||
|
||||
return body
|
||||
|
@ -865,10 +887,14 @@ class MatrixFederationHttpClient:
|
|||
long_retries (bool): whether to use the long retry algorithm. See
|
||||
docs on _send_request for details.
|
||||
|
||||
timeout (int|None): number of milliseconds to wait for the response headers
|
||||
(including connecting to the server), *for each attempt*.
|
||||
timeout (int|None): number of milliseconds to wait for the response.
|
||||
self._default_timeout (60s) by default.
|
||||
|
||||
Note that we may make several attempts to send the request; this
|
||||
timeout applies to the time spent waiting for response headers for
|
||||
*each* attempt (including connection time) as well as the time spent
|
||||
reading the response body after a 200 response.
|
||||
|
||||
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||
try the request anyway.
|
||||
|
||||
|
@ -900,8 +926,13 @@ class MatrixFederationHttpClient:
|
|||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
|
||||
if timeout is not None:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
body = await _handle_json_response(
|
||||
self.reactor, self.default_timeout, request, response, start_ms
|
||||
self.reactor, _sec_timeout, request, response, start_ms
|
||||
)
|
||||
return body
|
||||
|
||||
|
|
|
@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase):
|
|||
`BrowserLikePolicyForHTTPS`, so unless you have special
|
||||
requirements you can leave this as-is.
|
||||
|
||||
connectTimeout (float): The amount of time that this Agent will wait
|
||||
for the peer to accept a connection.
|
||||
connectTimeout (Optional[float]): The amount of time that this Agent will wait
|
||||
for the peer to accept a connection, in seconds. If 'None',
|
||||
HostnameEndpoint's default (30s) will be used.
|
||||
|
||||
This is used for connections to both proxies and destination servers.
|
||||
|
||||
bindAddress (bytes): The local address for client sockets to bind to.
|
||||
|
||||
|
@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase):
|
|||
Returns:
|
||||
Deferred[IResponse]: completes when the header of the response has
|
||||
been received (regardless of the response status code).
|
||||
|
||||
Can fail with:
|
||||
SchemeNotSupported: if the uri is not http or https
|
||||
|
||||
twisted.internet.error.TimeoutError if the server we are connecting
|
||||
to (proxy or destination) does not accept a connection before
|
||||
connectTimeout.
|
||||
|
||||
... other things too.
|
||||
"""
|
||||
uri = uri.strip()
|
||||
if not _VALID_URI.match(uri):
|
||||
|
|
|
@ -20,18 +20,28 @@ import urllib
|
|||
from inspect import signature
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
HttpResponseException,
|
||||
RequestSendFailed,
|
||||
SynapseError,
|
||||
)
|
||||
from prometheus_client import Counter, Gauge
|
||||
|
||||
from synapse.api.errors import HttpResponseException, SynapseError
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.logging.opentracing import inject_active_span_byte_dict, trace
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_pending_outgoing_requests = Gauge(
|
||||
"synapse_pending_outgoing_replication_requests",
|
||||
"Number of active outgoing replication requests, by replication method name",
|
||||
["name"],
|
||||
)
|
||||
|
||||
_outgoing_request_counter = Counter(
|
||||
"synapse_outgoing_replication_requests",
|
||||
"Number of outgoing replication requests, by replication method name and result",
|
||||
["name", "code"],
|
||||
)
|
||||
|
||||
|
||||
class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||
"""Helper base class for defining new replication HTTP endpoints.
|
||||
|
@ -138,7 +148,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
|||
|
||||
instance_map = hs.config.worker.instance_map
|
||||
|
||||
outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
|
||||
|
||||
@trace(opname="outgoing_replication_request")
|
||||
@outgoing_gauge.track_inprogress()
|
||||
async def send_request(instance_name="master", **kwargs):
|
||||
if instance_name == local_instance_name:
|
||||
raise Exception("Trying to send HTTP request to self")
|
||||
|
@ -193,23 +206,26 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
|||
try:
|
||||
result = await request_func(uri, data, headers=headers)
|
||||
break
|
||||
except CodeMessageException as e:
|
||||
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
|
||||
except RequestTimedOutError:
|
||||
if not cls.RETRY_ON_TIMEOUT:
|
||||
raise
|
||||
|
||||
logger.warning("%s request timed out", cls.NAME)
|
||||
logger.warning("%s request timed out; retrying", cls.NAME)
|
||||
|
||||
# If we timed out we probably don't need to worry about backing
|
||||
# off too much, but lets just wait a little anyway.
|
||||
await clock.sleep(1)
|
||||
except HttpResponseException as e:
|
||||
# We convert to SynapseError as we know that it was a SynapseError
|
||||
# on the master process that we should send to the client. (And
|
||||
# on the main process that we should send to the client. (And
|
||||
# importantly, not stack traces everywhere)
|
||||
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
|
||||
raise e.to_synapse_error()
|
||||
except RequestSendFailed as e:
|
||||
raise SynapseError(502, "Failed to talk to master") from e
|
||||
except Exception as e:
|
||||
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
|
||||
raise SynapseError(502, "Failed to talk to main process") from e
|
||||
|
||||
_outgoing_request_counter.labels(cls.NAME, 200).inc()
|
||||
return result
|
||||
|
||||
return send_request
|
||||
|
|
|
@ -96,8 +96,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
|||
send_attempt = body["send_attempt"]
|
||||
next_link = body.get("next_link") # Optional param
|
||||
|
||||
# Raise if the provided next_link value isn't valid
|
||||
assert_valid_next_link(self.hs, next_link)
|
||||
if next_link:
|
||||
# Raise if the provided next_link value isn't valid
|
||||
assert_valid_next_link(self.hs, next_link)
|
||||
|
||||
# The email will be sent to the stored address.
|
||||
# This avoids a potential account hijack by requesting a password reset to
|
||||
|
@ -372,8 +373,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
|||
Codes.THREEPID_DENIED,
|
||||
)
|
||||
|
||||
# Raise if the provided next_link value isn't valid
|
||||
assert_valid_next_link(self.hs, next_link)
|
||||
if next_link:
|
||||
# Raise if the provided next_link value isn't valid
|
||||
assert_valid_next_link(self.hs, next_link)
|
||||
|
||||
existing_user_id = await self.store.get_user_id_by_threepid("email", email)
|
||||
|
||||
|
@ -446,8 +448,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
|
|||
Codes.THREEPID_DENIED,
|
||||
)
|
||||
|
||||
# Raise if the provided next_link value isn't valid
|
||||
assert_valid_next_link(self.hs, next_link)
|
||||
if next_link:
|
||||
# Raise if the provided next_link value isn't valid
|
||||
assert_valid_next_link(self.hs, next_link)
|
||||
|
||||
existing_user_id = await self.store.get_user_id_by_threepid("msisdn", msisdn)
|
||||
|
||||
|
|
|
@ -41,7 +41,14 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
def _count_users(txn):
|
||||
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
|
||||
# Exclude app service users
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0)
|
||||
FROM monthly_active_users
|
||||
LEFT JOIN users
|
||||
ON monthly_active_users.user_id=users.name
|
||||
WHERE (users.appservice_id IS NULL OR users.appservice_id = '');
|
||||
"""
|
||||
txn.execute(sql)
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
|
|
@ -449,18 +449,8 @@ class ReadWriteLock:
|
|||
R = TypeVar("R")
|
||||
|
||||
|
||||
def _cancelled_to_timed_out_error(value: R, timeout: float) -> R:
|
||||
if isinstance(value, failure.Failure):
|
||||
value.trap(CancelledError)
|
||||
raise defer.TimeoutError(timeout, "Deferred")
|
||||
return value
|
||||
|
||||
|
||||
def timeout_deferred(
|
||||
deferred: defer.Deferred,
|
||||
timeout: float,
|
||||
reactor: IReactorTime,
|
||||
on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None,
|
||||
deferred: defer.Deferred, timeout: float, reactor: IReactorTime,
|
||||
) -> defer.Deferred:
|
||||
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
|
||||
that have a canceller that throws exceptions. This method creates a new
|
||||
|
@ -469,27 +459,21 @@ def timeout_deferred(
|
|||
|
||||
(See https://twistedmatrix.com/trac/ticket/9534)
|
||||
|
||||
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
|
||||
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred.
|
||||
|
||||
NOTE: the TimeoutError raised by the resultant deferred is
|
||||
twisted.internet.defer.TimeoutError, which is *different* to the built-in
|
||||
TimeoutError, as well as various other TimeoutErrors you might have imported.
|
||||
|
||||
Args:
|
||||
deferred: The Deferred to potentially timeout.
|
||||
timeout: Timeout in seconds
|
||||
reactor: The twisted reactor to use
|
||||
on_timeout_cancel: A callable which is called immediately
|
||||
after the deferred times out, and not if this deferred is
|
||||
otherwise cancelled before the timeout.
|
||||
|
||||
It takes an arbitrary value, which is the value of the deferred at
|
||||
that exact point in time (probably a CancelledError Failure), and
|
||||
the timeout.
|
||||
|
||||
The default callable (if none is provided) will translate a
|
||||
CancelledError Failure into a defer.TimeoutError.
|
||||
|
||||
Returns:
|
||||
A new Deferred.
|
||||
A new Deferred, which will errback with defer.TimeoutError on timeout.
|
||||
"""
|
||||
|
||||
new_d = defer.Deferred()
|
||||
|
||||
timed_out = [False]
|
||||
|
@ -502,18 +486,23 @@ def timeout_deferred(
|
|||
except: # noqa: E722, if we throw any exception it'll break time outs
|
||||
logger.exception("Canceller failed during timeout")
|
||||
|
||||
# the cancel() call should have set off a chain of errbacks which
|
||||
# will have errbacked new_d, but in case it hasn't, errback it now.
|
||||
|
||||
if not new_d.called:
|
||||
new_d.errback(defer.TimeoutError(timeout, "Deferred"))
|
||||
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
|
||||
|
||||
delayed_call = reactor.callLater(timeout, time_it_out)
|
||||
|
||||
def convert_cancelled(value):
|
||||
if timed_out[0]:
|
||||
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
|
||||
return to_call(value, timeout)
|
||||
def convert_cancelled(value: failure.Failure):
|
||||
# if the orgininal deferred was cancelled, and our timeout has fired, then
|
||||
# the reason it was cancelled was due to our timeout. Turn the CancelledError
|
||||
# into a TimeoutError.
|
||||
if timed_out[0] and value.check(CancelledError):
|
||||
raise defer.TimeoutError("Timed out after %gs" % (timeout,))
|
||||
return value
|
||||
|
||||
deferred.addBoth(convert_cancelled)
|
||||
deferred.addErrback(convert_cancelled)
|
||||
|
||||
def cancel_timeout(result):
|
||||
# stop the pending call to cancel the deferred if it's been fired
|
||||
|
|
|
@ -318,14 +318,14 @@ class FederationClientTests(HomeserverTestCase):
|
|||
r = self.successResultOf(d)
|
||||
self.assertEqual(r.code, 200)
|
||||
|
||||
def test_client_headers_no_body(self):
|
||||
@parameterized.expand(["get_json", "post_json", "delete_json", "put_json"])
|
||||
def test_timeout_reading_body(self, method_name: str):
|
||||
"""
|
||||
If the HTTP request is connected, but gets no response before being
|
||||
timed out, it'll give a ResponseNeverReceived.
|
||||
timed out, it'll give a RequestSendFailed with can_retry.
|
||||
"""
|
||||
d = defer.ensureDeferred(
|
||||
self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)
|
||||
)
|
||||
method = getattr(self.cl, method_name)
|
||||
d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000))
|
||||
|
||||
self.pump()
|
||||
|
||||
|
@ -349,7 +349,9 @@ class FederationClientTests(HomeserverTestCase):
|
|||
self.reactor.advance(10.5)
|
||||
f = self.failureResultOf(d)
|
||||
|
||||
self.assertIsInstance(f.value, TimeoutError)
|
||||
self.assertIsInstance(f.value, RequestSendFailed)
|
||||
self.assertTrue(f.value.can_retry)
|
||||
self.assertIsInstance(f.value.inner_exception, defer.TimeoutError)
|
||||
|
||||
def test_client_requires_trailing_slashes(self):
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from mock import Mock
|
||||
|
||||
from netaddr import IPSet
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.error import DNSLookupError
|
||||
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.server import HomeServer
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
class SimpleHttpClientTests(HomeserverTestCase):
|
||||
def prepare(self, reactor, clock, hs: "HomeServer"):
|
||||
# Add a DNS entry for a test server
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
|
||||
self.cl = hs.get_simple_http_client()
|
||||
|
||||
def test_dns_error(self):
|
||||
"""
|
||||
If the DNS lookup returns an error, it will bubble up.
|
||||
"""
|
||||
d = defer.ensureDeferred(self.cl.get_json("http://testserv2:8008/foo/bar"))
|
||||
self.pump()
|
||||
|
||||
f = self.failureResultOf(d)
|
||||
self.assertIsInstance(f.value, DNSLookupError)
|
||||
|
||||
def test_client_connection_refused(self):
|
||||
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
|
||||
|
||||
self.pump()
|
||||
|
||||
# Nothing happened yet
|
||||
self.assertNoResult(d)
|
||||
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, factory, _timeout, _bindAddress) = clients[0]
|
||||
self.assertEqual(host, "1.2.3.4")
|
||||
self.assertEqual(port, 8008)
|
||||
e = Exception("go away")
|
||||
factory.clientConnectionFailed(None, e)
|
||||
self.pump(0.5)
|
||||
|
||||
f = self.failureResultOf(d)
|
||||
|
||||
self.assertIs(f.value, e)
|
||||
|
||||
def test_client_never_connect(self):
|
||||
"""
|
||||
If the HTTP request is not connected and is timed out, it'll give a
|
||||
ConnectingCancelledError or TimeoutError.
|
||||
"""
|
||||
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
|
||||
|
||||
self.pump()
|
||||
|
||||
# Nothing happened yet
|
||||
self.assertNoResult(d)
|
||||
|
||||
# Make sure treq is trying to connect
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
self.assertEqual(clients[0][0], "1.2.3.4")
|
||||
self.assertEqual(clients[0][1], 8008)
|
||||
|
||||
# Deferred is still without a result
|
||||
self.assertNoResult(d)
|
||||
|
||||
# Push by enough to time it out
|
||||
self.reactor.advance(120)
|
||||
f = self.failureResultOf(d)
|
||||
|
||||
self.assertIsInstance(f.value, RequestTimedOutError)
|
||||
|
||||
def test_client_connect_no_response(self):
|
||||
"""
|
||||
If the HTTP request is connected, but gets no response before being
|
||||
timed out, it'll give a ResponseNeverReceived.
|
||||
"""
|
||||
d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
|
||||
|
||||
self.pump()
|
||||
|
||||
# Nothing happened yet
|
||||
self.assertNoResult(d)
|
||||
|
||||
# Make sure treq is trying to connect
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
self.assertEqual(clients[0][0], "1.2.3.4")
|
||||
self.assertEqual(clients[0][1], 8008)
|
||||
|
||||
conn = Mock()
|
||||
client = clients[0][2].buildProtocol(None)
|
||||
client.makeConnection(conn)
|
||||
|
||||
# Deferred is still without a result
|
||||
self.assertNoResult(d)
|
||||
|
||||
# Push by enough to time it out
|
||||
self.reactor.advance(120)
|
||||
f = self.failureResultOf(d)
|
||||
|
||||
self.assertIsInstance(f.value, RequestTimedOutError)
|
||||
|
||||
def test_client_ip_range_blacklist(self):
|
||||
"""Ensure that Synapse does not try to connect to blacklisted IPs"""
|
||||
|
||||
# Add some DNS entries we'll blacklist
|
||||
self.reactor.lookups["internal"] = "127.0.0.1"
|
||||
self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337"
|
||||
ip_blacklist = IPSet(["127.0.0.0/8", "fe80::/64"])
|
||||
|
||||
cl = SimpleHttpClient(self.hs, ip_blacklist=ip_blacklist)
|
||||
|
||||
# Try making a GET request to a blacklisted IPv4 address
|
||||
# ------------------------------------------------------
|
||||
# Make the request
|
||||
d = defer.ensureDeferred(cl.get_json("http://internal:8008/foo/bar"))
|
||||
self.pump(1)
|
||||
|
||||
# Check that it was unable to resolve the address
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 0)
|
||||
|
||||
self.failureResultOf(d, DNSLookupError)
|
||||
|
||||
# Try making a POST request to a blacklisted IPv6 address
|
||||
# -------------------------------------------------------
|
||||
# Make the request
|
||||
d = defer.ensureDeferred(
|
||||
cl.post_json_get_json("http://internalv6:8008/foo/bar", {})
|
||||
)
|
||||
|
||||
# Move the reactor forwards
|
||||
self.pump(1)
|
||||
|
||||
# Check that it was unable to resolve the address
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 0)
|
||||
|
||||
# Check that it was due to a blacklisted DNS lookup
|
||||
self.failureResultOf(d, DNSLookupError)
|
||||
|
||||
# Try making a GET request to a non-blacklisted IPv4 address
|
||||
# ----------------------------------------------------------
|
||||
# Make the request
|
||||
d = defer.ensureDeferred(cl.get_json("http://testserv:8008/foo/bar"))
|
||||
|
||||
# Nothing has happened yet
|
||||
self.assertNoResult(d)
|
||||
|
||||
# Move the reactor forwards
|
||||
self.pump(1)
|
||||
|
||||
# Check that it was able to resolve the address
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertNotEqual(len(clients), 0)
|
||||
|
||||
# Connection will still fail as this IP address does not resolve to anything
|
||||
self.failureResultOf(d, RequestTimedOutError)
|
|
@ -732,6 +732,12 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
|
|||
@override_config({"next_link_domain_whitelist": ["example.com", "example.org"]})
|
||||
def test_next_link_domain_whitelist(self):
|
||||
"""Tests next_link parameters must fit the whitelist if provided"""
|
||||
|
||||
# Ensure not providing a next_link parameter still works
|
||||
self._request_token(
|
||||
"something@example.com", "some_secret", next_link=None, expect_code=200,
|
||||
)
|
||||
|
||||
self._request_token(
|
||||
"something@example.com",
|
||||
"some_secret",
|
||||
|
|
|
@ -137,6 +137,21 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
|
|||
count = self.get_success(self.store.get_monthly_active_count())
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
def test_appservice_user_not_counted_in_mau(self):
|
||||
self.get_success(
|
||||
self.store.register_user(
|
||||
user_id="@appservice_user:server", appservice_id="wibble"
|
||||
)
|
||||
)
|
||||
count = self.get_success(self.store.get_monthly_active_count())
|
||||
self.assertEqual(count, 0)
|
||||
|
||||
d = self.store.upsert_monthly_active_user("@appservice_user:server")
|
||||
self.get_success(d)
|
||||
|
||||
count = self.get_success(self.store.get_monthly_active_count())
|
||||
self.assertEqual(count, 0)
|
||||
|
||||
def test_user_last_seen_monthly_active(self):
|
||||
user_id1 = "@user1:server"
|
||||
user_id2 = "@user2:server"
|
||||
|
@ -383,7 +398,7 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
|
|||
self.get_success(self.store.upsert_monthly_active_user(appservice2_user1))
|
||||
|
||||
count = self.get_success(self.store.get_monthly_active_count())
|
||||
self.assertEqual(count, 4)
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
d = self.store.get_monthly_active_count_by_service()
|
||||
result = self.get_success(d)
|
||||
|
|
Loading…
Reference in New Issue