315 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			315 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2017 New Vector Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import gc
 | 
						|
import logging
 | 
						|
import signal
 | 
						|
import sys
 | 
						|
import traceback
 | 
						|
 | 
						|
import psutil
 | 
						|
from daemonize import Daemonize
 | 
						|
 | 
						|
from twisted.internet import error, reactor
 | 
						|
from twisted.protocols.tls import TLSMemoryBIOFactory
 | 
						|
 | 
						|
import synapse
 | 
						|
from synapse.app import check_bind_error
 | 
						|
from synapse.crypto import context_factory
 | 
						|
from synapse.util import PreserveLoggingContext
 | 
						|
from synapse.util.rlimit import change_resource_limit
 | 
						|
from synapse.util.versionstring import get_version_string
 | 
						|
 | 
						|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Functions to run when the process receives SIGHUP. Entries are added by
# register_sighup() and each one is called with the homeserver as its only
# argument.
_sighup_callbacks = []
 | 
						|
 | 
						|
 | 
						|
def register_sighup(func):
    """
    Add a callback to the list of functions that are run on SIGHUP.

    Args:
        func (function): Callable to invoke when the process is sent a
            SIGHUP signal. It is called with one argument: the homeserver.
    """
    _sighup_callbacks.append(func)
 | 
						|
 | 
						|
 | 
						|
def start_worker_reactor(appname, config):
    """ Start the reactor for a worker process

    Convenience wrapper around start_reactor: every reactor option is read
    from the worker-related settings on 'config', and the logger handed to
    start_reactor is the one named after config.worker_app.

    Args:
        appname (str): application name which will be sent to syslog
        config (synapse.config.Config): config object
    """
    worker_logger = logging.getLogger(config.worker_app)

    reactor_options = {
        "soft_file_limit": config.soft_file_limit,
        "gc_thresholds": config.gc_thresholds,
        "pid_file": config.worker_pid_file,
        "daemonize": config.worker_daemonize,
        "cpu_affinity": config.worker_cpu_affinity,
        "print_pidfile": config.print_pidfile,
        "logger": worker_logger,
    }
    start_reactor(appname, **reactor_options)
 | 
						|
 | 
						|
 | 
						|
def start_reactor(
        appname,
        soft_file_limit,
        gc_thresholds,
        pid_file,
        daemonize,
        cpu_affinity,
        print_pidfile,
        logger,
):
    """ Run the reactor in the main process

    Optionally daemonizes first, then applies CPU affinity, the file
    descriptor limit and the GC thresholds before handing control to the
    reactor.

    Args:
        appname (str): application name which will be sent to syslog
        soft_file_limit (int): value passed to change_resource_limit
        gc_thresholds: if truthy, unpacked as the arguments of
            gc.set_threshold
        pid_file (str): name of pid file to write to if daemonize is True
        daemonize (bool): true to run the reactor in a background process
        cpu_affinity (int|None): cpu affinity bitmask; bit N set means the
            process may run on CPU N. None leaves the affinity untouched.
        print_pidfile (bool): whether to print the pid file, if daemonize is True
        logger (logging.Logger): logger instance to pass to Daemonize
    """

    def run():
        # The reactor must run under the sentinel log context; otherwise
        # other PreserveLoggingContext instances see the logcontext flipping
        # between the sentinel and this function's context and complain.
        with PreserveLoggingContext():
            logger.info("Running")

            if cpu_affinity is not None:
                # bin() gives e.g. "0b101": drop the "0b" prefix and reverse
                # the digits so that position 0 corresponds to CPU 0.
                reversed_bits = bin(cpu_affinity)[2:][::-1]
                cpus = [
                    cpu_num
                    for cpu_num, bit in enumerate(reversed_bits)
                    if bit == "1"
                ]
                psutil.Process().cpu_affinity(cpus)

            change_resource_limit(soft_file_limit)
            if gc_thresholds:
                gc.set_threshold(*gc_thresholds)
            reactor.run()

    if not daemonize:
        # Foreground mode: run the reactor directly in this process.
        run()
        return

    if print_pidfile:
        print(pid_file)

    Daemonize(
        app=appname,
        pid=pid_file,
        action=run,
        auto_close_fds=False,
        verbose=True,
        logger=logger,
    ).start()
 | 
						|
 | 
						|
 | 
						|
def quit_with_error(error_string):
    """
    Write an error message to stderr, framed by lines of asterisks, and exit.

    The frame is two characters wider than the longest message line that is
    shorter than 80 characters. If every line is 80 characters or longer the
    frame falls back to a width of 82.

    Args:
        error_string (str): the message to print; may contain newlines.

    Raises:
        SystemExit: always, with exit code 1.
    """
    message_lines = error_string.split("\n")

    # Only lines shorter than 80 characters are used to size the frame.
    short_lengths = [len(line) for line in message_lines if len(line) < 80]

    # max() on an empty list raises ValueError, which would hide the real
    # error message whenever every line is 80+ characters long.
    line_length = (max(short_lengths) if short_lengths else 80) + 2

    banner = "*" * line_length + '\n'
    sys.stderr.write(banner)
    for line in message_lines:
        sys.stderr.write(" %s\n" % (line.rstrip(),))
    sys.stderr.write(banner)
    sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
def listen_metrics(bind_addresses, port):
    """
    Start a Prometheus metrics HTTP server on each of the given addresses.

    Args:
        bind_addresses: iterable of host addresses to listen on
        port (int): port to listen on, shared by every address
    """
    # Imported here rather than at module level, as in the original code.
    from synapse.metrics import RegistryProxy
    from prometheus_client import start_http_server

    for bind_host in bind_addresses:
        logger.info("Starting metrics listener on %s:%d", bind_host, port)
        start_http_server(port, addr=bind_host, registry=RegistryProxy)
 | 
						|
 | 
						|
 | 
						|
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
    """
    Open a listening TCP socket on the given port for each bind address.

    An address that cannot be bound is passed to check_bind_error together
    with the error; if that call returns, the address is skipped.

    Returns:
        list[twisted.internet.tcp.Port]: listening for TCP connections
    """
    listening_ports = []
    for address in bind_addresses:
        try:
            tcp_port = reactor.listenTCP(port, factory, backlog, address)
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)
        else:
            listening_ports.append(tcp_port)

    return listening_ports
 | 
						|
 | 
						|
 | 
						|
def listen_ssl(
    bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
    """
    Open a listening TLS-over-TCP socket on the given port for each bind
    address.

    An address that cannot be bound is passed to check_bind_error together
    with the error; if that call returns, the address is skipped.

    Returns:
        list of twisted.internet.tcp.Port listening for TLS connections
    """
    listening_ports = []
    for address in bind_addresses:
        try:
            tls_port = reactor.listenSSL(
                port, factory, context_factory, backlog, address
            )
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)
        else:
            listening_ports.append(tls_port)

    return listening_ports
 | 
						|
 | 
						|
 | 
						|
def refresh_certificate(hs):
    """
    Re-read the TLS certificates from disk and make Synapse use them.

    When a TLS listener is configured, a fresh server context factory is
    built and every already-listening TLS port is re-wrapped with it.

    Args:
        hs (synapse.server.HomeServer)
    """

    if not hs.config.has_tls_listener():
        # No TLS listener: still try to reload the certs, for the benefit of
        # the tls_fingerprints, but do not insist on a cert and key.
        hs.config.read_certificate_from_disk(require_cert_and_key=False)
        return

    hs.config.read_certificate_from_disk(require_cert_and_key=True)
    hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)

    if not hs._listening_services:
        # Nothing is listening yet, so there is nothing to re-wrap.
        return

    logger.info("Updating context factories...")
    for service in hs._listening_services:
        # listenSSL does not create an SSL port: it creates a TCP port whose
        # factory is a TLS wrapper around the factory that really handles
        # requests. The `factory` attribute is public but absent from
        # Twisted's documentation.
        if not isinstance(service.factory, TLSMemoryBIOFactory):
            continue

        addr = service.getHost()
        logger.info(
            "Replacing TLS context factory on [%s]:%i", addr.host, addr.port,
        )

        # Swap in the new TLS configuration by pulling the wrappedFactory out
        # of the old wrapper and wrapping it again with the new context
        # factory.
        # NOTE(review): the False is presumably Twisted's isClient flag --
        # confirm against the TLSMemoryBIOFactory API docs.
        inner_factory = service.factory.wrappedFactory
        service.factory = TLSMemoryBIOFactory(
            hs.tls_server_context_factory,
            False,
            inner_factory,
        )
    logger.info("Context factories updated.")
 | 
						|
 | 
						|
 | 
						|
def start(hs, listeners=None):
    """
    Bring up a Synapse server or worker.

    Installs the SIGHUP handler (where the platform has SIGHUP), loads the
    TLS certificate, starts the listeners and datastore profiling, and sets
    up Sentry. Any exception is printed to stderr, the reactor is stopped if
    it is running, and the process exits with status 1.

    Args:
        hs (synapse.server.HomeServer)
        listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
    """
    try:
        # SIGHUP handling: not every platform defines the signal.
        if hasattr(signal, "SIGHUP"):
            def handle_sighup(*args, **kwargs):
                # Run every registered callback with the homeserver.
                for callback in _sighup_callbacks:
                    callback(hs)

            signal.signal(signal.SIGHUP, handle_sighup)

            # Reload the TLS certificate whenever SIGHUP arrives.
            register_sighup(refresh_certificate)

        # Initial load of the certificate from disk.
        refresh_certificate(hs)

        # Everything is in place: start listening.
        hs.start_listening(listeners)
        hs.get_datastore().start_profiling()

        setup_sentry(hs)
    except Exception:
        traceback.print_exc(file=sys.stderr)
        hs_reactor = hs.get_reactor()
        if hs_reactor.running:
            hs_reactor.stop()
        sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
def setup_sentry(hs):
    """Initialise the Sentry SDK when the configuration enables it

    Does nothing (and does not import sentry_sdk) if hs.config.sentry_enabled
    is falsy.

    Args:
        hs (synapse.server.HomeServer)
    """

    if not hs.config.sentry_enabled:
        return

    # Imported here so that sentry_sdk is only needed when Sentry is enabled.
    import sentry_sdk

    sentry_sdk.init(
        dsn=hs.config.sentry_dsn,
        release=get_version_string(synapse),
    )

    # Attach default tags identifying this instance.
    with sentry_sdk.configure_scope() as scope:
        scope.set_tag("matrix_server_name", hs.config.server_name)

        # Fall back to the main-process values when no worker app or worker
        # name is configured.
        app = hs.config.worker_app or "synapse.app.homeserver"
        name = hs.config.worker_name or "master"
        scope.set_tag("worker_app", app)
        scope.set_tag("worker_name", name)
 |