309 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			309 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2014 OpenMarket Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
 | 
						|
from twisted.internet import defer, reactor
 | 
						|
from twisted.internet.error import DNSLookupError
 | 
						|
from twisted.web.client import readBody, _AgentBase, _URI
 | 
						|
from twisted.web.http_headers import Headers
 | 
						|
 | 
						|
from synapse.http.endpoint import matrix_federation_endpoint
 | 
						|
from synapse.util.async import sleep
 | 
						|
from synapse.util.logcontext import PreserveLoggingContext
 | 
						|
 | 
						|
from syutil.jsonutil import encode_canonical_json
 | 
						|
 | 
						|
from synapse.api.errors import CodeMessageException, SynapseError
 | 
						|
 | 
						|
from syutil.crypto.jsonsign import sign_json
 | 
						|
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import urllib
 | 
						|
import urlparse
 | 
						|
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class MatrixFederationHttpAgent(_AgentBase):
 | 
						|
 | 
						|
    def __init__(self, reactor, pool=None):
 | 
						|
        _AgentBase.__init__(self, reactor, pool)
 | 
						|
 | 
						|
    def request(self, destination, endpoint, method, path, params, query,
 | 
						|
                headers, body_producer):
 | 
						|
 | 
						|
        host = b""
 | 
						|
        port = 0
 | 
						|
        fragment = b""
 | 
						|
 | 
						|
        parsed_URI = _URI(b"http", destination, host, port, path, params,
 | 
						|
                          query, fragment)
 | 
						|
 | 
						|
        # Set the connection pool key to be the destination.
 | 
						|
        key = destination
 | 
						|
 | 
						|
        return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
 | 
						|
                                         headers, body_producer,
 | 
						|
                                         parsed_URI.originForm)
 | 
						|
 | 
						|
 | 
						|
class MatrixFederationHttpClient(object):
 | 
						|
    """HTTP client used to talk to other homeservers over the federation
 | 
						|
    protocol. Send client certificates and signs requests.
 | 
						|
 | 
						|
    Attributes:
 | 
						|
        agent (twisted.web.client.Agent): The twisted Agent used to send the
 | 
						|
            requests.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, hs):
 | 
						|
        self.hs = hs
 | 
						|
        self.signing_key = hs.config.signing_key[0]
 | 
						|
        self.server_name = hs.hostname
 | 
						|
        self.agent = MatrixFederationHttpAgent(reactor)
 | 
						|
 | 
						|
    @defer.inlineCallbacks
 | 
						|
    def _create_request(self, destination, method, path_bytes,
 | 
						|
                        body_callback, headers_dict={}, param_bytes=b"",
 | 
						|
                        query_bytes=b"", retry_on_dns_fail=True):
 | 
						|
        """ Creates and sends a request to the given url
 | 
						|
        """
 | 
						|
        headers_dict[b"User-Agent"] = [b"Synapse"]
 | 
						|
        headers_dict[b"Host"] = [destination]
 | 
						|
 | 
						|
        url_bytes = urlparse.urlunparse(
 | 
						|
            ("", "", path_bytes, param_bytes, query_bytes, "",)
 | 
						|
        )
 | 
						|
 | 
						|
        logger.debug("Sending request to %s: %s %s",
 | 
						|
                     destination, method, url_bytes)
 | 
						|
 | 
						|
        logger.debug(
 | 
						|
            "Types: %s",
 | 
						|
            [
 | 
						|
                type(destination), type(method), type(path_bytes),
 | 
						|
                type(param_bytes),
 | 
						|
                type(query_bytes)
 | 
						|
            ]
 | 
						|
        )
 | 
						|
 | 
						|
        retries_left = 5
 | 
						|
 | 
						|
        endpoint = self._getEndpoint(reactor, destination)
 | 
						|
 | 
						|
        while True:
 | 
						|
            producer = None
 | 
						|
            if body_callback:
 | 
						|
                producer = body_callback(method, url_bytes, headers_dict)
 | 
						|
 | 
						|
            try:
 | 
						|
                with PreserveLoggingContext():
 | 
						|
                    response = yield self.agent.request(
 | 
						|
                        destination,
 | 
						|
                        endpoint,
 | 
						|
                        method,
 | 
						|
                        path_bytes,
 | 
						|
                        param_bytes,
 | 
						|
                        query_bytes,
 | 
						|
                        Headers(headers_dict),
 | 
						|
                        producer
 | 
						|
                    )
 | 
						|
 | 
						|
                logger.debug("Got response to %s", method)
 | 
						|
                break
 | 
						|
            except Exception as e:
 | 
						|
                if not retry_on_dns_fail and isinstance(e, DNSLookupError):
 | 
						|
                    logger.warn("DNS Lookup failed to %s with %s", destination,
 | 
						|
                                e)
 | 
						|
                    raise SynapseError(400, "Domain specified not found.")
 | 
						|
 | 
						|
                logger.exception("Got error in _create_request")
 | 
						|
                _print_ex(e)
 | 
						|
 | 
						|
                if retries_left:
 | 
						|
                    yield sleep(2 ** (5 - retries_left))
 | 
						|
                    retries_left -= 1
 | 
						|
                else:
 | 
						|
                    raise
 | 
						|
 | 
						|
        if 200 <= response.code < 300:
 | 
						|
            # We need to update the transactions table to say it was sent?
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            # :'(
 | 
						|
            # Update transactions table?
 | 
						|
            logger.error(
 | 
						|
                "Got response %d %s", response.code, response.phrase
 | 
						|
            )
 | 
						|
            raise CodeMessageException(
 | 
						|
                response.code, response.phrase
 | 
						|
            )
 | 
						|
 | 
						|
        defer.returnValue(response)
 | 
						|
 | 
						|
    def sign_request(self, destination, method, url_bytes, headers_dict,
 | 
						|
                     content=None):
 | 
						|
        request = {
 | 
						|
            "method": method,
 | 
						|
            "uri": url_bytes,
 | 
						|
            "origin": self.server_name,
 | 
						|
            "destination": destination,
 | 
						|
        }
 | 
						|
 | 
						|
        if content is not None:
 | 
						|
            request["content"] = content
 | 
						|
 | 
						|
        request = sign_json(request, self.server_name, self.signing_key)
 | 
						|
 | 
						|
        auth_headers = []
 | 
						|
 | 
						|
        for key, sig in request["signatures"][self.server_name].items():
 | 
						|
            auth_headers.append(bytes(
 | 
						|
                "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
 | 
						|
                    self.server_name, key, sig,
 | 
						|
                )
 | 
						|
            ))
 | 
						|
 | 
						|
        headers_dict[b"Authorization"] = auth_headers
 | 
						|
 | 
						|
    @defer.inlineCallbacks
 | 
						|
    def put_json(self, destination, path, data={}, json_data_callback=None):
 | 
						|
        """ Sends the specifed json data using PUT
 | 
						|
 | 
						|
        Args:
 | 
						|
            destination (str): The remote server to send the HTTP request
 | 
						|
                to.
 | 
						|
            path (str): The HTTP path.
 | 
						|
            data (dict): A dict containing the data that will be used as
 | 
						|
                the request body. This will be encoded as JSON.
 | 
						|
            json_data_callback (callable): A callable returning the dict to
 | 
						|
                use as the request body.
 | 
						|
 | 
						|
        Returns:
 | 
						|
            Deferred: Succeeds when we get a 2xx HTTP response. The result
 | 
						|
            will be the decoded JSON body. On a 4xx or 5xx error response a
 | 
						|
            CodeMessageException is raised.
 | 
						|
        """
 | 
						|
 | 
						|
        if not json_data_callback:
 | 
						|
            def json_data_callback():
 | 
						|
                return data
 | 
						|
 | 
						|
        def body_callback(method, url_bytes, headers_dict):
 | 
						|
            json_data = json_data_callback()
 | 
						|
            self.sign_request(
 | 
						|
                destination, method, url_bytes, headers_dict, json_data
 | 
						|
            )
 | 
						|
            producer = _JsonProducer(json_data)
 | 
						|
            return producer
 | 
						|
 | 
						|
        response = yield self._create_request(
 | 
						|
            destination.encode("ascii"),
 | 
						|
            "PUT",
 | 
						|
            path.encode("ascii"),
 | 
						|
            body_callback=body_callback,
 | 
						|
            headers_dict={"Content-Type": ["application/json"]},
 | 
						|
        )
 | 
						|
 | 
						|
        logger.debug("Getting resp body")
 | 
						|
        body = yield readBody(response)
 | 
						|
        logger.debug("Got resp body")
 | 
						|
 | 
						|
        defer.returnValue((response.code, body))
 | 
						|
 | 
						|
    @defer.inlineCallbacks
 | 
						|
    def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
 | 
						|
        """ Get's some json from the given host homeserver and path
 | 
						|
 | 
						|
        Args:
 | 
						|
            destination (str): The remote server to send the HTTP request
 | 
						|
                to.
 | 
						|
            path (str): The HTTP path.
 | 
						|
            args (dict): A dictionary used to create query strings, defaults to
 | 
						|
                None.
 | 
						|
                **Note**: The value of each key is assumed to be an iterable
 | 
						|
                and *not* a string.
 | 
						|
 | 
						|
        Returns:
 | 
						|
            Deferred: Succeeds when we get *any* HTTP response.
 | 
						|
 | 
						|
            The result of the deferred is a tuple of `(code, response)`,
 | 
						|
            where `response` is a dict representing the decoded JSON body.
 | 
						|
        """
 | 
						|
        logger.debug("get_json args: %s", args)
 | 
						|
 | 
						|
        encoded_args = {}
 | 
						|
        for k, vs in args.items():
 | 
						|
            if isinstance(vs, basestring):
 | 
						|
                vs = [vs]
 | 
						|
            encoded_args[k] = [v.encode("UTF-8") for v in vs]
 | 
						|
 | 
						|
        query_bytes = urllib.urlencode(encoded_args, True)
 | 
						|
        logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 | 
						|
 | 
						|
        def body_callback(method, url_bytes, headers_dict):
 | 
						|
            self.sign_request(destination, method, url_bytes, headers_dict)
 | 
						|
            return None
 | 
						|
 | 
						|
        response = yield self._create_request(
 | 
						|
            destination.encode("ascii"),
 | 
						|
            "GET",
 | 
						|
            path.encode("ascii"),
 | 
						|
            query_bytes=query_bytes,
 | 
						|
            body_callback=body_callback,
 | 
						|
            retry_on_dns_fail=retry_on_dns_fail
 | 
						|
        )
 | 
						|
 | 
						|
        body = yield readBody(response)
 | 
						|
 | 
						|
        defer.returnValue(json.loads(body))
 | 
						|
 | 
						|
    def _getEndpoint(self, reactor, destination):
 | 
						|
        return matrix_federation_endpoint(
 | 
						|
            reactor, destination, timeout=10,
 | 
						|
            ssl_context_factory=self.hs.tls_context_factory
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def _print_ex(e):
 | 
						|
    if hasattr(e, "reasons") and e.reasons:
 | 
						|
        for ex in e.reasons:
 | 
						|
            _print_ex(ex)
 | 
						|
    else:
 | 
						|
        logger.exception(e)
 | 
						|
 | 
						|
 | 
						|
class _JsonProducer(object):
 | 
						|
    """ Used by the twisted http client to create the HTTP body from json
 | 
						|
    """
 | 
						|
    def __init__(self, jsn):
 | 
						|
        self.reset(jsn)
 | 
						|
 | 
						|
    def reset(self, jsn):
 | 
						|
        self.body = encode_canonical_json(jsn)
 | 
						|
        self.length = len(self.body)
 | 
						|
 | 
						|
    def startProducing(self, consumer):
 | 
						|
        consumer.write(self.body)
 | 
						|
        return defer.succeed(None)
 | 
						|
 | 
						|
    def pauseProducing(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def stopProducing(self):
 | 
						|
        pass
 |