Inline the synchrotron and pusher configs into the main config

pull/874/head
Mark Haines 2016-06-16 11:06:12 +01:00
parent e9892b4b26
commit 885ee861f7
5 changed files with 144 additions and 281 deletions

View File

@ -18,10 +18,9 @@ import synapse
from synapse.server import HomeServer
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.config.emailconfig import EmailConfig
from synapse.config.key import KeyConfig
from synapse.config.workers import clobber_with_worker_config
from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.roommember import RoomMemberStore
@ -43,98 +42,12 @@ from twisted.web.resource import Resource
from daemonize import Daemonize
import gc
import sys
import logging
logger = logging.getLogger("synapse.app.pusher")
class SlaveConfig(DatabaseConfig):
def read_config(self, config):
self.replication_url = config["replication_url"]
self.server_name = config["server_name"]
self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
"use_insecure_ssl_client_just_for_testing_do_not_use", False
)
self.user_agent_suffix = None
self.start_pushers = True
self.listeners = config["listeners"]
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
self.public_baseurl = config["public_baseurl"]
thresholds = config.get("gc_thresholds", None)
if thresholds is not None:
try:
assert len(thresholds) == 3
self.gc_thresholds = (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)
else:
self.gc_thresholds = None
# some things used by the auth handler but not actually used in the
# pusher codebase
self.bcrypt_rounds = None
self.ldap_enabled = None
self.ldap_server = None
self.ldap_port = None
self.ldap_tls = None
self.ldap_search_base = None
self.ldap_search_property = None
self.ldap_email_property = None
self.ldap_full_name_property = None
# We would otherwise try to use the registration shared secret as the
# macaroon shared secret if there was no macaroon_shared_secret, but
# that means pulling in RegistrationConfig too. We don't need to be
# backwards compaitible in the pusher codebase so just make people set
# macaroon_shared_secret. We set this to None to prevent it referencing
# an undefined key.
self.registration_shared_secret = None
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid")
return """\
# Slave configuration
# The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication
server_name: "%(server_name)s"
listeners: []
# Enable a ssh manhole listener on the pusher.
# - type: manhole
# port: {manhole_port}
# bind_address: 127.0.0.1
# Enable a metric listener on the pusher.
# - type: http
# port: {metrics_port}
# bind_address: 127.0.0.1
# resources:
# - names: ["metrics"]
# compress: False
report_stats: False
daemonize: False
pid_file: %(pid_file)s
""" % locals()
class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
pass
class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
SlavedAccountDataStore
@ -232,8 +145,8 @@ class PusherServer(HomeServer):
)
logger.info("Synapse pusher now listening on port %d", port)
def start_listening(self):
for listener in self.config.listeners:
def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
@ -329,19 +242,32 @@ class PusherServer(HomeServer):
yield sleep(30)
def setup(config_options):
def setup(worker_name, config_options):
try:
config = PusherSlaveConfig.load_config(
config = HomeServerConfig.load_config(
"Synapse pusher", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
if not config:
sys.exit(0)
worker_config = config.workers[worker_name]
config.setup_logging()
setup_logging(worker_config.log_config, worker_config.log_file)
clobber_with_worker_config(config, worker_config)
if config.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``start_pushers: false`` to the main config"
"\n"
)
sys.exit(1)
# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True
database_engine = create_engine(config.database_config)
@ -354,11 +280,15 @@ def setup(config_options):
)
ps.setup()
ps.start_listening()
ps.start_listening(worker_config.listeners)
change_resource_limit(ps.config.soft_file_limit)
if ps.config.gc_thresholds:
gc.set_threshold(*ps.config.gc_thresholds)
def run():
with LoggingContext("run"):
logger.info("Running")
change_resource_limit(worker_config.soft_file_limit)
if worker_config.gc_thresholds:
ps.set_threshold(worker_config.gc_thresholds)
reactor.run()
def start():
ps.replicate()
@ -367,30 +297,21 @@ def setup(config_options):
reactor.callWhenRunning(start)
return ps
if worker_config.daemonize:
daemon = Daemonize(
app="synapse-pusher",
pid=worker_config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
run()
if __name__ == '__main__':
with LoggingContext("main"):
ps = setup(sys.argv[1:])
if ps.config.daemonize:
def run():
with LoggingContext("run"):
change_resource_limit(ps.config.soft_file_limit)
if ps.config.gc_thresholds:
gc.set_threshold(*ps.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
app="synapse-pusher",
pid=ps.config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
reactor.run()
worker_name = sys.argv[1]
ps = setup(worker_name, sys.argv[2:])

View File

@ -18,9 +18,9 @@ import synapse
from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.config.appservice import AppServiceConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.config.workers import clobber_with_worker_config
from synapse.events import FrozenEvent
from synapse.handlers.presence import PresenceHandler
from synapse.http.site import SynapseSite
@ -57,76 +57,11 @@ from daemonize import Daemonize
import sys
import logging
import contextlib
import gc
import ujson as json
logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
def read_config(self, config):
self.replication_url = config["replication_url"]
self.server_name = config["server_name"]
self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
"use_insecure_ssl_client_just_for_testing_do_not_use", False
)
self.user_agent_suffix = None
self.listeners = config["listeners"]
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
self.macaroon_secret_key = config["macaroon_secret_key"]
self.expire_access_token = config.get("expire_access_token", False)
thresholds = config.get("gc_thresholds", None)
if thresholds is not None:
try:
assert len(thresholds) == 3
self.gc_thresholds = (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)
else:
self.gc_thresholds = None
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("synchroton.pid")
return """\
# Slave configuration
# The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication
server_name: "%(server_name)s"
listeners:
# Enable a /sync listener on the synchrontron
#- type: http
# port: {http_port}
# bind_address: ""
# Enable a ssh manhole listener on the synchrotron
# - type: manhole
# port: {manhole_port}
# bind_address: 127.0.0.1
# Enable a metric listener on the synchrotron
# - type: http
# port: {metrics_port}
# bind_address: 127.0.0.1
# resources:
# - names: ["metrics"]
# compress: False
report_stats: False
daemonize: False
pid_file: %(pid_file)s
""" % locals()
class SynchrotronSlavedStore(
SlavedPushRuleStore,
SlavedEventStore,
@ -350,8 +285,8 @@ class SynchrotronServer(HomeServer):
)
logger.info("Synapse synchrotron now listening on port %d", port)
def start_listening(self):
for listener in self.config.listeners:
def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
@ -470,19 +405,20 @@ class SynchrotronServer(HomeServer):
return SynchrotronTyping(self)
def setup(config_options):
def start(worker_name, config_options):
try:
config = SynchrotronConfig.load_config(
config = HomeServerConfig.load_config(
"Synapse synchrotron", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
if not config:
sys.exit(0)
worker_config = config.workers[worker_name]
config.setup_logging()
setup_logging(worker_config.log_config, worker_config.log_file)
clobber_with_worker_config(config, worker_config)
database_engine = create_engine(config.database_config)
@ -496,11 +432,15 @@ def setup(config_options):
)
ss.setup()
ss.start_listening()
ss.start_listening(worker_config.listeners)
change_resource_limit(ss.config.soft_file_limit)
if ss.config.gc_thresholds:
ss.set_threshold(*ss.config.gc_thresholds)
def run():
with LoggingContext("run"):
logger.info("Running")
change_resource_limit(worker_config.soft_file_limit)
if worker_config.gc_thresholds:
ss.set_threshold(worker_config.gc_thresholds)
reactor.run()
def start():
ss.get_datastore().start_profiling()
@ -508,30 +448,21 @@ def setup(config_options):
reactor.callWhenRunning(start)
return ss
if worker_config.daemonize:
daemon = Daemonize(
app="synapse-synchrotron",
pid=worker_config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
run()
if __name__ == '__main__':
with LoggingContext("main"):
ss = setup(sys.argv[1:])
if ss.config.daemonize:
def run():
with LoggingContext("run"):
change_resource_limit(ss.config.soft_file_limit)
if ss.config.gc_thresholds:
gc.set_threshold(*ss.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
app="synapse-synchrotron",
pid=ss.config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
reactor.run()
worker_name = sys.argv[1]
start(worker_name, sys.argv[2:])

View File

@ -32,13 +32,15 @@ from .password import PasswordConfig
from .jwt import JWTConfig
from .ldap import LDAPConfig
from .emailconfig import EmailConfig
from .workers import WorkerConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,
WorkerConfig,):
pass

View File

@ -126,54 +126,58 @@ class LoggingConfig(Config):
)
def setup_logging(self):
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
" - %(message)s"
)
if self.log_config is None:
setup_logging(self.log_config, self.log_file, self.verbosity)
level = logging.INFO
level_for_storage = logging.INFO
if self.verbosity:
level = logging.DEBUG
if self.verbosity > 1:
level_for_storage = logging.DEBUG
# FIXME: we need a logging.WARN for a -q quiet option
logger = logging.getLogger('')
logger.setLevel(level)
def setup_logging(log_config=None, log_file=None, verbosity=None):
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
" - %(message)s"
)
if log_config is None:
logging.getLogger('synapse.storage').setLevel(level_for_storage)
level = logging.INFO
level_for_storage = logging.INFO
if verbosity:
level = logging.DEBUG
if verbosity > 1:
level_for_storage = logging.DEBUG
formatter = logging.Formatter(log_format)
if self.log_file:
# TODO: Customisable file size / backup count
handler = logging.handlers.RotatingFileHandler(
self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
)
# FIXME: we need a logging.WARN for a -q quiet option
logger = logging.getLogger('')
logger.setLevel(level)
def sighup(signum, stack):
logger.info("Closing log file due to SIGHUP")
handler.doRollover()
logger.info("Opened new log file due to SIGHUP")
logging.getLogger('synapse.storage').setLevel(level_for_storage)
# TODO(paul): obviously this is a terrible mechanism for
# stealing SIGHUP, because it means no other part of synapse
# can use it instead. If we want to catch SIGHUP anywhere
# else as well, I'd suggest we find a nicer way to broadcast
# it around.
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
else:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
formatter = logging.Formatter(log_format)
if log_file:
# TODO: Customisable file size / backup count
handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
)
handler.addFilter(LoggingContextFilter(request=""))
def sighup(signum, stack):
logger.info("Closing log file due to SIGHUP")
handler.doRollover()
logger.info("Opened new log file due to SIGHUP")
logger.addHandler(handler)
# TODO(paul): obviously this is a terrible mechanism for
# stealing SIGHUP, because it means no other part of synapse
# can use it instead. If we want to catch SIGHUP anywhere
# else as well, I'd suggest we find a nicer way to broadcast
# it around.
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
else:
with open(self.log_config, 'r') as f:
logging.config.dictConfig(yaml.load(f))
handler = logging.StreamHandler()
handler.setFormatter(formatter)
observer = PythonLoggingObserver()
observer.start()
handler.addFilter(LoggingContextFilter(request=""))
logger.addHandler(handler)
else:
with open(log_config, 'r') as f:
logging.config.dictConfig(yaml.load(f))
observer = PythonLoggingObserver()
observer.start()

View File

@ -38,19 +38,7 @@ class ServerConfig(Config):
self.listeners = config.get("listeners", [])
thresholds = config.get("gc_thresholds", None)
if thresholds is not None:
try:
assert len(thresholds) == 3
self.gc_thresholds = (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)
else:
self.gc_thresholds = None
self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
bind_port = config.get("bind_port")
if bind_port:
@ -264,3 +252,20 @@ class ServerConfig(Config):
type=int,
help="Turn on the twisted telnet manhole"
" service on the given port.")
def read_gc_thresholds(thresholds):
"""Reads the three integer thresholds for garbage collection. Ensures that
the thresholds are integers if thresholds are supplied.
"""
if thresholds is None:
return None
try:
assert len(thresholds) == 3
return (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)