244 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			244 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
# Copyright 2014-2016 OpenMarket Ltd
 | 
						|
# Copyright 2019 New Vector Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import logging
 | 
						|
 | 
						|
from service_identity import VerificationError
 | 
						|
from service_identity.pyopenssl import verify_hostname, verify_ip_address
 | 
						|
from zope.interface import implementer
 | 
						|
 | 
						|
from OpenSSL import SSL, crypto
 | 
						|
from twisted.internet._sslverify import _defaultCurveName
 | 
						|
from twisted.internet.abstract import isIPAddress, isIPv6Address
 | 
						|
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
 | 
						|
from twisted.internet.ssl import (
 | 
						|
    CertificateOptions,
 | 
						|
    ContextFactory,
 | 
						|
    TLSVersion,
 | 
						|
    platformTrust,
 | 
						|
)
 | 
						|
from twisted.python.failure import Failure
 | 
						|
from twisted.web.iweb import IPolicyForHTTPS
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
# Lookup table from a TLS version string to the matching Twisted TLSVersion
# constant. It is indexed with the value of the
# `federation_client_minimum_tls_version` config attribute.
_TLS_VERSION_MAP = {
    "1": TLSVersion.TLSv1_0,
    "1.1": TLSVersion.TLSv1_1,
    "1.2": TLSVersion.TLSv1_2,
    "1.3": TLSVersion.TLSv1_3,
}
 | 
						|
 | 
						|
 | 
						|
class ServerContextFactory(ContextFactory):
    """Builds the PyOpenSSL context used to handle incoming TLS connections.

    One context is created and configured when the factory is constructed;
    getContext hands that same object back on every call.
    """

    def __init__(self, config):
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        self._context = ctx
        self.configure_context(ctx, config)

    @staticmethod
    def configure_context(context, config):
        """Apply our server-side TLS settings to `context`.

        Args:
            context: the OpenSSL context object to configure in place.
            config: object providing `tls_certificate_file` and
                `tls_private_key`.
        """
        # Enabling the elliptic curve is best-effort: a failure is logged and
        # the rest of the configuration still goes ahead.
        try:
            context.set_tmp_ecdh(crypto.get_elliptic_curve(_defaultCurveName))
        except Exception:
            logger.exception("Failed to enable elliptic curve for TLS")

        # Refuse every protocol version older than TLS 1.2.
        disabled_protocols = (
            SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
        )
        context.set_options(disabled_protocols)

        context.use_certificate_chain_file(config.tls_certificate_file)
        context.use_privatekey(config.tls_private_key)

        # Cipher selection follows
        # https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
        cipher_list = (
            "ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
        )
        context.set_cipher_list(cipher_list)

    def getContext(self):
        """Return the context that was configured at construction time."""
        return self._context
 | 
						|
 | 
						|
 | 
						|
@implementer(IPolicyForHTTPS)
class FederationPolicyForHTTPS(object):
    """Factory for the SSLClientConnectionCreators used when connecting to
    remote servers for federation.

    Two OpenSSL context objects are built up front: one configured with a
    trust root and one without. get_options works out, per host, whether
    certificate verification applies and returns an SSLClientConnectionCreator
    wrapping the matching context.
    """

    def __init__(self, config):
        self._config = config

        # A custom set of CA certificates may be configured; otherwise fall
        # back to the CA root certs provided by OpenSSL.
        trust_root = config.federation_ca_trust_root
        if trust_root is None:
            trust_root = platformTrust()

        # "insecurelyLowerMinimumTo" is the only CertificateOptions argument
        # that can go below Twisted's default minimum, hence the "insecure"
        # in its name (Twisted's defaults are reasonably secure). Since
        # Twisted is moving to TLS 1.2 by default, we use it so that a
        # configured minimum of 1.0 is still respected; the alternative,
        # raiseMinimumTo, would not let us do that.
        min_tls_version = _TLS_VERSION_MAP[
            config.federation_client_minimum_tls_version
        ]

        verifying_options = CertificateOptions(
            trustRoot=trust_root, insecurelyLowerMinimumTo=min_tls_version
        )
        self._verify_ssl_context = verifying_options.getContext()
        self._verify_ssl_context.set_info_callback(_context_info_cb)

        non_verifying_options = CertificateOptions(
            insecurelyLowerMinimumTo=min_tls_version
        )
        self._no_verify_ssl_context = non_verifying_options.getContext()
        self._no_verify_ssl_context.set_info_callback(_context_info_cb)

    def get_options(self, host: bytes):
        """Build the connection creator to use for `host`.

        Args:
            host: the hostname, as ASCII bytes.

        Returns:
            An SSLClientConnectionCreator for the host, with verification
            switched on or off as appropriate.
        """
        # IPolicyForHTTPS.get_options hands us bytes, whereas the whitelist
        # is matched against str. The whitelist entries are already
        # IDNA-encoded, just as the hosts are here.
        hostname = host.decode("ascii")

        # Start from the global setting...
        verify = self._config.federation_verify_certificates

        # ... and turn verification off if this host is whitelisted.
        if verify and any(
            pattern.match(hostname)
            for pattern in self._config.federation_certificate_verification_whitelist
        ):
            verify = False

        if verify:
            chosen_context = self._verify_ssl_context
        else:
            chosen_context = self._no_verify_ssl_context

        return SSLClientConnectionCreator(host, chosen_context, verify)

    def creatorForNetloc(self, hostname, port):
        """Implements the IPolicyForHTTPS interface so that this can be passed
        directly to agents.
        """
        return self.get_options(hostname)
 | 
						|
 | 
						|
 | 
						|
@implementer(IPolicyForHTTPS)
class RegularPolicyForHTTPS(object):
    """Factory for the SSLClientConnectionCreators used when connecting to
    remote servers for anything other than federation.

    A single OpenSSL context, built with the default OpenSSL CA trust root,
    is shared by every connection.
    """

    def __init__(self):
        options = CertificateOptions(trustRoot=platformTrust())
        self._ssl_context = options.getContext()
        self._ssl_context.set_info_callback(_context_info_cb)

    def creatorForNetloc(self, hostname, port):
        """Return a connection creator for `hostname` with verification
        enabled.
        """
        return SSLClientConnectionCreator(hostname, self._ssl_context, True)
 | 
						|
 | 
						|
 | 
						|
def _context_info_cb(ssl_connection, where, ret):
    """The 'information callback' installed on our openssl context objects.

    Note: a Context that has this set as its info callback should only ever
    be used together with SSLClientConnectionCreator.
    """
    # SSLClientConnectionCreator is expected to have stored a
    # TLSMemoryBIOProtocol object as the connection's app_data...
    protocol = ssl_connection.get_app_data()
    try:
        # ... and to have hung a ConnectionVerifier off it as the
        # '_synapse_tls_verifier' attribute.
        verifier = protocol._synapse_tls_verifier
        verifier.verify_context_info_cb(ssl_connection, where)
    except:  # noqa: E722, taken from the twisted implementation
        logger.exception("Error during info_callback")
        protocol.failVerification(Failure())
 | 
						|
 | 
						|
 | 
						|
@implementer(IOpenSSLClientConnectionCreator)
class SSLClientConnectionCreator(object):
    """Creates openssl connection objects for client connections.

    Replaces twisted.internet.ssl.ClientTLSOptions
    """

    def __init__(self, hostname: bytes, ctx, verify_certs: bool):
        self._ctx = ctx
        self._verifier = ConnectionVerifier(hostname, verify_certs)

    def clientConnectionForTLS(self, tls_protocol):
        """Create the OpenSSL connection object for `tls_protocol`.

        Args:
            tls_protocol: the protocol object the connection is made for.

        Returns:
            A new SSL.Connection built on this creator's context.
        """
        connection = SSL.Connection(self._ctx, None)

        # Same as twisted.internet.ssl.ClientTLSOptions: the protocol (a
        # TLSMemoryBIOProtocol) is stored as the connection's application
        # data...
        connection.set_app_data(tls_protocol)

        # ... and we additionally gut-wrench our verifier onto the protocol,
        # so that the SSL context's info callback has something to call to
        # do the cert verification.
        tls_protocol._synapse_tls_verifier = self._verifier
        return connection
 | 
						|
 | 
						|
 | 
						|
class ConnectionVerifier(object):
    """Sets the SNI and performs certificate verification.

    An instance is attached to the TLSMemoryBIOProtocol and gets invoked from
    the ssl context's info callback.
    """

    # Modelled on twisted.internet.ssl.ClientTLSOptions.

    def __init__(self, hostname: bytes, verify_certs):
        self._verify_certs = verify_certs

        self._hostnameBytes = hostname
        self._hostnameASCII = hostname.decode("ascii")

        # IP literals get no SNI and are checked with verify_ip_address
        # rather than verify_hostname.
        self._is_ip_address = bool(
            isIPAddress(self._hostnameASCII) or isIPv6Address(self._hostnameASCII)
        )

    def verify_context_info_cb(self, ssl_connection, where):
        """Handle an info-callback event for `ssl_connection`.

        Args:
            ssl_connection: the OpenSSL connection the event relates to.
            where: bitmask of SSL_CB_* flags describing the event.
        """
        # At the start of the handshake, send the hostname via SNI (never
        # for IP addresses).
        if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
            ssl_connection.set_tlsext_host_name(self._hostnameBytes)

        # Verification only happens once the handshake has completed, and
        # only if it was requested.
        if not (where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs):
            return

        check = verify_ip_address if self._is_ip_address else verify_hostname
        try:
            check(ssl_connection, self._hostnameASCII)
        except VerificationError:
            # Capture the active exception and report it to the protocol.
            failure = Failure()
            ssl_connection.get_app_data().failVerification(failure)
 |