216 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			216 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
# Copyright 2014-2016 OpenMarket Ltd
 | 
						|
# Copyright 2019 New Vector Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import logging
 | 
						|
 | 
						|
import idna
 | 
						|
from service_identity import VerificationError
 | 
						|
from service_identity.pyopenssl import verify_hostname, verify_ip_address
 | 
						|
from zope.interface import implementer
 | 
						|
 | 
						|
from OpenSSL import SSL, crypto
 | 
						|
from twisted.internet._sslverify import _defaultCurveName
 | 
						|
from twisted.internet.abstract import isIPAddress, isIPv6Address
 | 
						|
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
 | 
						|
from twisted.internet.ssl import (
 | 
						|
    CertificateOptions,
 | 
						|
    ContextFactory,
 | 
						|
    TLSVersion,
 | 
						|
    platformTrust,
 | 
						|
)
 | 
						|
from twisted.python.failure import Failure
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
_TLS_VERSION_MAP = {
 | 
						|
    "1": TLSVersion.TLSv1_0,
 | 
						|
    "1.1": TLSVersion.TLSv1_1,
 | 
						|
    "1.2": TLSVersion.TLSv1_2,
 | 
						|
    "1.3": TLSVersion.TLSv1_3,
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
class ServerContextFactory(ContextFactory):
 | 
						|
    """Factory for PyOpenSSL SSL contexts that are used to handle incoming
 | 
						|
    connections."""
 | 
						|
 | 
						|
    def __init__(self, config):
 | 
						|
        self._context = SSL.Context(SSL.SSLv23_METHOD)
 | 
						|
        self.configure_context(self._context, config)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def configure_context(context, config):
 | 
						|
        try:
 | 
						|
            _ecCurve = crypto.get_elliptic_curve(_defaultCurveName)
 | 
						|
            context.set_tmp_ecdh(_ecCurve)
 | 
						|
        except Exception:
 | 
						|
            logger.exception("Failed to enable elliptic curve for TLS")
 | 
						|
 | 
						|
        context.set_options(
 | 
						|
            SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
 | 
						|
        )
 | 
						|
        context.use_certificate_chain_file(config.tls_certificate_file)
 | 
						|
        context.use_privatekey(config.tls_private_key)
 | 
						|
 | 
						|
        # https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
 | 
						|
        context.set_cipher_list(
 | 
						|
            "ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
 | 
						|
        )
 | 
						|
 | 
						|
    def getContext(self):
 | 
						|
        return self._context
 | 
						|
 | 
						|
 | 
						|
class ClientTLSOptionsFactory(object):
 | 
						|
    """Factory for Twisted SSLClientConnectionCreators that are used to make connections
 | 
						|
    to remote servers for federation.
 | 
						|
 | 
						|
    Uses one of two OpenSSL context objects for all connections, depending on whether
 | 
						|
    we should do SSL certificate verification.
 | 
						|
 | 
						|
    get_options decides whether we should do SSL certificate verification and
 | 
						|
    constructs an SSLClientConnectionCreator factory accordingly.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, config):
 | 
						|
        self._config = config
 | 
						|
 | 
						|
        # Check if we're using a custom list of a CA certificates
 | 
						|
        trust_root = config.federation_ca_trust_root
 | 
						|
        if trust_root is None:
 | 
						|
            # Use CA root certs provided by OpenSSL
 | 
						|
            trust_root = platformTrust()
 | 
						|
 | 
						|
        # "insecurelyLowerMinimumTo" is the argument that will go lower than
 | 
						|
        # Twisted's default, which is why it is marked as "insecure" (since
 | 
						|
        # Twisted's defaults are reasonably secure). But, since Twisted is
 | 
						|
        # moving to TLS 1.2 by default, we want to respect the config option if
 | 
						|
        # it is set to 1.0 (which the alternate option, raiseMinimumTo, will not
 | 
						|
        # let us do).
 | 
						|
        minTLS = _TLS_VERSION_MAP[config.federation_client_minimum_tls_version]
 | 
						|
 | 
						|
        self._verify_ssl = CertificateOptions(
 | 
						|
            trustRoot=trust_root, insecurelyLowerMinimumTo=minTLS
 | 
						|
        )
 | 
						|
        self._verify_ssl_context = self._verify_ssl.getContext()
 | 
						|
        self._verify_ssl_context.set_info_callback(self._context_info_cb)
 | 
						|
 | 
						|
        self._no_verify_ssl = CertificateOptions(insecurelyLowerMinimumTo=minTLS)
 | 
						|
        self._no_verify_ssl_context = self._no_verify_ssl.getContext()
 | 
						|
        self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
 | 
						|
 | 
						|
    def get_options(self, host):
 | 
						|
        # Check if certificate verification has been enabled
 | 
						|
        should_verify = self._config.federation_verify_certificates
 | 
						|
 | 
						|
        # Check if we've disabled certificate verification for this host
 | 
						|
        if should_verify:
 | 
						|
            for regex in self._config.federation_certificate_verification_whitelist:
 | 
						|
                if regex.match(host):
 | 
						|
                    should_verify = False
 | 
						|
                    break
 | 
						|
 | 
						|
        ssl_context = (
 | 
						|
            self._verify_ssl_context if should_verify else self._no_verify_ssl_context
 | 
						|
        )
 | 
						|
 | 
						|
        return SSLClientConnectionCreator(host, ssl_context, should_verify)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _context_info_cb(ssl_connection, where, ret):
 | 
						|
        """The 'information callback' for our openssl context object."""
 | 
						|
        # we assume that the app_data on the connection object has been set to
 | 
						|
        # a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
 | 
						|
        tls_protocol = ssl_connection.get_app_data()
 | 
						|
        try:
 | 
						|
            # ... we further assume that SSLClientConnectionCreator has set the
 | 
						|
            # '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
 | 
						|
            tls_protocol._synapse_tls_verifier.verify_context_info_cb(
 | 
						|
                ssl_connection, where
 | 
						|
            )
 | 
						|
        except:  # noqa: E722, taken from the twisted implementation
 | 
						|
            logger.exception("Error during info_callback")
 | 
						|
            f = Failure()
 | 
						|
            tls_protocol.failVerification(f)
 | 
						|
 | 
						|
 | 
						|
@implementer(IOpenSSLClientConnectionCreator)
 | 
						|
class SSLClientConnectionCreator(object):
 | 
						|
    """Creates openssl connection objects for client connections.
 | 
						|
 | 
						|
    Replaces twisted.internet.ssl.ClientTLSOptions
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, hostname, ctx, verify_certs):
 | 
						|
        self._ctx = ctx
 | 
						|
        self._verifier = ConnectionVerifier(hostname, verify_certs)
 | 
						|
 | 
						|
    def clientConnectionForTLS(self, tls_protocol):
 | 
						|
        context = self._ctx
 | 
						|
        connection = SSL.Connection(context, None)
 | 
						|
 | 
						|
        # as per twisted.internet.ssl.ClientTLSOptions, we set the application
 | 
						|
        # data to our TLSMemoryBIOProtocol...
 | 
						|
        connection.set_app_data(tls_protocol)
 | 
						|
 | 
						|
        # ... and we also gut-wrench a '_synapse_tls_verifier' attribute into the
 | 
						|
        # tls_protocol so that the SSL context's info callback has something to
 | 
						|
        # call to do the cert verification.
 | 
						|
        setattr(tls_protocol, "_synapse_tls_verifier", self._verifier)
 | 
						|
        return connection
 | 
						|
 | 
						|
 | 
						|
class ConnectionVerifier(object):
 | 
						|
    """Set the SNI, and do cert verification
 | 
						|
 | 
						|
    This is a thing which is attached to the TLSMemoryBIOProtocol, and is called by
 | 
						|
    the ssl context's info callback.
 | 
						|
    """
 | 
						|
 | 
						|
    # This code is based on twisted.internet.ssl.ClientTLSOptions.
 | 
						|
 | 
						|
    def __init__(self, hostname, verify_certs):
 | 
						|
        self._verify_certs = verify_certs
 | 
						|
 | 
						|
        if isIPAddress(hostname) or isIPv6Address(hostname):
 | 
						|
            self._hostnameBytes = hostname.encode("ascii")
 | 
						|
            self._is_ip_address = True
 | 
						|
        else:
 | 
						|
            # twisted's ClientTLSOptions falls back to the stdlib impl here if
 | 
						|
            # idna is not installed, but points out that lacks support for
 | 
						|
            # IDNA2008 (http://bugs.python.org/issue17305).
 | 
						|
            #
 | 
						|
            # We can rely on having idna.
 | 
						|
            self._hostnameBytes = idna.encode(hostname)
 | 
						|
            self._is_ip_address = False
 | 
						|
 | 
						|
        self._hostnameASCII = self._hostnameBytes.decode("ascii")
 | 
						|
 | 
						|
    def verify_context_info_cb(self, ssl_connection, where):
 | 
						|
        if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
 | 
						|
            ssl_connection.set_tlsext_host_name(self._hostnameBytes)
 | 
						|
 | 
						|
        if where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs:
 | 
						|
            try:
 | 
						|
                if self._is_ip_address:
 | 
						|
                    verify_ip_address(ssl_connection, self._hostnameASCII)
 | 
						|
                else:
 | 
						|
                    verify_hostname(ssl_connection, self._hostnameASCII)
 | 
						|
            except VerificationError:
 | 
						|
                f = Failure()
 | 
						|
                tls_protocol = ssl_connection.get_app_data()
 | 
						|
                tls_protocol.failVerification(f)
 |