433 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			433 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
# Copyright 2019 New Vector Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
import logging
 | 
						|
import urllib.parse
 | 
						|
from typing import Any, Generator, List, Optional
 | 
						|
from urllib.request import (  # type: ignore[attr-defined]
 | 
						|
    getproxies_environment,
 | 
						|
    proxy_bypass_environment,
 | 
						|
)
 | 
						|
 | 
						|
from netaddr import AddrFormatError, IPAddress, IPSet
 | 
						|
from zope.interface import implementer
 | 
						|
 | 
						|
from twisted.internet import defer
 | 
						|
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 | 
						|
from twisted.internet.interfaces import (
 | 
						|
    IProtocol,
 | 
						|
    IProtocolFactory,
 | 
						|
    IReactorCore,
 | 
						|
    IStreamClientEndpoint,
 | 
						|
)
 | 
						|
from twisted.web.client import URI, Agent, HTTPConnectionPool
 | 
						|
from twisted.web.http_headers import Headers
 | 
						|
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse
 | 
						|
 | 
						|
from synapse.crypto.context_factory import FederationPolicyForHTTPS
 | 
						|
from synapse.http import proxyagent
 | 
						|
from synapse.http.client import BlocklistingAgentWrapper, BlocklistingReactorWrapper
 | 
						|
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint
 | 
						|
from synapse.http.federation.srv_resolver import Server, SrvResolver
 | 
						|
from synapse.http.federation.well_known_resolver import WellKnownResolver
 | 
						|
from synapse.http.proxyagent import ProxyAgent
 | 
						|
from synapse.logging.context import make_deferred_yieldable, run_in_background
 | 
						|
from synapse.types import ISynapseReactor
 | 
						|
from synapse.util import Clock
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
@implementer(IAgent)
class MatrixFederationAgent:
    """An `IAgent` whose `request` method understands `matrix://` URIs.

    For `matrix://` URIs that name a host (not an IP literal) without an
    explicit port, a well-known lookup is performed first and, if the server
    has delegated its traffic, the request is re-targeted at the delegated
    server name. The actual connection is then made through a
    `MatrixHostnameEndpointFactory`. Other URIs are passed through to the
    underlying agent as they are.

    No retries are attempted here. (Those are done in
    MatrixFederationHttpClient.)

    Args:
        reactor: twisted reactor to use for underlying requests. It is wrapped
            in a `BlocklistingReactorWrapper` for everything except connections
            to an outbound proxy, which use the reactor exactly as passed in.

        tls_client_options_factory:
            factory to use for fetching client tls options, or none to disable TLS.

        user_agent:
            The user agent header to use for federation requests.

        ip_allowlist: Allowed IP addresses.

        ip_blocklist: Disallowed IP addresses.

        _srv_resolver:
            SrvResolver implementation to use for looking up SRV records. None
            to use a default implementation.

        _well_known_resolver:
            WellKnownResolver to use to perform well-known lookups. None to use a
            default implementation.
    """

    def __init__(
        self,
        reactor: ISynapseReactor,
        tls_client_options_factory: Optional[FederationPolicyForHTTPS],
        user_agent: bytes,
        ip_allowlist: Optional[IPSet],
        ip_blocklist: IPSet,
        _srv_resolver: Optional[SrvResolver] = None,
        _well_known_resolver: Optional[WellKnownResolver] = None,
    ):
        # Connections to the proxy server must not go through the
        # blocklisting wrapper, so keep hold of the reactor as it was given.
        proxy_reactor = reactor

        # Everything else resolves names through a wrapper which filters out
        # blocked IP addresses, to prevent DNS rebinding.
        filtered_reactor = BlocklistingReactorWrapper(
            reactor, ip_allowlist, ip_blocklist
        )

        self._clock = Clock(filtered_reactor)

        # One connection pool is shared by the federation agent and the
        # default well-known agent created below.
        pool = HTTPConnectionPool(filtered_reactor)
        pool.retryAutomatically = False
        pool.maxPersistentPerHost = 5
        pool.cachedConnectionTimeout = 2 * 60
        self._pool = pool

        endpoint_factory = MatrixHostnameEndpointFactory(
            filtered_reactor,
            proxy_reactor,
            tls_client_options_factory,
            _srv_resolver,
        )
        self._agent = Agent.usingEndpointFactory(
            filtered_reactor,
            endpoint_factory,
            pool=pool,
        )
        self.user_agent = user_agent

        if _well_known_resolver is None:
            # Default resolver: a proxy-aware agent wrapped so that requests
            # to blocklisted IP addresses are refused.
            well_known_agent = BlocklistingAgentWrapper(
                ProxyAgent(
                    filtered_reactor,
                    proxy_reactor,
                    pool=pool,
                    contextFactory=tls_client_options_factory,
                    use_proxy=True,
                ),
                ip_blocklist=ip_blocklist,
            )
            _well_known_resolver = WellKnownResolver(
                filtered_reactor,
                agent=well_known_agent,
                user_agent=user_agent,
            )

        self._well_known_resolver = _well_known_resolver

    @defer.inlineCallbacks
    def request(
        self,
        method: bytes,
        uri: bytes,
        headers: Optional[Headers] = None,
        bodyProducer: Optional[IBodyProducer] = None,
    ) -> Generator[defer.Deferred, Any, IResponse]:
        """Issue a request, applying well-known delegation for matrix:// URIs.

        Args:
            method: HTTP method: GET/POST/etc
            uri: Absolute URI to be retrieved
            headers:
                HTTP headers to send with the request, or None to send no extra
                headers. The object passed in is copied, not modified; `host`
                and `user-agent` headers are added to the copy if absent.
            bodyProducer:
                An object which can generate bytes to make up the body of this
                request (for example, the properly encoded contents of a file
                for a file upload). Or None if the request is to have no body.

        Returns:
            A deferred which fires when the header of the response has been received
            (regardless of the response status code). Fails if there is any problem
            which prevents that response from being received (including problems that
            prevent the request from being sent).
        """
        # urlparse leaves `port` as None when the URI has no explicit port,
        # which is what the delegation check below relies on.
        parsed_uri = urllib.parse.urlparse(uri)

        # There must be a valid hostname.
        assert parsed_uri.hostname

        # Well-known delegation only applies to matrix:// URIs which name a
        # host (not an IP literal) and carry no explicit port.
        #
        # It has to be handled here rather than in the endpoint because the
        # Host header must be rewritten to the delegated server name.
        delegated_server = None
        if (
            parsed_uri.scheme == b"matrix"
            and not _is_ip_literal(parsed_uri.hostname)
            and not parsed_uri.port
        ):
            well_known_result = yield defer.ensureDeferred(
                self._well_known_resolver.get_well_known(parsed_uri.hostname)
            )
            delegated_server = well_known_result.delegated_server

        if delegated_server:
            # The server has delegated matrix traffic elsewhere: swap the
            # delegated server name into the URI and re-parse it so that
            # `netloc` below reflects the new target.
            uri = urllib.parse.urlunparse(
                parsed_uri._replace(netloc=delegated_server)
            )
            parsed_uri = urllib.parse.urlparse(uri)

        # Work on our own Headers object so the caller's is left untouched.
        request_headers = Headers() if headers is None else headers.copy()

        # Make sure the host header is the netloc of the (possibly delegated)
        # server and that a user-agent is provided.
        if not request_headers.hasHeader(b"host"):
            request_headers.addRawHeader(b"host", parsed_uri.netloc)
        if not request_headers.hasHeader(b"user-agent"):
            request_headers.addRawHeader(b"user-agent", self.user_agent)

        response = yield make_deferred_yieldable(
            self._agent.request(method, uri, request_headers, bodyProducer)
        )

        return response
 | 
						|
 | 
						|
 | 
						|
@implementer(IAgentEndpointFactory)
class MatrixHostnameEndpointFactory:
    """Factory producing a MatrixHostnameEndpoint per URI, to be handed to an Agent.

    Args:
        reactor: twisted reactor passed to each endpoint for underlying requests
        proxy_reactor: twisted reactor passed to each endpoint for connections
            to the proxy server
        tls_client_options_factory:
            factory to use for fetching client tls options, or none to disable TLS.
        srv_resolver: The SRV resolver to use. None to create a `SrvResolver()`.
    """

    def __init__(
        self,
        reactor: IReactorCore,
        proxy_reactor: IReactorCore,
        tls_client_options_factory: Optional[FederationPolicyForHTTPS],
        srv_resolver: Optional[SrvResolver],
    ):
        self._reactor = reactor
        self._proxy_reactor = proxy_reactor
        self._tls_client_options_factory = tls_client_options_factory

        # Fall back to a default resolver when the caller did not supply one.
        self._srv_resolver = SrvResolver() if srv_resolver is None else srv_resolver

    def endpointForURI(self, parsed_uri: URI) -> "MatrixHostnameEndpoint":
        """Build the endpoint used to connect to `parsed_uri`.

        Implements the IAgentEndpointFactory interface.
        """
        return MatrixHostnameEndpoint(
            reactor=self._reactor,
            proxy_reactor=self._proxy_reactor,
            tls_client_options_factory=self._tls_client_options_factory,
            srv_resolver=self._srv_resolver,
            parsed_uri=parsed_uri,
        )
 | 
						|
 | 
						|
 | 
						|
@implementer(IStreamClientEndpoint)
class MatrixHostnameEndpoint:
    """An endpoint that resolves matrix:// URLs using Matrix server name
    resolution (i.e. via SRV). Does not check for well-known delegation.

    The `https` and `no` proxy settings are read from the environment when the
    endpoint is constructed.

    Args:
        reactor: twisted reactor to use for underlying requests
        proxy_reactor: twisted reactor to use for connections to the proxy server.
           'reactor' might have some blocking applied (i.e. for DNS queries),
           but we need unblocked access to the proxy.
        tls_client_options_factory:
            factory to use for fetching client tls options, or none to disable TLS.
        srv_resolver: The SRV resolver to use
        parsed_uri: The parsed URI that we're wanting to connect to.

    Raises:
        ValueError if the environment variables contain an invalid proxy specification.
        RuntimeError if no tls_options_factory is given for a https connection
    """

    def __init__(
        self,
        reactor: IReactorCore,
        proxy_reactor: IReactorCore,
        tls_client_options_factory: Optional[FederationPolicyForHTTPS],
        srv_resolver: SrvResolver,
        parsed_uri: URI,
    ):
        self._reactor = reactor
        self._parsed_uri = parsed_uri

        # Only the https proxy setting is consulted: http_proxy is not needed
        # because federation is always over TLS.
        env_proxies = getproxies_environment()
        https_proxy_setting = env_proxies.get("https")
        https_proxy = (
            None if https_proxy_setting is None else https_proxy_setting.encode()
        )
        self.no_proxy = env_proxies.get("no")

        # endpoint and credentials to use to connect to the outbound https proxy, if any.
        proxy_endpoint, proxy_creds = proxyagent.http_proxy_endpoint(
            https_proxy,
            proxy_reactor,
            tls_client_options_factory,
        )
        self._https_proxy_endpoint = proxy_endpoint
        self._https_proxy_creds = proxy_creds

        # set up the TLS connection params
        #
        # XXX disabling TLS is really only supported here for the benefit of the
        # unit tests. We should make the UTs cope with TLS rather than having to make
        # the code support the unit tests.
        self._tls_options = (
            None
            if tls_client_options_factory is None
            else tls_client_options_factory.get_options(self._parsed_uri.host)
        )

        self._srv_resolver = srv_resolver

    def connect(
        self, protocol_factory: IProtocolFactory
    ) -> "defer.Deferred[IProtocol]":
        """Implements IStreamClientEndpoint interface"""

        return run_in_background(self._do_connect, protocol_factory)

    def _endpoint_for_server(
        self, host: bytes, port: int, via_proxy: bool
    ) -> IStreamClientEndpoint:
        """Build the endpoint (proxied or direct, TLS-wrapped if enabled) for one
        candidate host/port.
        """
        endpoint: IStreamClientEndpoint
        if via_proxy:
            logger.debug(
                "Connecting to %s:%i via %s",
                host.decode("ascii"),
                port,
                self._https_proxy_endpoint,
            )
            endpoint = HTTPConnectProxyEndpoint(
                self._reactor,
                self._https_proxy_endpoint,
                host,
                port,
                proxy_creds=self._https_proxy_creds,
            )
        else:
            # not using a proxy
            logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
            endpoint = HostnameEndpoint(self._reactor, host, port)

        if self._tls_options:
            endpoint = wrapClientTLS(self._tls_options, endpoint)
        return endpoint

    async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
        """Try each resolved server in turn, returning the first successful
        connection.

        If every attempt fails, the exception from the first failed attempt is
        re-raised.
        """
        earliest_failure = None

        for server in await self._resolve_server():
            host = server.host
            port = server.port

            # Check the host against the `no` proxy setting, if there is one.
            bypass_proxy = False
            if self.no_proxy is not None:
                bypass_proxy = proxy_bypass_environment(
                    host.decode(),
                    proxies={"no": self.no_proxy},
                )

            try:
                via_proxy = bool(self._https_proxy_endpoint and not bypass_proxy)
                endpoint = self._endpoint_for_server(host, port, via_proxy)
                connection = await make_deferred_yieldable(
                    endpoint.connect(protocol_factory)
                )
                return connection
            except Exception as e:
                logger.info(
                    "Failed to connect to %s:%i: %s", host.decode("ascii"), port, e
                )
                if not earliest_failure:
                    earliest_failure = e

        # We return the first failure because that's probably the most interesting.
        if earliest_failure:
            raise earliest_failure

        # This shouldn't happen as we should always have at least one host/port
        # to try and if that doesn't work then we'll have an exception.
        raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))

    async def _resolve_server(self) -> List[Server]:
        """Resolves the server name to a list of hosts and ports to attempt to
        connect to.

        Non-matrix schemes use the URI's own host and port. For matrix:// URIs,
        an explicit port or an IP literal host is used directly (port
        defaulting to 8448); otherwise the `_matrix._tcp` SRV records are
        consulted, falling back to the host on port 8448 if there are none.
        """
        uri = self._parsed_uri
        if uri.scheme != b"matrix":
            return [Server(host=uri.host, port=uri.port)]

        # Note: We don't do well-known lookup as that needs to have happened
        # before now, due to needing to rewrite the Host header of the HTTP
        # request.

        # We reparse the URI so that defaultPort is -1 rather than 80
        reparsed = urllib.parse.urlparse(uri.toBytes())
        host = reparsed.hostname
        port = reparsed.port

        # If there is an explicit port or the host is an IP address we bypass
        # SRV lookups and just use the given host/port.
        if port or _is_ip_literal(host):
            return [Server(host, port or 8448)]

        printable_host = host.decode(errors="replace")

        logger.debug("Looking up SRV record for %s", printable_host)
        srv_targets = await self._srv_resolver.resolve_service(
            b"_matrix._tcp." + host
        )

        if not srv_targets:
            # No SRV records, so we fallback to host and 8448
            logger.debug("No SRV records for %s", printable_host)
            return [Server(host, 8448)]

        logger.debug(
            "Got %s from SRV lookup for %s",
            ", ".join(map(str, srv_targets)),
            printable_host,
        )
        return srv_targets
 | 
						|
 | 
						|
 | 
						|
def _is_ip_literal(host: bytes) -> bool:
    """Report whether a host name is an IPv4 or IPv6 address literal.

    Args:
        host: The host name to check, as ASCII bytes.

    Returns:
        True if `IPAddress` accepts the decoded host name, False if it rejects
        it with `AddrFormatError`.

    Raises:
        UnicodeDecodeError: if `host` contains non-ASCII bytes.
    """
    # Decode outside the try block: a decoding failure is not an
    # address-format failure and is deliberately left to propagate.
    candidate = host.decode("ascii")

    try:
        IPAddress(candidate)
    except AddrFormatError:
        return False
    else:
        return True
 |