Merge branch 'develop' of github.com:matrix-org/synapse into anoa/info-mainline-no-check-password-reset
						commit
						f43c66d23b
					
				| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Support testing the local Synapse checkout against the [Complement homeserver test suite](https://github.com/matrix-org/complement/).
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Fix incorrect handling of timeouts on outgoing HTTP requests.
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Do not include appservice users when calculating the total MAU for a server.
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Add prometheus metrics for replication requests.
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number.
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,22 @@
 | 
			
		|||
#! /bin/bash -eu
 | 
			
		||||
# This script is designed for developers who want to test their code
 | 
			
		||||
# against Complement.
 | 
			
		||||
#
 | 
			
		||||
# It makes a Synapse image which represents the current checkout,
 | 
			
		||||
# then downloads Complement and runs it with that image.
 | 
			
		||||
 | 
			
		||||
cd "$(dirname $0)/.."
 | 
			
		||||
 | 
			
		||||
# Build the base Synapse image from the local checkout
 | 
			
		||||
docker build -t matrixdotorg/synapse:latest -f docker/Dockerfile .
 | 
			
		||||
 | 
			
		||||
# Download Complement
 | 
			
		||||
wget -N https://github.com/matrix-org/complement/archive/master.tar.gz
 | 
			
		||||
tar -xzf master.tar.gz
 | 
			
		||||
cd complement-master
 | 
			
		||||
 | 
			
		||||
# Build the Synapse image from Complement, based on the above image we just built
 | 
			
		||||
docker build -t complement-synapse -f dockerfiles/Synapse.Dockerfile ./dockerfiles
 | 
			
		||||
 | 
			
		||||
# Run the tests on the resulting image!
 | 
			
		||||
COMPLEMENT_BASE_IMAGE=complement-synapse go test -v -count=1 ./tests
 | 
			
		||||
| 
						 | 
				
			
			@ -21,8 +21,6 @@ import logging
 | 
			
		|||
import urllib.parse
 | 
			
		||||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
 | 
			
		||||
 | 
			
		||||
from twisted.internet.error import TimeoutError
 | 
			
		||||
 | 
			
		||||
from synapse.api.errors import (
 | 
			
		||||
    CodeMessageException,
 | 
			
		||||
    Codes,
 | 
			
		||||
| 
						 | 
				
			
			@ -30,6 +28,7 @@ from synapse.api.errors import (
 | 
			
		|||
    SynapseError,
 | 
			
		||||
)
 | 
			
		||||
from synapse.config.emailconfig import ThreepidBehaviour
 | 
			
		||||
from synapse.http import RequestTimedOutError
 | 
			
		||||
from synapse.http.client import SimpleHttpClient
 | 
			
		||||
from synapse.types import JsonDict, Requester
 | 
			
		||||
from synapse.util import json_decoder
 | 
			
		||||
| 
						 | 
				
			
			@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
 | 
			
		||||
        try:
 | 
			
		||||
            data = await self.http_client.get_json(url, query_params)
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
        except HttpResponseException as e:
 | 
			
		||||
            logger.info(
 | 
			
		||||
| 
						 | 
				
			
			@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
            if e.code != 404 or not use_v2:
 | 
			
		||||
                logger.error("3PID bind failed with Matrix error: %r", e)
 | 
			
		||||
                raise e.to_synapse_error()
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
        except CodeMessageException as e:
 | 
			
		||||
            data = json_decoder.decode(e.msg)  # XXX WAT?
 | 
			
		||||
| 
						 | 
				
			
			@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
            else:
 | 
			
		||||
                logger.error("Failed to unbind threepid on identity server: %s", e)
 | 
			
		||||
                raise SynapseError(500, "Failed to contact identity server")
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
 | 
			
		||||
        await self.store.remove_user_bound_threepid(
 | 
			
		||||
| 
						 | 
				
			
			@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
        except HttpResponseException as e:
 | 
			
		||||
            logger.info("Proxied requestToken failed: %r", e)
 | 
			
		||||
            raise e.to_synapse_error()
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
 | 
			
		||||
    async def requestMsisdnToken(
 | 
			
		||||
| 
						 | 
				
			
			@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
        except HttpResponseException as e:
 | 
			
		||||
            logger.info("Proxied requestToken failed: %r", e)
 | 
			
		||||
            raise e.to_synapse_error()
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
 | 
			
		||||
        assert self.hs.config.public_baseurl
 | 
			
		||||
| 
						 | 
				
			
			@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
                id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
 | 
			
		||||
                body,
 | 
			
		||||
            )
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
        except HttpResponseException as e:
 | 
			
		||||
            logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
 | 
			
		||||
| 
						 | 
				
			
			@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
                # require or validate it. See the following for context:
 | 
			
		||||
                # https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
 | 
			
		||||
                return data["mxid"]
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
        except IOError as e:
 | 
			
		||||
            logger.warning("Error from v1 identity server lookup: %s" % (e,))
 | 
			
		||||
| 
						 | 
				
			
			@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
                "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
 | 
			
		||||
                {"access_token": id_access_token},
 | 
			
		||||
            )
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
 | 
			
		||||
        if not isinstance(hash_details, dict):
 | 
			
		||||
| 
						 | 
				
			
			@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
                },
 | 
			
		||||
                headers=headers,
 | 
			
		||||
            )
 | 
			
		||||
        except TimeoutError:
 | 
			
		||||
        except RequestTimedOutError:
 | 
			
		||||
            raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.warning("Error when performing a v2 3pid lookup: %s", e)
 | 
			
		||||
| 
						 | 
				
			
			@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
                    invite_config,
 | 
			
		||||
                    {"Authorization": create_id_access_token_header(id_access_token)},
 | 
			
		||||
                )
 | 
			
		||||
            except TimeoutError:
 | 
			
		||||
            except RequestTimedOutError:
 | 
			
		||||
                raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
            except HttpResponseException as e:
 | 
			
		||||
                if e.code != 404:
 | 
			
		||||
| 
						 | 
				
			
			@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler):
 | 
			
		|||
                data = await self.blacklisting_http_client.post_json_get_json(
 | 
			
		||||
                    url, invite_config
 | 
			
		||||
                )
 | 
			
		||||
            except TimeoutError:
 | 
			
		||||
            except RequestTimedOutError:
 | 
			
		||||
                raise SynapseError(500, "Timed out contacting identity server")
 | 
			
		||||
            except HttpResponseException as e:
 | 
			
		||||
                logger.warning(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,8 +16,6 @@
 | 
			
		|||
import re
 | 
			
		||||
 | 
			
		||||
from twisted.internet import task
 | 
			
		||||
from twisted.internet.defer import CancelledError
 | 
			
		||||
from twisted.python import failure
 | 
			
		||||
from twisted.web.client import FileBodyProducer
 | 
			
		||||
 | 
			
		||||
from synapse.api.errors import SynapseError
 | 
			
		||||
| 
						 | 
				
			
			@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError
 | 
			
		|||
class RequestTimedOutError(SynapseError):
 | 
			
		||||
    """Exception representing timeout of an outbound request"""
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super().__init__(504, "Timed out")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def cancelled_to_request_timed_out_error(value, timeout):
 | 
			
		||||
    """Turns CancelledErrors into RequestTimedOutErrors.
 | 
			
		||||
 | 
			
		||||
    For use with async.add_timeout_to_deferred
 | 
			
		||||
    """
 | 
			
		||||
    if isinstance(value, failure.Failure):
 | 
			
		||||
        value.trap(CancelledError)
 | 
			
		||||
        raise RequestTimedOutError()
 | 
			
		||||
    return value
 | 
			
		||||
    def __init__(self, msg):
 | 
			
		||||
        super().__init__(504, msg)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,7 +13,6 @@
 | 
			
		|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
# See the License for the specific language governing permissions and
 | 
			
		||||
# limitations under the License.
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import urllib
 | 
			
		||||
from io import BytesIO
 | 
			
		||||
| 
						 | 
				
			
			@ -38,7 +37,7 @@ from zope.interface import implementer, provider
 | 
			
		|||
 | 
			
		||||
from OpenSSL import SSL
 | 
			
		||||
from OpenSSL.SSL import VERIFY_NONE
 | 
			
		||||
from twisted.internet import defer, protocol, ssl
 | 
			
		||||
from twisted.internet import defer, error as twisted_error, protocol, ssl
 | 
			
		||||
from twisted.internet.interfaces import (
 | 
			
		||||
    IReactorPluggableNameResolver,
 | 
			
		||||
    IResolutionReceiver,
 | 
			
		||||
| 
						 | 
				
			
			@ -46,17 +45,18 @@ from twisted.internet.interfaces import (
 | 
			
		|||
from twisted.internet.task import Cooperator
 | 
			
		||||
from twisted.python.failure import Failure
 | 
			
		||||
from twisted.web._newclient import ResponseDone
 | 
			
		||||
from twisted.web.client import Agent, HTTPConnectionPool, readBody
 | 
			
		||||
from twisted.web.client import (
 | 
			
		||||
    Agent,
 | 
			
		||||
    HTTPConnectionPool,
 | 
			
		||||
    ResponseNeverReceived,
 | 
			
		||||
    readBody,
 | 
			
		||||
)
 | 
			
		||||
from twisted.web.http import PotentialDataLoss
 | 
			
		||||
from twisted.web.http_headers import Headers
 | 
			
		||||
from twisted.web.iweb import IResponse
 | 
			
		||||
 | 
			
		||||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
 | 
			
		||||
from synapse.http import (
 | 
			
		||||
    QuieterFileBodyProducer,
 | 
			
		||||
    cancelled_to_request_timed_out_error,
 | 
			
		||||
    redact_uri,
 | 
			
		||||
)
 | 
			
		||||
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
 | 
			
		||||
from synapse.http.proxyagent import ProxyAgent
 | 
			
		||||
from synapse.logging.context import make_deferred_yieldable
 | 
			
		||||
from synapse.logging.opentracing import set_tag, start_active_span, tags
 | 
			
		||||
| 
						 | 
				
			
			@ -332,8 +332,6 @@ class SimpleHttpClient:
 | 
			
		|||
            RequestTimedOutError if the request times out before the headers are read
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        # A small wrapper around self.agent.request() so we can easily attach
 | 
			
		||||
        # counters to it
 | 
			
		||||
        outgoing_requests_counter.labels(method).inc()
 | 
			
		||||
 | 
			
		||||
        # log request but strip `access_token` (AS requests for example include this)
 | 
			
		||||
| 
						 | 
				
			
			@ -362,15 +360,17 @@ class SimpleHttpClient:
 | 
			
		|||
                    data=body_producer,
 | 
			
		||||
                    headers=headers,
 | 
			
		||||
                    **self._extra_treq_args
 | 
			
		||||
                )
 | 
			
		||||
                )  # type: defer.Deferred
 | 
			
		||||
 | 
			
		||||
                # we use our own timeout mechanism rather than treq's as a workaround
 | 
			
		||||
                # for https://twistedmatrix.com/trac/ticket/9534.
 | 
			
		||||
                request_deferred = timeout_deferred(
 | 
			
		||||
                    request_deferred,
 | 
			
		||||
                    60,
 | 
			
		||||
                    self.hs.get_reactor(),
 | 
			
		||||
                    cancelled_to_request_timed_out_error,
 | 
			
		||||
                    request_deferred, 60, self.hs.get_reactor(),
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # turn timeouts into RequestTimedOutErrors
 | 
			
		||||
                request_deferred.addErrback(_timeout_to_request_timed_out_error)
 | 
			
		||||
 | 
			
		||||
                response = await make_deferred_yieldable(request_deferred)
 | 
			
		||||
 | 
			
		||||
                incoming_responses_counter.labels(method, response.code).inc()
 | 
			
		||||
| 
						 | 
				
			
			@ -410,7 +410,7 @@ class SimpleHttpClient:
 | 
			
		|||
            parsed json
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            RequestTimedOutException: if there is a timeout before the response headers
 | 
			
		||||
            RequestTimedOutError: if there is a timeout before the response headers
 | 
			
		||||
               are received. Note there is currently no timeout on reading the response
 | 
			
		||||
               body.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -461,7 +461,7 @@ class SimpleHttpClient:
 | 
			
		|||
            parsed json
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            RequestTimedOutException: if there is a timeout before the response headers
 | 
			
		||||
            RequestTimedOutError: if there is a timeout before the response headers
 | 
			
		||||
               are received. Note there is currently no timeout on reading the response
 | 
			
		||||
               body.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -506,7 +506,7 @@ class SimpleHttpClient:
 | 
			
		|||
        Returns:
 | 
			
		||||
            Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
 | 
			
		||||
        Raises:
 | 
			
		||||
            RequestTimedOutException: if there is a timeout before the response headers
 | 
			
		||||
            RequestTimedOutError: if there is a timeout before the response headers
 | 
			
		||||
               are received. Note there is currently no timeout on reading the response
 | 
			
		||||
               body.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -538,7 +538,7 @@ class SimpleHttpClient:
 | 
			
		|||
        Returns:
 | 
			
		||||
            Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
 | 
			
		||||
        Raises:
 | 
			
		||||
             RequestTimedOutException: if there is a timeout before the response headers
 | 
			
		||||
             RequestTimedOutError: if there is a timeout before the response headers
 | 
			
		||||
               are received. Note there is currently no timeout on reading the response
 | 
			
		||||
               body.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -586,7 +586,7 @@ class SimpleHttpClient:
 | 
			
		|||
            Succeeds when we get a 2xx HTTP response, with the
 | 
			
		||||
            HTTP body as bytes.
 | 
			
		||||
        Raises:
 | 
			
		||||
            RequestTimedOutException: if there is a timeout before the response headers
 | 
			
		||||
            RequestTimedOutError: if there is a timeout before the response headers
 | 
			
		||||
               are received. Note there is currently no timeout on reading the response
 | 
			
		||||
               body.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -631,7 +631,7 @@ class SimpleHttpClient:
 | 
			
		|||
            headers, absolute URI of the response and HTTP response code.
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            RequestTimedOutException: if there is a timeout before the response headers
 | 
			
		||||
            RequestTimedOutError: if there is a timeout before the response headers
 | 
			
		||||
               are received. Note there is currently no timeout on reading the response
 | 
			
		||||
               body.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -684,6 +684,18 @@ class SimpleHttpClient:
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _timeout_to_request_timed_out_error(f: Failure):
 | 
			
		||||
    if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
 | 
			
		||||
        # The TCP connection has its own timeout (set by the 'connectTimeout' param
 | 
			
		||||
        # on the Agent), which raises twisted_error.TimeoutError exception.
 | 
			
		||||
        raise RequestTimedOutError("Timeout connecting to remote server")
 | 
			
		||||
    elif f.check(defer.TimeoutError, ResponseNeverReceived):
 | 
			
		||||
        # this one means that we hit our overall timeout on the request
 | 
			
		||||
        raise RequestTimedOutError("Timeout waiting for response from remote server")
 | 
			
		||||
 | 
			
		||||
    return f
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
 | 
			
		||||
# The two should be factored out.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -171,7 +171,7 @@ async def _handle_json_response(
 | 
			
		|||
        d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 | 
			
		||||
 | 
			
		||||
        body = await make_deferred_yieldable(d)
 | 
			
		||||
    except TimeoutError as e:
 | 
			
		||||
    except defer.TimeoutError as e:
 | 
			
		||||
        logger.warning(
 | 
			
		||||
            "{%s} [%s] Timed out reading response - %s %s",
 | 
			
		||||
            request.txn_id,
 | 
			
		||||
| 
						 | 
				
			
			@ -655,10 +655,14 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            long_retries (bool): whether to use the long retry algorithm. See
 | 
			
		||||
                docs on _send_request for details.
 | 
			
		||||
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response headers
 | 
			
		||||
                (including connecting to the server), *for each attempt*.
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response.
 | 
			
		||||
                self._default_timeout (60s) by default.
 | 
			
		||||
 | 
			
		||||
                Note that we may make several attempts to send the request; this
 | 
			
		||||
                timeout applies to the time spent waiting for response headers for
 | 
			
		||||
                *each* attempt (including connection time) as well as the time spent
 | 
			
		||||
                reading the response body after a 200 response.
 | 
			
		||||
 | 
			
		||||
            ignore_backoff (bool): true to ignore the historical backoff data
 | 
			
		||||
                and try the request anyway.
 | 
			
		||||
            backoff_on_404 (bool): True if we should count a 404 response as
 | 
			
		||||
| 
						 | 
				
			
			@ -704,8 +708,13 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            timeout=timeout,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if timeout is not None:
 | 
			
		||||
            _sec_timeout = timeout / 1000
 | 
			
		||||
        else:
 | 
			
		||||
            _sec_timeout = self.default_timeout
 | 
			
		||||
 | 
			
		||||
        body = await _handle_json_response(
 | 
			
		||||
            self.reactor, self.default_timeout, request, response, start_ms
 | 
			
		||||
            self.reactor, _sec_timeout, request, response, start_ms
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return body
 | 
			
		||||
| 
						 | 
				
			
			@ -734,10 +743,14 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            long_retries (bool): whether to use the long retry algorithm. See
 | 
			
		||||
                docs on _send_request for details.
 | 
			
		||||
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response headers
 | 
			
		||||
                (including connecting to the server), *for each attempt*.
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response.
 | 
			
		||||
                self._default_timeout (60s) by default.
 | 
			
		||||
 | 
			
		||||
                Note that we may make several attempts to send the request; this
 | 
			
		||||
                timeout applies to the time spent waiting for response headers for
 | 
			
		||||
                *each* attempt (including connection time) as well as the time spent
 | 
			
		||||
                reading the response body after a 200 response.
 | 
			
		||||
 | 
			
		||||
            ignore_backoff (bool): true to ignore the historical backoff data and
 | 
			
		||||
                try the request anyway.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -801,10 +814,14 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            args (dict|None): A dictionary used to create query strings, defaults to
 | 
			
		||||
                None.
 | 
			
		||||
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response headers
 | 
			
		||||
                (including connecting to the server), *for each attempt*.
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response.
 | 
			
		||||
                self._default_timeout (60s) by default.
 | 
			
		||||
 | 
			
		||||
                Note that we may make several attempts to send the request; this
 | 
			
		||||
                timeout applies to the time spent waiting for response headers for
 | 
			
		||||
                *each* attempt (including connection time) as well as the time spent
 | 
			
		||||
                reading the response body after a 200 response.
 | 
			
		||||
 | 
			
		||||
            ignore_backoff (bool): true to ignore the historical backoff data
 | 
			
		||||
                and try the request anyway.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -840,8 +857,13 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            timeout=timeout,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if timeout is not None:
 | 
			
		||||
            _sec_timeout = timeout / 1000
 | 
			
		||||
        else:
 | 
			
		||||
            _sec_timeout = self.default_timeout
 | 
			
		||||
 | 
			
		||||
        body = await _handle_json_response(
 | 
			
		||||
            self.reactor, self.default_timeout, request, response, start_ms
 | 
			
		||||
            self.reactor, _sec_timeout, request, response, start_ms
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return body
 | 
			
		||||
| 
						 | 
				
			
			@ -865,10 +887,14 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            long_retries (bool): whether to use the long retry algorithm. See
 | 
			
		||||
                docs on _send_request for details.
 | 
			
		||||
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response headers
 | 
			
		||||
                (including connecting to the server), *for each attempt*.
 | 
			
		||||
            timeout (int|None): number of milliseconds to wait for the response.
 | 
			
		||||
                self._default_timeout (60s) by default.
 | 
			
		||||
 | 
			
		||||
                Note that we may make several attempts to send the request; this
 | 
			
		||||
                timeout applies to the time spent waiting for response headers for
 | 
			
		||||
                *each* attempt (including connection time) as well as the time spent
 | 
			
		||||
                reading the response body after a 200 response.
 | 
			
		||||
 | 
			
		||||
            ignore_backoff (bool): true to ignore the historical backoff data and
 | 
			
		||||
                try the request anyway.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -900,8 +926,13 @@ class MatrixFederationHttpClient:
 | 
			
		|||
            ignore_backoff=ignore_backoff,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if timeout is not None:
 | 
			
		||||
            _sec_timeout = timeout / 1000
 | 
			
		||||
        else:
 | 
			
		||||
            _sec_timeout = self.default_timeout
 | 
			
		||||
 | 
			
		||||
        body = await _handle_json_response(
 | 
			
		||||
            self.reactor, self.default_timeout, request, response, start_ms
 | 
			
		||||
            self.reactor, _sec_timeout, request, response, start_ms
 | 
			
		||||
        )
 | 
			
		||||
        return body
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase):
 | 
			
		|||
            `BrowserLikePolicyForHTTPS`, so unless you have special
 | 
			
		||||
            requirements you can leave this as-is.
 | 
			
		||||
 | 
			
		||||
        connectTimeout (float): The amount of time that this Agent will wait
 | 
			
		||||
            for the peer to accept a connection.
 | 
			
		||||
        connectTimeout (Optional[float]): The amount of time that this Agent will wait
 | 
			
		||||
            for the peer to accept a connection, in seconds. If 'None',
 | 
			
		||||
            HostnameEndpoint's default (30s) will be used.
 | 
			
		||||
 | 
			
		||||
            This is used for connections to both proxies and destination servers.
 | 
			
		||||
 | 
			
		||||
        bindAddress (bytes): The local address for client sockets to bind to.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase):
 | 
			
		|||
        Returns:
 | 
			
		||||
            Deferred[IResponse]: completes when the header of the response has
 | 
			
		||||
                 been received (regardless of the response status code).
 | 
			
		||||
 | 
			
		||||
                 Can fail with:
 | 
			
		||||
                    SchemeNotSupported: if the uri is not http or https
 | 
			
		||||
 | 
			
		||||
                    twisted.internet.error.TimeoutError if the server we are connecting
 | 
			
		||||
                        to (proxy or destination) does not accept a connection before
 | 
			
		||||
                        connectTimeout.
 | 
			
		||||
 | 
			
		||||
                    ... other things too.
 | 
			
		||||
        """
 | 
			
		||||
        uri = uri.strip()
 | 
			
		||||
        if not _VALID_URI.match(uri):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,18 +20,28 @@ import urllib
 | 
			
		|||
from inspect import signature
 | 
			
		||||
from typing import Dict, List, Tuple
 | 
			
		||||
 | 
			
		||||
from synapse.api.errors import (
 | 
			
		||||
    CodeMessageException,
 | 
			
		||||
    HttpResponseException,
 | 
			
		||||
    RequestSendFailed,
 | 
			
		||||
    SynapseError,
 | 
			
		||||
)
 | 
			
		||||
from prometheus_client import Counter, Gauge
 | 
			
		||||
 | 
			
		||||
from synapse.api.errors import HttpResponseException, SynapseError
 | 
			
		||||
from synapse.http import RequestTimedOutError
 | 
			
		||||
from synapse.logging.opentracing import inject_active_span_byte_dict, trace
 | 
			
		||||
from synapse.util.caches.response_cache import ResponseCache
 | 
			
		||||
from synapse.util.stringutils import random_string
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
_pending_outgoing_requests = Gauge(
 | 
			
		||||
    "synapse_pending_outgoing_replication_requests",
 | 
			
		||||
    "Number of active outgoing replication requests, by replication method name",
 | 
			
		||||
    ["name"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
_outgoing_request_counter = Counter(
 | 
			
		||||
    "synapse_outgoing_replication_requests",
 | 
			
		||||
    "Number of outgoing replication requests, by replication method name and result",
 | 
			
		||||
    ["name", "code"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ReplicationEndpoint(metaclass=abc.ABCMeta):
 | 
			
		||||
    """Helper base class for defining new replication HTTP endpoints.
 | 
			
		||||
| 
						 | 
				
			
			@ -138,7 +148,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
 | 
			
		|||
 | 
			
		||||
        instance_map = hs.config.worker.instance_map
 | 
			
		||||
 | 
			
		||||
        outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
 | 
			
		||||
 | 
			
		||||
        @trace(opname="outgoing_replication_request")
 | 
			
		||||
        @outgoing_gauge.track_inprogress()
 | 
			
		||||
        async def send_request(instance_name="master", **kwargs):
 | 
			
		||||
            if instance_name == local_instance_name:
 | 
			
		||||
                raise Exception("Trying to send HTTP request to self")
 | 
			
		||||
| 
						 | 
				
			
			@ -193,23 +206,26 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
 | 
			
		|||
                    try:
 | 
			
		||||
                        result = await request_func(uri, data, headers=headers)
 | 
			
		||||
                        break
 | 
			
		||||
                    except CodeMessageException as e:
 | 
			
		||||
                        if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
 | 
			
		||||
                    except RequestTimedOutError:
 | 
			
		||||
                        if not cls.RETRY_ON_TIMEOUT:
 | 
			
		||||
                            raise
 | 
			
		||||
 | 
			
		||||
                    logger.warning("%s request timed out", cls.NAME)
 | 
			
		||||
                    logger.warning("%s request timed out; retrying", cls.NAME)
 | 
			
		||||
 | 
			
		||||
                    # If we timed out we probably don't need to worry about backing
 | 
			
		||||
                    # off too much, but lets just wait a little anyway.
 | 
			
		||||
                    await clock.sleep(1)
 | 
			
		||||
            except HttpResponseException as e:
 | 
			
		||||
                # We convert to SynapseError as we know that it was a SynapseError
 | 
			
		||||
                # on the master process that we should send to the client. (And
 | 
			
		||||
                # on the main process that we should send to the client. (And
 | 
			
		||||
                # importantly, not stack traces everywhere)
 | 
			
		||||
                _outgoing_request_counter.labels(cls.NAME, e.code).inc()
 | 
			
		||||
                raise e.to_synapse_error()
 | 
			
		||||
            except RequestSendFailed as e:
 | 
			
		||||
                raise SynapseError(502, "Failed to talk to master") from e
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                _outgoing_request_counter.labels(cls.NAME, "ERR").inc()
 | 
			
		||||
                raise SynapseError(502, "Failed to talk to main process") from e
 | 
			
		||||
 | 
			
		||||
            _outgoing_request_counter.labels(cls.NAME, 200).inc()
 | 
			
		||||
            return result
 | 
			
		||||
 | 
			
		||||
        return send_request
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -96,6 +96,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
 | 
			
		|||
        send_attempt = body["send_attempt"]
 | 
			
		||||
        next_link = body.get("next_link")  # Optional param
 | 
			
		||||
 | 
			
		||||
        if next_link:
 | 
			
		||||
            # Raise if the provided next_link value isn't valid
 | 
			
		||||
            assert_valid_next_link(self.hs, next_link)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -372,6 +373,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
 | 
			
		|||
                Codes.THREEPID_DENIED,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if next_link:
 | 
			
		||||
            # Raise if the provided next_link value isn't valid
 | 
			
		||||
            assert_valid_next_link(self.hs, next_link)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -446,6 +448,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 | 
			
		|||
                Codes.THREEPID_DENIED,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if next_link:
 | 
			
		||||
            # Raise if the provided next_link value isn't valid
 | 
			
		||||
            assert_valid_next_link(self.hs, next_link)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -41,7 +41,14 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
 | 
			
		|||
        """
 | 
			
		||||
 | 
			
		||||
        def _count_users(txn):
 | 
			
		||||
            sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
 | 
			
		||||
            # Exclude app service users
 | 
			
		||||
            sql = """
 | 
			
		||||
                SELECT COALESCE(count(*), 0)
 | 
			
		||||
                FROM monthly_active_users
 | 
			
		||||
                    LEFT JOIN users
 | 
			
		||||
                    ON monthly_active_users.user_id=users.name
 | 
			
		||||
                WHERE (users.appservice_id IS NULL OR users.appservice_id = '');
 | 
			
		||||
            """
 | 
			
		||||
            txn.execute(sql)
 | 
			
		||||
            (count,) = txn.fetchone()
 | 
			
		||||
            return count
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -449,18 +449,8 @@ class ReadWriteLock:
 | 
			
		|||
R = TypeVar("R")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _cancelled_to_timed_out_error(value: R, timeout: float) -> R:
 | 
			
		||||
    if isinstance(value, failure.Failure):
 | 
			
		||||
        value.trap(CancelledError)
 | 
			
		||||
        raise defer.TimeoutError(timeout, "Deferred")
 | 
			
		||||
    return value
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def timeout_deferred(
 | 
			
		||||
    deferred: defer.Deferred,
 | 
			
		||||
    timeout: float,
 | 
			
		||||
    reactor: IReactorTime,
 | 
			
		||||
    on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None,
 | 
			
		||||
    deferred: defer.Deferred, timeout: float, reactor: IReactorTime,
 | 
			
		||||
) -> defer.Deferred:
 | 
			
		||||
    """The in built twisted `Deferred.addTimeout` fails to time out deferreds
 | 
			
		||||
    that have a canceller that throws exceptions. This method creates a new
 | 
			
		||||
| 
						 | 
				
			
			@ -469,27 +459,21 @@ def timeout_deferred(
 | 
			
		|||
 | 
			
		||||
    (See https://twistedmatrix.com/trac/ticket/9534)
 | 
			
		||||
 | 
			
		||||
    NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
 | 
			
		||||
    NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred.
 | 
			
		||||
 | 
			
		||||
    NOTE: the TimeoutError raised by the resultant deferred is
 | 
			
		||||
    twisted.internet.defer.TimeoutError, which is *different* to the built-in
 | 
			
		||||
    TimeoutError, as well as various other TimeoutErrors you might have imported.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        deferred: The Deferred to potentially timeout.
 | 
			
		||||
        timeout: Timeout in seconds
 | 
			
		||||
        reactor: The twisted reactor to use
 | 
			
		||||
        on_timeout_cancel: A callable which is called immediately
 | 
			
		||||
            after the deferred times out, and not if this deferred is
 | 
			
		||||
            otherwise cancelled before the timeout.
 | 
			
		||||
 | 
			
		||||
            It takes an arbitrary value, which is the value of the deferred at
 | 
			
		||||
            that exact point in time (probably a CancelledError Failure), and
 | 
			
		||||
            the timeout.
 | 
			
		||||
 | 
			
		||||
            The default callable (if none is provided) will translate a
 | 
			
		||||
            CancelledError Failure into a defer.TimeoutError.
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        A new Deferred.
 | 
			
		||||
        A new Deferred, which will errback with defer.TimeoutError on timeout.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    new_d = defer.Deferred()
 | 
			
		||||
 | 
			
		||||
    timed_out = [False]
 | 
			
		||||
| 
						 | 
				
			
			@ -502,18 +486,23 @@ def timeout_deferred(
 | 
			
		|||
        except:  # noqa: E722, if we throw any exception it'll break time outs
 | 
			
		||||
            logger.exception("Canceller failed during timeout")
 | 
			
		||||
 | 
			
		||||
        # the cancel() call should have set off a chain of errbacks which
 | 
			
		||||
        # will have errbacked new_d, but in case it hasn't, errback it now.
 | 
			
		||||
 | 
			
		||||
        if not new_d.called:
 | 
			
		||||
            new_d.errback(defer.TimeoutError(timeout, "Deferred"))
 | 
			
		||||
            new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
 | 
			
		||||
 | 
			
		||||
    delayed_call = reactor.callLater(timeout, time_it_out)
 | 
			
		||||
 | 
			
		||||
    def convert_cancelled(value):
 | 
			
		||||
        if timed_out[0]:
 | 
			
		||||
            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
 | 
			
		||||
            return to_call(value, timeout)
 | 
			
		||||
    def convert_cancelled(value: failure.Failure):
 | 
			
		||||
        # if the orgininal deferred was cancelled, and our timeout has fired, then
 | 
			
		||||
        # the reason it was cancelled was due to our timeout. Turn the CancelledError
 | 
			
		||||
        # into a TimeoutError.
 | 
			
		||||
        if timed_out[0] and value.check(CancelledError):
 | 
			
		||||
            raise defer.TimeoutError("Timed out after %gs" % (timeout,))
 | 
			
		||||
        return value
 | 
			
		||||
 | 
			
		||||
    deferred.addBoth(convert_cancelled)
 | 
			
		||||
    deferred.addErrback(convert_cancelled)
 | 
			
		||||
 | 
			
		||||
    def cancel_timeout(result):
 | 
			
		||||
        # stop the pending call to cancel the deferred if it's been fired
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -318,14 +318,14 @@ class FederationClientTests(HomeserverTestCase):
 | 
			
		|||
        r = self.successResultOf(d)
 | 
			
		||||
        self.assertEqual(r.code, 200)
 | 
			
		||||
 | 
			
		||||
    def test_client_headers_no_body(self):
 | 
			
		||||
    @parameterized.expand(["get_json", "post_json", "delete_json", "put_json"])
 | 
			
		||||
    def test_timeout_reading_body(self, method_name: str):
 | 
			
		||||
        """
 | 
			
		||||
        If the HTTP request is connected, but gets no response before being
 | 
			
		||||
        timed out, it'll give a ResponseNeverReceived.
 | 
			
		||||
        timed out, it'll give a RequestSendFailed with can_retry.
 | 
			
		||||
        """
 | 
			
		||||
        d = defer.ensureDeferred(
 | 
			
		||||
            self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)
 | 
			
		||||
        )
 | 
			
		||||
        method = getattr(self.cl, method_name)
 | 
			
		||||
        d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000))
 | 
			
		||||
 | 
			
		||||
        self.pump()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -349,7 +349,9 @@ class FederationClientTests(HomeserverTestCase):
 | 
			
		|||
        self.reactor.advance(10.5)
 | 
			
		||||
        f = self.failureResultOf(d)
 | 
			
		||||
 | 
			
		||||
        self.assertIsInstance(f.value, TimeoutError)
 | 
			
		||||
        self.assertIsInstance(f.value, RequestSendFailed)
 | 
			
		||||
        self.assertTrue(f.value.can_retry)
 | 
			
		||||
        self.assertIsInstance(f.value.inner_exception, defer.TimeoutError)
 | 
			
		||||
 | 
			
		||||
    def test_client_requires_trailing_slashes(self):
 | 
			
		||||
        """
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,180 @@
 | 
			
		|||
# -*- coding: utf-8 -*-
 | 
			
		||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
# you may not use this file except in compliance with the License.
 | 
			
		||||
# You may obtain a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
# See the License for the specific language governing permissions and
 | 
			
		||||
# limitations under the License.
 | 
			
		||||
from mock import Mock
 | 
			
		||||
 | 
			
		||||
from netaddr import IPSet
 | 
			
		||||
 | 
			
		||||
from twisted.internet import defer
 | 
			
		||||
from twisted.internet.error import DNSLookupError
 | 
			
		||||
 | 
			
		||||
from synapse.http import RequestTimedOutError
 | 
			
		||||
from synapse.http.client import SimpleHttpClient
 | 
			
		||||
from synapse.server import HomeServer
 | 
			
		||||
 | 
			
		||||
from tests.unittest import HomeserverTestCase
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SimpleHttpClientTests(HomeserverTestCase):
 | 
			
		||||
    def prepare(self, reactor, clock, hs: "HomeServer"):
 | 
			
		||||
        # Add a DNS entry for a test server
 | 
			
		||||
        self.reactor.lookups["testserv"] = "1.2.3.4"
 | 
			
		||||
 | 
			
		||||
        self.cl = hs.get_simple_http_client()
 | 
			
		||||
 | 
			
		||||
    def test_dns_error(self):
 | 
			
		||||
        """
 | 
			
		||||
        If the DNS lookup returns an error, it will bubble up.
 | 
			
		||||
        """
 | 
			
		||||
        d = defer.ensureDeferred(self.cl.get_json("http://testserv2:8008/foo/bar"))
 | 
			
		||||
        self.pump()
 | 
			
		||||
 | 
			
		||||
        f = self.failureResultOf(d)
 | 
			
		||||
        self.assertIsInstance(f.value, DNSLookupError)
 | 
			
		||||
 | 
			
		||||
    def test_client_connection_refused(self):
 | 
			
		||||
        d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
 | 
			
		||||
 | 
			
		||||
        self.pump()
 | 
			
		||||
 | 
			
		||||
        # Nothing happened yet
 | 
			
		||||
        self.assertNoResult(d)
 | 
			
		||||
 | 
			
		||||
        clients = self.reactor.tcpClients
 | 
			
		||||
        self.assertEqual(len(clients), 1)
 | 
			
		||||
        (host, port, factory, _timeout, _bindAddress) = clients[0]
 | 
			
		||||
        self.assertEqual(host, "1.2.3.4")
 | 
			
		||||
        self.assertEqual(port, 8008)
 | 
			
		||||
        e = Exception("go away")
 | 
			
		||||
        factory.clientConnectionFailed(None, e)
 | 
			
		||||
        self.pump(0.5)
 | 
			
		||||
 | 
			
		||||
        f = self.failureResultOf(d)
 | 
			
		||||
 | 
			
		||||
        self.assertIs(f.value, e)
 | 
			
		||||
 | 
			
		||||
    def test_client_never_connect(self):
 | 
			
		||||
        """
 | 
			
		||||
        If the HTTP request is not connected and is timed out, it'll give a
 | 
			
		||||
        ConnectingCancelledError or TimeoutError.
 | 
			
		||||
        """
 | 
			
		||||
        d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
 | 
			
		||||
 | 
			
		||||
        self.pump()
 | 
			
		||||
 | 
			
		||||
        # Nothing happened yet
 | 
			
		||||
        self.assertNoResult(d)
 | 
			
		||||
 | 
			
		||||
        # Make sure treq is trying to connect
 | 
			
		||||
        clients = self.reactor.tcpClients
 | 
			
		||||
        self.assertEqual(len(clients), 1)
 | 
			
		||||
        self.assertEqual(clients[0][0], "1.2.3.4")
 | 
			
		||||
        self.assertEqual(clients[0][1], 8008)
 | 
			
		||||
 | 
			
		||||
        # Deferred is still without a result
 | 
			
		||||
        self.assertNoResult(d)
 | 
			
		||||
 | 
			
		||||
        # Push by enough to time it out
 | 
			
		||||
        self.reactor.advance(120)
 | 
			
		||||
        f = self.failureResultOf(d)
 | 
			
		||||
 | 
			
		||||
        self.assertIsInstance(f.value, RequestTimedOutError)
 | 
			
		||||
 | 
			
		||||
    def test_client_connect_no_response(self):
 | 
			
		||||
        """
 | 
			
		||||
        If the HTTP request is connected, but gets no response before being
 | 
			
		||||
        timed out, it'll give a ResponseNeverReceived.
 | 
			
		||||
        """
 | 
			
		||||
        d = defer.ensureDeferred(self.cl.get_json("http://testserv:8008/foo/bar"))
 | 
			
		||||
 | 
			
		||||
        self.pump()
 | 
			
		||||
 | 
			
		||||
        # Nothing happened yet
 | 
			
		||||
        self.assertNoResult(d)
 | 
			
		||||
 | 
			
		||||
        # Make sure treq is trying to connect
 | 
			
		||||
        clients = self.reactor.tcpClients
 | 
			
		||||
        self.assertEqual(len(clients), 1)
 | 
			
		||||
        self.assertEqual(clients[0][0], "1.2.3.4")
 | 
			
		||||
        self.assertEqual(clients[0][1], 8008)
 | 
			
		||||
 | 
			
		||||
        conn = Mock()
 | 
			
		||||
        client = clients[0][2].buildProtocol(None)
 | 
			
		||||
        client.makeConnection(conn)
 | 
			
		||||
 | 
			
		||||
        # Deferred is still without a result
 | 
			
		||||
        self.assertNoResult(d)
 | 
			
		||||
 | 
			
		||||
        # Push by enough to time it out
 | 
			
		||||
        self.reactor.advance(120)
 | 
			
		||||
        f = self.failureResultOf(d)
 | 
			
		||||
 | 
			
		||||
        self.assertIsInstance(f.value, RequestTimedOutError)
 | 
			
		||||
 | 
			
		||||
    def test_client_ip_range_blacklist(self):
 | 
			
		||||
        """Ensure that Synapse does not try to connect to blacklisted IPs"""
 | 
			
		||||
 | 
			
		||||
        # Add some DNS entries we'll blacklist
 | 
			
		||||
        self.reactor.lookups["internal"] = "127.0.0.1"
 | 
			
		||||
        self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337"
 | 
			
		||||
        ip_blacklist = IPSet(["127.0.0.0/8", "fe80::/64"])
 | 
			
		||||
 | 
			
		||||
        cl = SimpleHttpClient(self.hs, ip_blacklist=ip_blacklist)
 | 
			
		||||
 | 
			
		||||
        # Try making a GET request to a blacklisted IPv4 address
 | 
			
		||||
        # ------------------------------------------------------
 | 
			
		||||
        # Make the request
 | 
			
		||||
        d = defer.ensureDeferred(cl.get_json("http://internal:8008/foo/bar"))
 | 
			
		||||
        self.pump(1)
 | 
			
		||||
 | 
			
		||||
        # Check that it was unable to resolve the address
 | 
			
		||||
        clients = self.reactor.tcpClients
 | 
			
		||||
        self.assertEqual(len(clients), 0)
 | 
			
		||||
 | 
			
		||||
        self.failureResultOf(d, DNSLookupError)
 | 
			
		||||
 | 
			
		||||
        # Try making a POST request to a blacklisted IPv6 address
 | 
			
		||||
        # -------------------------------------------------------
 | 
			
		||||
        # Make the request
 | 
			
		||||
        d = defer.ensureDeferred(
 | 
			
		||||
            cl.post_json_get_json("http://internalv6:8008/foo/bar", {})
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Move the reactor forwards
 | 
			
		||||
        self.pump(1)
 | 
			
		||||
 | 
			
		||||
        # Check that it was unable to resolve the address
 | 
			
		||||
        clients = self.reactor.tcpClients
 | 
			
		||||
        self.assertEqual(len(clients), 0)
 | 
			
		||||
 | 
			
		||||
        # Check that it was due to a blacklisted DNS lookup
 | 
			
		||||
        self.failureResultOf(d, DNSLookupError)
 | 
			
		||||
 | 
			
		||||
        # Try making a GET request to a non-blacklisted IPv4 address
 | 
			
		||||
        # ----------------------------------------------------------
 | 
			
		||||
        # Make the request
 | 
			
		||||
        d = defer.ensureDeferred(cl.get_json("http://testserv:8008/foo/bar"))
 | 
			
		||||
 | 
			
		||||
        # Nothing has happened yet
 | 
			
		||||
        self.assertNoResult(d)
 | 
			
		||||
 | 
			
		||||
        # Move the reactor forwards
 | 
			
		||||
        self.pump(1)
 | 
			
		||||
 | 
			
		||||
        # Check that it was able to resolve the address
 | 
			
		||||
        clients = self.reactor.tcpClients
 | 
			
		||||
        self.assertNotEqual(len(clients), 0)
 | 
			
		||||
 | 
			
		||||
        # Connection will still fail as this IP address does not resolve to anything
 | 
			
		||||
        self.failureResultOf(d, RequestTimedOutError)
 | 
			
		||||
| 
						 | 
				
			
			@ -732,6 +732,12 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
 | 
			
		|||
    @override_config({"next_link_domain_whitelist": ["example.com", "example.org"]})
 | 
			
		||||
    def test_next_link_domain_whitelist(self):
 | 
			
		||||
        """Tests next_link parameters must fit the whitelist if provided"""
 | 
			
		||||
 | 
			
		||||
        # Ensure not providing a next_link parameter still works
 | 
			
		||||
        self._request_token(
 | 
			
		||||
            "something@example.com", "some_secret", next_link=None, expect_code=200,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._request_token(
 | 
			
		||||
            "something@example.com",
 | 
			
		||||
            "some_secret",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -137,6 +137,21 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
 | 
			
		|||
        count = self.get_success(self.store.get_monthly_active_count())
 | 
			
		||||
        self.assertEqual(count, 1)
 | 
			
		||||
 | 
			
		||||
    def test_appservice_user_not_counted_in_mau(self):
 | 
			
		||||
        self.get_success(
 | 
			
		||||
            self.store.register_user(
 | 
			
		||||
                user_id="@appservice_user:server", appservice_id="wibble"
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        count = self.get_success(self.store.get_monthly_active_count())
 | 
			
		||||
        self.assertEqual(count, 0)
 | 
			
		||||
 | 
			
		||||
        d = self.store.upsert_monthly_active_user("@appservice_user:server")
 | 
			
		||||
        self.get_success(d)
 | 
			
		||||
 | 
			
		||||
        count = self.get_success(self.store.get_monthly_active_count())
 | 
			
		||||
        self.assertEqual(count, 0)
 | 
			
		||||
 | 
			
		||||
    def test_user_last_seen_monthly_active(self):
 | 
			
		||||
        user_id1 = "@user1:server"
 | 
			
		||||
        user_id2 = "@user2:server"
 | 
			
		||||
| 
						 | 
				
			
			@ -383,7 +398,7 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
 | 
			
		|||
        self.get_success(self.store.upsert_monthly_active_user(appservice2_user1))
 | 
			
		||||
 | 
			
		||||
        count = self.get_success(self.store.get_monthly_active_count())
 | 
			
		||||
        self.assertEqual(count, 4)
 | 
			
		||||
        self.assertEqual(count, 1)
 | 
			
		||||
 | 
			
		||||
        d = self.store.get_monthly_active_count_by_service()
 | 
			
		||||
        result = self.get_success(d)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue