386 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			386 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
#!/usr/bin/env python
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2014-2016 OpenMarket Ltd
 | 
						|
# Copyright 2018 New Vector Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import argparse
 | 
						|
import collections
 | 
						|
import errno
 | 
						|
import glob
 | 
						|
import os
 | 
						|
import os.path
 | 
						|
import signal
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
 | 
						|
import yaml
 | 
						|
 | 
						|
from synapse.config import find_config_files
 | 
						|
 | 
						|
SYNAPSE = [sys.executable, "-m", "synapse.app.homeserver"]
 | 
						|
 | 
						|
GREEN = "\x1b[1;32m"
 | 
						|
YELLOW = "\x1b[1;33m"
 | 
						|
RED = "\x1b[1;31m"
 | 
						|
NORMAL = "\x1b[m"
 | 
						|
 | 
						|
 | 
						|
def pid_running(pid):
 | 
						|
    try:
 | 
						|
        os.kill(pid, 0)
 | 
						|
        return True
 | 
						|
    except OSError as err:
 | 
						|
        if err.errno == errno.EPERM:
 | 
						|
            return True
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
def write(message, colour=NORMAL, stream=sys.stdout):
 | 
						|
    # Lets check if we're writing to a TTY before colouring
 | 
						|
    should_colour = False
 | 
						|
    try:
 | 
						|
        should_colour = stream.isatty()
 | 
						|
    except AttributeError:
 | 
						|
        # Just in case `isatty` isn't defined on everything. The python
 | 
						|
        # docs are incredibly vague.
 | 
						|
        pass
 | 
						|
 | 
						|
    if not should_colour:
 | 
						|
        stream.write(message + "\n")
 | 
						|
    else:
 | 
						|
        stream.write(colour + message + NORMAL + "\n")
 | 
						|
 | 
						|
 | 
						|
def abort(message, colour=RED, stream=sys.stderr):
 | 
						|
    write(message, colour, stream)
 | 
						|
    sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
def start(configfile: str, daemonize: bool = True) -> bool:
 | 
						|
    """Attempts to start synapse.
 | 
						|
    Args:
 | 
						|
        configfile: path to a yaml synapse config file
 | 
						|
        daemonize: whether to daemonize synapse or keep it attached to the current
 | 
						|
            session
 | 
						|
 | 
						|
    Returns:
 | 
						|
        True if the process started successfully
 | 
						|
        False if there was an error starting the process
 | 
						|
 | 
						|
        If deamonize is False it will only return once synapse exits.
 | 
						|
    """
 | 
						|
 | 
						|
    write("Starting ...")
 | 
						|
    args = SYNAPSE
 | 
						|
 | 
						|
    if daemonize:
 | 
						|
        args.extend(["--daemonize", "-c", configfile])
 | 
						|
    else:
 | 
						|
        args.extend(["-c", configfile])
 | 
						|
 | 
						|
    try:
 | 
						|
        subprocess.check_call(args)
 | 
						|
        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
 | 
						|
        return True
 | 
						|
    except subprocess.CalledProcessError as e:
 | 
						|
        write(
 | 
						|
            "error starting (exit code: %d); see above for logs" % e.returncode,
 | 
						|
            colour=RED,
 | 
						|
        )
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
def start_worker(app: str, configfile: str, worker_configfile: str) -> bool:
 | 
						|
    """Attempts to start a synapse worker.
 | 
						|
    Args:
 | 
						|
        app: name of the worker's appservice
 | 
						|
        configfile: path to a yaml synapse config file
 | 
						|
        worker_configfile: path to worker specific yaml synapse file
 | 
						|
 | 
						|
    Returns:
 | 
						|
        True if the process started successfully
 | 
						|
        False if there was an error starting the process
 | 
						|
    """
 | 
						|
 | 
						|
    args = [
 | 
						|
        sys.executable,
 | 
						|
        "-m",
 | 
						|
        app,
 | 
						|
        "-c",
 | 
						|
        configfile,
 | 
						|
        "-c",
 | 
						|
        worker_configfile,
 | 
						|
        "--daemonize",
 | 
						|
    ]
 | 
						|
 | 
						|
    try:
 | 
						|
        subprocess.check_call(args)
 | 
						|
        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
 | 
						|
        return True
 | 
						|
    except subprocess.CalledProcessError as e:
 | 
						|
        write(
 | 
						|
            "error starting %s(%r) (exit code: %d); see above for logs"
 | 
						|
            % (app, worker_configfile, e.returncode),
 | 
						|
            colour=RED,
 | 
						|
        )
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
def stop(pidfile: str, app: str) -> bool:
 | 
						|
    """Attempts to kill a synapse worker from the pidfile.
 | 
						|
    Args:
 | 
						|
        pidfile: path to file containing worker's pid
 | 
						|
        app: name of the worker's appservice
 | 
						|
 | 
						|
    Returns:
 | 
						|
        True if the process stopped successfully
 | 
						|
        False if process was already stopped or an error occured
 | 
						|
    """
 | 
						|
 | 
						|
    if os.path.exists(pidfile):
 | 
						|
        pid = int(open(pidfile).read())
 | 
						|
        try:
 | 
						|
            os.kill(pid, signal.SIGTERM)
 | 
						|
            write("stopped %s" % (app,), colour=GREEN)
 | 
						|
            return True
 | 
						|
        except OSError as err:
 | 
						|
            if err.errno == errno.ESRCH:
 | 
						|
                write("%s not running" % (app,), colour=YELLOW)
 | 
						|
            elif err.errno == errno.EPERM:
 | 
						|
                abort("Cannot stop %s: Operation not permitted" % (app,))
 | 
						|
            else:
 | 
						|
                abort("Cannot stop %s: Unknown error" % (app,))
 | 
						|
            return False
 | 
						|
    else:
 | 
						|
        write(
 | 
						|
            "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
 | 
						|
            % (app, pidfile),
 | 
						|
            colour=YELLOW,
 | 
						|
        )
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
Worker = collections.namedtuple(
 | 
						|
    "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"]
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
def main():
 | 
						|
 | 
						|
    parser = argparse.ArgumentParser()
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        "action",
 | 
						|
        choices=["start", "stop", "restart"],
 | 
						|
        help="whether to start, stop or restart the synapse",
 | 
						|
    )
 | 
						|
    parser.add_argument(
 | 
						|
        "configfile",
 | 
						|
        nargs="?",
 | 
						|
        default="homeserver.yaml",
 | 
						|
        help="the homeserver config file. Defaults to homeserver.yaml. May also be"
 | 
						|
        " a directory with *.yaml files",
 | 
						|
    )
 | 
						|
    parser.add_argument(
 | 
						|
        "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
 | 
						|
    )
 | 
						|
    parser.add_argument(
 | 
						|
        "-a",
 | 
						|
        "--all-processes",
 | 
						|
        metavar="WORKERCONFIGDIR",
 | 
						|
        help="start or stop all the workers in the given directory"
 | 
						|
        " and the main synapse process",
 | 
						|
    )
 | 
						|
    parser.add_argument(
 | 
						|
        "--no-daemonize",
 | 
						|
        action="store_false",
 | 
						|
        dest="daemonize",
 | 
						|
        help="Run synapse in the foreground for debugging. "
 | 
						|
        "Will work only if the daemonize option is not set in the config.",
 | 
						|
    )
 | 
						|
 | 
						|
    options = parser.parse_args()
 | 
						|
 | 
						|
    if options.worker and options.all_processes:
 | 
						|
        write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
 | 
						|
        sys.exit(1)
 | 
						|
    if not options.daemonize and options.all_processes:
 | 
						|
        write('Cannot use "--no-daemonize" with "--all-processes"', stream=sys.stderr)
 | 
						|
        sys.exit(1)
 | 
						|
 | 
						|
    configfile = options.configfile
 | 
						|
 | 
						|
    if not os.path.exists(configfile):
 | 
						|
        write(
 | 
						|
            "No config file found\n"
 | 
						|
            "To generate a config file, run '%s -c %s --generate-config"
 | 
						|
            " --server-name=<server name> --report-stats=<yes/no>'\n"
 | 
						|
            % (" ".join(SYNAPSE), options.configfile),
 | 
						|
            stream=sys.stderr,
 | 
						|
        )
 | 
						|
        sys.exit(1)
 | 
						|
 | 
						|
    config_files = find_config_files([configfile])
 | 
						|
    config = {}
 | 
						|
    for config_file in config_files:
 | 
						|
        with open(config_file) as file_stream:
 | 
						|
            yaml_config = yaml.safe_load(file_stream)
 | 
						|
        if yaml_config is not None:
 | 
						|
            config.update(yaml_config)
 | 
						|
 | 
						|
    pidfile = config["pid_file"]
 | 
						|
    cache_factor = config.get("synctl_cache_factor")
 | 
						|
    start_stop_synapse = True
 | 
						|
 | 
						|
    if cache_factor:
 | 
						|
        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 | 
						|
 | 
						|
    cache_factors = config.get("synctl_cache_factors", {})
 | 
						|
    for cache_name, factor in cache_factors.items():
 | 
						|
        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
 | 
						|
 | 
						|
    worker_configfiles = []
 | 
						|
    if options.worker:
 | 
						|
        start_stop_synapse = False
 | 
						|
        worker_configfile = options.worker
 | 
						|
        if not os.path.exists(worker_configfile):
 | 
						|
            write(
 | 
						|
                "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
 | 
						|
            )
 | 
						|
            sys.exit(1)
 | 
						|
        worker_configfiles.append(worker_configfile)
 | 
						|
 | 
						|
    if options.all_processes:
 | 
						|
        # To start the main synapse with -a you need to add a worker file
 | 
						|
        # with worker_app == "synapse.app.homeserver"
 | 
						|
        start_stop_synapse = False
 | 
						|
        worker_configdir = options.all_processes
 | 
						|
        if not os.path.isdir(worker_configdir):
 | 
						|
            write(
 | 
						|
                "No worker config directory found at %r" % (worker_configdir,),
 | 
						|
                stream=sys.stderr,
 | 
						|
            )
 | 
						|
            sys.exit(1)
 | 
						|
        worker_configfiles.extend(
 | 
						|
            sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
 | 
						|
        )
 | 
						|
 | 
						|
    workers = []
 | 
						|
    for worker_configfile in worker_configfiles:
 | 
						|
        with open(worker_configfile) as stream:
 | 
						|
            worker_config = yaml.safe_load(stream)
 | 
						|
        worker_app = worker_config["worker_app"]
 | 
						|
        if worker_app == "synapse.app.homeserver":
 | 
						|
            # We need to special case all of this to pick up options that may
 | 
						|
            # be set in the main config file or in this worker config file.
 | 
						|
            worker_pidfile = worker_config.get("pid_file") or pidfile
 | 
						|
            worker_cache_factor = (
 | 
						|
                worker_config.get("synctl_cache_factor") or cache_factor
 | 
						|
            )
 | 
						|
            worker_cache_factors = (
 | 
						|
                worker_config.get("synctl_cache_factors") or cache_factors
 | 
						|
            )
 | 
						|
            # The master process doesn't support using worker_* config.
 | 
						|
            for key in worker_config:
 | 
						|
                if key == "worker_app":  # But we allow worker_app
 | 
						|
                    continue
 | 
						|
                assert not key.startswith(
 | 
						|
                    "worker_"
 | 
						|
                ), "Main process cannot use worker_* config"
 | 
						|
        else:
 | 
						|
            worker_pidfile = worker_config["worker_pid_file"]
 | 
						|
            worker_cache_factor = worker_config.get("synctl_cache_factor")
 | 
						|
            worker_cache_factors = worker_config.get("synctl_cache_factors", {})
 | 
						|
        workers.append(
 | 
						|
            Worker(
 | 
						|
                worker_app,
 | 
						|
                worker_configfile,
 | 
						|
                worker_pidfile,
 | 
						|
                worker_cache_factor,
 | 
						|
                worker_cache_factors,
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
    action = options.action
 | 
						|
 | 
						|
    if action == "stop" or action == "restart":
 | 
						|
        has_stopped = True
 | 
						|
        for worker in workers:
 | 
						|
            if not stop(worker.pidfile, worker.app):
 | 
						|
                # A worker could not be stopped.
 | 
						|
                has_stopped = False
 | 
						|
 | 
						|
        if start_stop_synapse:
 | 
						|
            if not stop(pidfile, "synapse.app.homeserver"):
 | 
						|
                has_stopped = False
 | 
						|
        if not has_stopped and action == "stop":
 | 
						|
            sys.exit(1)
 | 
						|
 | 
						|
    # Wait for synapse to actually shutdown before starting it again
 | 
						|
    if action == "restart":
 | 
						|
        running_pids = []
 | 
						|
        if start_stop_synapse and os.path.exists(pidfile):
 | 
						|
            running_pids.append(int(open(pidfile).read()))
 | 
						|
        for worker in workers:
 | 
						|
            if os.path.exists(worker.pidfile):
 | 
						|
                running_pids.append(int(open(worker.pidfile).read()))
 | 
						|
        if len(running_pids) > 0:
 | 
						|
            write("Waiting for process to exit before restarting...")
 | 
						|
            for running_pid in running_pids:
 | 
						|
                while pid_running(running_pid):
 | 
						|
                    time.sleep(0.2)
 | 
						|
            write("All processes exited; now restarting...")
 | 
						|
 | 
						|
    if action == "start" or action == "restart":
 | 
						|
        error = False
 | 
						|
        if start_stop_synapse:
 | 
						|
            # Check if synapse is already running
 | 
						|
            if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
 | 
						|
                abort("synapse.app.homeserver already running")
 | 
						|
 | 
						|
            if not start(configfile, bool(options.daemonize)):
 | 
						|
                error = True
 | 
						|
 | 
						|
        for worker in workers:
 | 
						|
            env = os.environ.copy()
 | 
						|
 | 
						|
            # Skip starting a worker if its already running
 | 
						|
            if os.path.exists(worker.pidfile) and pid_running(
 | 
						|
                int(open(worker.pidfile).read())
 | 
						|
            ):
 | 
						|
                print(worker.app + " already running")
 | 
						|
                continue
 | 
						|
 | 
						|
            if worker.cache_factor:
 | 
						|
                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
 | 
						|
 | 
						|
            for cache_name, factor in worker.cache_factors.items():
 | 
						|
                os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
 | 
						|
 | 
						|
            if not start_worker(worker.app, configfile, worker.configfile):
 | 
						|
                error = True
 | 
						|
 | 
						|
            # Reset env back to the original
 | 
						|
            os.environ.clear()
 | 
						|
            os.environ.update(env)
 | 
						|
 | 
						|
        if error:
 | 
						|
            exit(1)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    main()
 |