Create a ListenerConfig object (#7681)
This ended up being a bit more invasive than I'd hoped for (not helped by generic_worker duplicating some of the code from homeserver), but hopefully it's an improvement. The idea is that, rather than storing unstructured `dict`s in the config for the listener configurations, we instead parse it into a structured `ListenerConfig` object.pull/7708/head
parent
789606577a
commit
03619324fc
|
@ -0,0 +1 @@
|
||||||
|
Refactor handling of `listeners` configuration settings.
|
|
@ -20,6 +20,7 @@ import signal
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
from daemonize import Daemonize
|
from daemonize import Daemonize
|
||||||
from typing_extensions import NoReturn
|
from typing_extensions import NoReturn
|
||||||
|
@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
from synapse.app import check_bind_error
|
from synapse.app import check_bind_error
|
||||||
|
from synapse.config.server import ListenerConfig
|
||||||
from synapse.crypto import context_factory
|
from synapse.crypto import context_factory
|
||||||
from synapse.logging.context import PreserveLoggingContext
|
from synapse.logging.context import PreserveLoggingContext
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
|
@ -234,7 +236,7 @@ def refresh_certificate(hs):
|
||||||
logger.info("Context factories updated.")
|
logger.info("Context factories updated.")
|
||||||
|
|
||||||
|
|
||||||
def start(hs, listeners=None):
|
def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
|
||||||
"""
|
"""
|
||||||
Start a Synapse server or worker.
|
Start a Synapse server or worker.
|
||||||
|
|
||||||
|
@ -245,8 +247,8 @@ def start(hs, listeners=None):
|
||||||
notify systemd.
|
notify systemd.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
hs (synapse.server.HomeServer)
|
hs: homeserver instance
|
||||||
listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
|
listeners: Listener configuration ('listeners' in homeserver.yaml)
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Set up the SIGHUP machinery.
|
# Set up the SIGHUP machinery.
|
||||||
|
|
|
@ -37,6 +37,7 @@ from synapse.app import _base
|
||||||
from synapse.config._base import ConfigError
|
from synapse.config._base import ConfigError
|
||||||
from synapse.config.homeserver import HomeServerConfig
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
from synapse.config.logger import setup_logging
|
from synapse.config.logger import setup_logging
|
||||||
|
from synapse.config.server import ListenerConfig
|
||||||
from synapse.federation import send_queue
|
from synapse.federation import send_queue
|
||||||
from synapse.federation.transport.server import TransportLayerServer
|
from synapse.federation.transport.server import TransportLayerServer
|
||||||
from synapse.handlers.presence import (
|
from synapse.handlers.presence import (
|
||||||
|
@ -514,13 +515,18 @@ class GenericWorkerSlavedStore(
|
||||||
class GenericWorkerServer(HomeServer):
|
class GenericWorkerServer(HomeServer):
|
||||||
DATASTORE_CLASS = GenericWorkerSlavedStore
|
DATASTORE_CLASS = GenericWorkerSlavedStore
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config: ListenerConfig):
|
||||||
port = listener_config["port"]
|
port = listener_config.port
|
||||||
bind_addresses = listener_config["bind_addresses"]
|
bind_addresses = listener_config.bind_addresses
|
||||||
site_tag = listener_config.get("tag", port)
|
|
||||||
|
assert listener_config.http_options is not None
|
||||||
|
|
||||||
|
site_tag = listener_config.http_options.tag
|
||||||
|
if site_tag is None:
|
||||||
|
site_tag = port
|
||||||
resources = {}
|
resources = {}
|
||||||
for res in listener_config["resources"]:
|
for res in listener_config.http_options.resources:
|
||||||
for name in res["names"]:
|
for name in res.names:
|
||||||
if name == "metrics":
|
if name == "metrics":
|
||||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||||
elif name == "client":
|
elif name == "client":
|
||||||
|
@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer):
|
||||||
" repository is disabled. Ignoring."
|
" repository is disabled. Ignoring."
|
||||||
)
|
)
|
||||||
|
|
||||||
if name == "openid" and "federation" not in res["names"]:
|
if name == "openid" and "federation" not in res.names:
|
||||||
# Only load the openid resource separately if federation resource
|
# Only load the openid resource separately if federation resource
|
||||||
# is not specified since federation resource includes openid
|
# is not specified since federation resource includes openid
|
||||||
# resource.
|
# resource.
|
||||||
|
@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer):
|
||||||
|
|
||||||
logger.info("Synapse worker now listening on port %d", port)
|
logger.info("Synapse worker now listening on port %d", port)
|
||||||
|
|
||||||
def start_listening(self, listeners):
|
def start_listening(self, listeners: Iterable[ListenerConfig]):
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
if listener["type"] == "http":
|
if listener.type == "http":
|
||||||
self._listen_http(listener)
|
self._listen_http(listener)
|
||||||
elif listener["type"] == "manhole":
|
elif listener.type == "manhole":
|
||||||
_base.listen_tcp(
|
_base.listen_tcp(
|
||||||
listener["bind_addresses"],
|
listener.bind_addresses,
|
||||||
listener["port"],
|
listener.port,
|
||||||
manhole(
|
manhole(
|
||||||
username="matrix", password="rabbithole", globals={"hs": self}
|
username="matrix", password="rabbithole", globals={"hs": self}
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
elif listener["type"] == "metrics":
|
elif listener.type == "metrics":
|
||||||
if not self.get_config().enable_metrics:
|
if not self.get_config().enable_metrics:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
(
|
(
|
||||||
|
@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
_base.listen_metrics(listener.bind_addresses, listener.port)
|
||||||
else:
|
else:
|
||||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
logger.warning("Unsupported listener type: %s", listener.type)
|
||||||
|
|
||||||
self.get_tcp_replication().start_replication(self)
|
self.get_tcp_replication().start_replication(self)
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import math
|
||||||
import os
|
import os
|
||||||
import resource
|
import resource
|
||||||
import sys
|
import sys
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
from prometheus_client import Gauge
|
from prometheus_client import Gauge
|
||||||
|
|
||||||
|
@ -48,6 +49,7 @@ from synapse.app import _base
|
||||||
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
|
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
|
||||||
from synapse.config._base import ConfigError
|
from synapse.config._base import ConfigError
|
||||||
from synapse.config.homeserver import HomeServerConfig
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
|
from synapse.config.server import ListenerConfig
|
||||||
from synapse.federation.transport.server import TransportLayerServer
|
from synapse.federation.transport.server import TransportLayerServer
|
||||||
from synapse.http.additional_resource import AdditionalResource
|
from synapse.http.additional_resource import AdditionalResource
|
||||||
from synapse.http.server import (
|
from synapse.http.server import (
|
||||||
|
@ -87,24 +89,24 @@ def gz_wrap(r):
|
||||||
class SynapseHomeServer(HomeServer):
|
class SynapseHomeServer(HomeServer):
|
||||||
DATASTORE_CLASS = DataStore
|
DATASTORE_CLASS = DataStore
|
||||||
|
|
||||||
def _listener_http(self, config, listener_config):
|
def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
|
||||||
port = listener_config["port"]
|
port = listener_config.port
|
||||||
bind_addresses = listener_config["bind_addresses"]
|
bind_addresses = listener_config.bind_addresses
|
||||||
tls = listener_config.get("tls", False)
|
tls = listener_config.tls
|
||||||
site_tag = listener_config.get("tag", port)
|
site_tag = listener_config.http_options.tag
|
||||||
|
if site_tag is None:
|
||||||
|
site_tag = port
|
||||||
|
|
||||||
resources = {}
|
resources = {}
|
||||||
for res in listener_config["resources"]:
|
for res in listener_config.http_options.resources:
|
||||||
for name in res["names"]:
|
for name in res.names:
|
||||||
if name == "openid" and "federation" in res["names"]:
|
if name == "openid" and "federation" in res.names:
|
||||||
# Skip loading openid resource if federation is defined
|
# Skip loading openid resource if federation is defined
|
||||||
# since federation resource will include openid
|
# since federation resource will include openid
|
||||||
continue
|
continue
|
||||||
resources.update(
|
resources.update(self._configure_named_resource(name, res.compress))
|
||||||
self._configure_named_resource(name, res.get("compress", False))
|
|
||||||
)
|
|
||||||
|
|
||||||
additional_resources = listener_config.get("additional_resources", {})
|
additional_resources = listener_config.http_options.additional_resources
|
||||||
logger.debug("Configuring additional resources: %r", additional_resources)
|
logger.debug("Configuring additional resources: %r", additional_resources)
|
||||||
module_api = ModuleApi(self, self.get_auth_handler())
|
module_api = ModuleApi(self, self.get_auth_handler())
|
||||||
for path, resmodule in additional_resources.items():
|
for path, resmodule in additional_resources.items():
|
||||||
|
@ -276,7 +278,7 @@ class SynapseHomeServer(HomeServer):
|
||||||
|
|
||||||
return resources
|
return resources
|
||||||
|
|
||||||
def start_listening(self, listeners):
|
def start_listening(self, listeners: Iterable[ListenerConfig]):
|
||||||
config = self.get_config()
|
config = self.get_config()
|
||||||
|
|
||||||
if config.redis_enabled:
|
if config.redis_enabled:
|
||||||
|
@ -286,25 +288,25 @@ class SynapseHomeServer(HomeServer):
|
||||||
self.get_tcp_replication().start_replication(self)
|
self.get_tcp_replication().start_replication(self)
|
||||||
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
if listener["type"] == "http":
|
if listener.type == "http":
|
||||||
self._listening_services.extend(self._listener_http(config, listener))
|
self._listening_services.extend(self._listener_http(config, listener))
|
||||||
elif listener["type"] == "manhole":
|
elif listener.type == "manhole":
|
||||||
listen_tcp(
|
listen_tcp(
|
||||||
listener["bind_addresses"],
|
listener.bind_addresses,
|
||||||
listener["port"],
|
listener.port,
|
||||||
manhole(
|
manhole(
|
||||||
username="matrix", password="rabbithole", globals={"hs": self}
|
username="matrix", password="rabbithole", globals={"hs": self}
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
elif listener["type"] == "replication":
|
elif listener.type == "replication":
|
||||||
services = listen_tcp(
|
services = listen_tcp(
|
||||||
listener["bind_addresses"],
|
listener.bind_addresses,
|
||||||
listener["port"],
|
listener.port,
|
||||||
ReplicationStreamProtocolFactory(self),
|
ReplicationStreamProtocolFactory(self),
|
||||||
)
|
)
|
||||||
for s in services:
|
for s in services:
|
||||||
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
|
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
|
||||||
elif listener["type"] == "metrics":
|
elif listener.type == "metrics":
|
||||||
if not self.get_config().enable_metrics:
|
if not self.get_config().enable_metrics:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
(
|
(
|
||||||
|
@ -313,9 +315,11 @@ class SynapseHomeServer(HomeServer):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
_base.listen_metrics(listener.bind_addresses, listener.port)
|
||||||
else:
|
else:
|
||||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
# this shouldn't happen, as the listener type should have been checked
|
||||||
|
# during parsing
|
||||||
|
logger.warning("Unrecognized listener type: %s", listener.type)
|
||||||
|
|
||||||
|
|
||||||
# Gauges to expose monthly active user control metrics
|
# Gauges to expose monthly active user control metrics
|
||||||
|
|
|
@ -19,7 +19,7 @@ import logging
|
||||||
import os.path
|
import os.path
|
||||||
import re
|
import re
|
||||||
from textwrap import indent
|
from textwrap import indent
|
||||||
from typing import Dict, List, Optional
|
from typing import Any, Dict, Iterable, List, Optional
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
import yaml
|
import yaml
|
||||||
|
@ -57,6 +57,64 @@ on how to configure the new listener.
|
||||||
--------------------------------------------------------------------------------"""
|
--------------------------------------------------------------------------------"""
|
||||||
|
|
||||||
|
|
||||||
|
KNOWN_LISTENER_TYPES = {
|
||||||
|
"http",
|
||||||
|
"metrics",
|
||||||
|
"manhole",
|
||||||
|
"replication",
|
||||||
|
}
|
||||||
|
|
||||||
|
KNOWN_RESOURCES = {
|
||||||
|
"client",
|
||||||
|
"consent",
|
||||||
|
"federation",
|
||||||
|
"keys",
|
||||||
|
"media",
|
||||||
|
"metrics",
|
||||||
|
"openid",
|
||||||
|
"replication",
|
||||||
|
"static",
|
||||||
|
"webclient",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(frozen=True)
|
||||||
|
class HttpResourceConfig:
|
||||||
|
names = attr.ib(
|
||||||
|
type=List[str],
|
||||||
|
factory=list,
|
||||||
|
validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)), # type: ignore
|
||||||
|
)
|
||||||
|
compress = attr.ib(
|
||||||
|
type=bool,
|
||||||
|
default=False,
|
||||||
|
validator=attr.validators.optional(attr.validators.instance_of(bool)), # type: ignore[arg-type]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(frozen=True)
|
||||||
|
class HttpListenerConfig:
|
||||||
|
"""Object describing the http-specific parts of the config of a listener"""
|
||||||
|
|
||||||
|
x_forwarded = attr.ib(type=bool, default=False)
|
||||||
|
resources = attr.ib(type=List[HttpResourceConfig], factory=list)
|
||||||
|
additional_resources = attr.ib(type=Dict[str, dict], factory=dict)
|
||||||
|
tag = attr.ib(type=str, default=None)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(frozen=True)
|
||||||
|
class ListenerConfig:
|
||||||
|
"""Object describing the configuration of a single listener."""
|
||||||
|
|
||||||
|
port = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
||||||
|
bind_addresses = attr.ib(type=List[str])
|
||||||
|
type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
|
||||||
|
tls = attr.ib(type=bool, default=False)
|
||||||
|
|
||||||
|
# http_options is only populated if type=http
|
||||||
|
http_options = attr.ib(type=Optional[HttpListenerConfig], default=None)
|
||||||
|
|
||||||
|
|
||||||
class ServerConfig(Config):
|
class ServerConfig(Config):
|
||||||
section = "server"
|
section = "server"
|
||||||
|
|
||||||
|
@ -379,38 +437,21 @@ class ServerConfig(Config):
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
self.listeners = [] # type: List[dict]
|
self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
|
||||||
for listener in config.get("listeners", []):
|
|
||||||
if not isinstance(listener.get("port", None), int):
|
|
||||||
raise ConfigError(
|
|
||||||
"Listener configuration is lacking a valid 'port' option"
|
|
||||||
)
|
|
||||||
|
|
||||||
if listener.setdefault("tls", False):
|
|
||||||
# no_tls is not really supported any more, but let's grandfather it in
|
# no_tls is not really supported any more, but let's grandfather it in
|
||||||
# here.
|
# here.
|
||||||
if config.get("no_tls", False):
|
if config.get("no_tls", False):
|
||||||
|
l2 = []
|
||||||
|
for listener in self.listeners:
|
||||||
|
if listener.tls:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Ignoring TLS-enabled listener on port %i due to no_tls"
|
"Ignoring TLS-enabled listener on port %i due to no_tls",
|
||||||
|
listener.port,
|
||||||
)
|
)
|
||||||
continue
|
|
||||||
|
|
||||||
bind_address = listener.pop("bind_address", None)
|
|
||||||
bind_addresses = listener.setdefault("bind_addresses", [])
|
|
||||||
|
|
||||||
# if bind_address was specified, add it to the list of addresses
|
|
||||||
if bind_address:
|
|
||||||
bind_addresses.append(bind_address)
|
|
||||||
|
|
||||||
# if we still have an empty list of addresses, use the default list
|
|
||||||
if not bind_addresses:
|
|
||||||
if listener["type"] == "metrics":
|
|
||||||
# the metrics listener doesn't support IPv6
|
|
||||||
bind_addresses.append("0.0.0.0")
|
|
||||||
else:
|
else:
|
||||||
bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
|
l2.append(listener)
|
||||||
|
self.listeners = l2
|
||||||
self.listeners.append(listener)
|
|
||||||
|
|
||||||
if not self.web_client_location:
|
if not self.web_client_location:
|
||||||
_warn_if_webclient_configured(self.listeners)
|
_warn_if_webclient_configured(self.listeners)
|
||||||
|
@ -446,43 +487,41 @@ class ServerConfig(Config):
|
||||||
bind_host = config.get("bind_host", "")
|
bind_host = config.get("bind_host", "")
|
||||||
gzip_responses = config.get("gzip_responses", True)
|
gzip_responses = config.get("gzip_responses", True)
|
||||||
|
|
||||||
self.listeners.append(
|
http_options = HttpListenerConfig(
|
||||||
{
|
resources=[
|
||||||
"port": bind_port,
|
HttpResourceConfig(names=["client"], compress=gzip_responses),
|
||||||
"bind_addresses": [bind_host],
|
HttpResourceConfig(names=["federation"]),
|
||||||
"tls": True,
|
|
||||||
"type": "http",
|
|
||||||
"resources": [
|
|
||||||
{"names": ["client"], "compress": gzip_responses},
|
|
||||||
{"names": ["federation"], "compress": False},
|
|
||||||
],
|
],
|
||||||
}
|
)
|
||||||
|
|
||||||
|
self.listeners.append(
|
||||||
|
ListenerConfig(
|
||||||
|
port=bind_port,
|
||||||
|
bind_addresses=[bind_host],
|
||||||
|
tls=True,
|
||||||
|
type="http",
|
||||||
|
http_options=http_options,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
unsecure_port = config.get("unsecure_port", bind_port - 400)
|
unsecure_port = config.get("unsecure_port", bind_port - 400)
|
||||||
if unsecure_port:
|
if unsecure_port:
|
||||||
self.listeners.append(
|
self.listeners.append(
|
||||||
{
|
ListenerConfig(
|
||||||
"port": unsecure_port,
|
port=unsecure_port,
|
||||||
"bind_addresses": [bind_host],
|
bind_addresses=[bind_host],
|
||||||
"tls": False,
|
tls=False,
|
||||||
"type": "http",
|
type="http",
|
||||||
"resources": [
|
http_options=http_options,
|
||||||
{"names": ["client"], "compress": gzip_responses},
|
)
|
||||||
{"names": ["federation"], "compress": False},
|
|
||||||
],
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
manhole = config.get("manhole")
|
manhole = config.get("manhole")
|
||||||
if manhole:
|
if manhole:
|
||||||
self.listeners.append(
|
self.listeners.append(
|
||||||
{
|
ListenerConfig(
|
||||||
"port": manhole,
|
port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
|
||||||
"bind_addresses": ["127.0.0.1"],
|
)
|
||||||
"type": "manhole",
|
|
||||||
"tls": False,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
metrics_port = config.get("metrics_port")
|
metrics_port = config.get("metrics_port")
|
||||||
|
@ -490,13 +529,14 @@ class ServerConfig(Config):
|
||||||
logger.warning(METRICS_PORT_WARNING)
|
logger.warning(METRICS_PORT_WARNING)
|
||||||
|
|
||||||
self.listeners.append(
|
self.listeners.append(
|
||||||
{
|
ListenerConfig(
|
||||||
"port": metrics_port,
|
port=metrics_port,
|
||||||
"bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
|
bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")],
|
||||||
"tls": False,
|
type="http",
|
||||||
"type": "http",
|
http_options=HttpListenerConfig(
|
||||||
"resources": [{"names": ["metrics"], "compress": False}],
|
resources=[HttpResourceConfig(names=["metrics"])]
|
||||||
}
|
),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
_check_resource_config(self.listeners)
|
_check_resource_config(self.listeners)
|
||||||
|
@ -522,7 +562,7 @@ class ServerConfig(Config):
|
||||||
)
|
)
|
||||||
|
|
||||||
def has_tls_listener(self) -> bool:
|
def has_tls_listener(self) -> bool:
|
||||||
return any(listener["tls"] for listener in self.listeners)
|
return any(listener.tls for listener in self.listeners)
|
||||||
|
|
||||||
def generate_config_section(
|
def generate_config_section(
|
||||||
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
|
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
|
||||||
|
@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_listener_def(listener: Any) -> ListenerConfig:
|
||||||
|
"""parse a listener config from the config file"""
|
||||||
|
listener_type = listener["type"]
|
||||||
|
|
||||||
|
port = listener.get("port")
|
||||||
|
if not isinstance(port, int):
|
||||||
|
raise ConfigError("Listener configuration is lacking a valid 'port' option")
|
||||||
|
|
||||||
|
tls = listener.get("tls", False)
|
||||||
|
|
||||||
|
bind_addresses = listener.get("bind_addresses", [])
|
||||||
|
bind_address = listener.get("bind_address")
|
||||||
|
# if bind_address was specified, add it to the list of addresses
|
||||||
|
if bind_address:
|
||||||
|
bind_addresses.append(bind_address)
|
||||||
|
|
||||||
|
# if we still have an empty list of addresses, use the default list
|
||||||
|
if not bind_addresses:
|
||||||
|
if listener_type == "metrics":
|
||||||
|
# the metrics listener doesn't support IPv6
|
||||||
|
bind_addresses.append("0.0.0.0")
|
||||||
|
else:
|
||||||
|
bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
|
||||||
|
|
||||||
|
http_config = None
|
||||||
|
if listener_type == "http":
|
||||||
|
http_config = HttpListenerConfig(
|
||||||
|
x_forwarded=listener.get("x_forwarded", False),
|
||||||
|
resources=[
|
||||||
|
HttpResourceConfig(**res) for res in listener.get("resources", [])
|
||||||
|
],
|
||||||
|
additional_resources=listener.get("additional_resources", {}),
|
||||||
|
tag=listener.get("tag"),
|
||||||
|
)
|
||||||
|
|
||||||
|
return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
|
||||||
|
|
||||||
|
|
||||||
NO_MORE_WEB_CLIENT_WARNING = """
|
NO_MORE_WEB_CLIENT_WARNING = """
|
||||||
Synapse no longer includes a web client. To enable a web client, configure
|
Synapse no longer includes a web client. To enable a web client, configure
|
||||||
web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
|
web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
|
||||||
|
@ -1088,40 +1166,27 @@ configuration.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def _warn_if_webclient_configured(listeners):
|
def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None:
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
for res in listener.get("resources", []):
|
if not listener.http_options:
|
||||||
for name in res.get("names", []):
|
continue
|
||||||
|
for res in listener.http_options.resources:
|
||||||
|
for name in res.names:
|
||||||
if name == "webclient":
|
if name == "webclient":
|
||||||
logger.warning(NO_MORE_WEB_CLIENT_WARNING)
|
logger.warning(NO_MORE_WEB_CLIENT_WARNING)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
KNOWN_RESOURCES = (
|
def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None:
|
||||||
"client",
|
|
||||||
"consent",
|
|
||||||
"federation",
|
|
||||||
"keys",
|
|
||||||
"media",
|
|
||||||
"metrics",
|
|
||||||
"openid",
|
|
||||||
"replication",
|
|
||||||
"static",
|
|
||||||
"webclient",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _check_resource_config(listeners):
|
|
||||||
resource_names = {
|
resource_names = {
|
||||||
res_name
|
res_name
|
||||||
for listener in listeners
|
for listener in listeners
|
||||||
for res in listener.get("resources", [])
|
if listener.http_options
|
||||||
for res_name in res.get("names", [])
|
for res in listener.http_options.resources
|
||||||
|
for res_name in res.names
|
||||||
}
|
}
|
||||||
|
|
||||||
for resource in resource_names:
|
for resource in resource_names:
|
||||||
if resource not in KNOWN_RESOURCES:
|
|
||||||
raise ConfigError("Unknown listener resource '%s'" % (resource,))
|
|
||||||
if resource == "consent":
|
if resource == "consent":
|
||||||
try:
|
try:
|
||||||
check_requirements("resources.consent")
|
check_requirements("resources.consent")
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from ._base import Config, ConfigError
|
from ._base import Config, ConfigError
|
||||||
|
from .server import ListenerConfig, parse_listener_def
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
|
@ -52,7 +53,9 @@ class WorkerConfig(Config):
|
||||||
if self.worker_app == "synapse.app.homeserver":
|
if self.worker_app == "synapse.app.homeserver":
|
||||||
self.worker_app = None
|
self.worker_app = None
|
||||||
|
|
||||||
self.worker_listeners = config.get("worker_listeners", [])
|
self.worker_listeners = [
|
||||||
|
parse_listener_def(x) for x in config.get("worker_listeners", [])
|
||||||
|
]
|
||||||
self.worker_daemonize = config.get("worker_daemonize")
|
self.worker_daemonize = config.get("worker_daemonize")
|
||||||
self.worker_pid_file = config.get("worker_pid_file")
|
self.worker_pid_file = config.get("worker_pid_file")
|
||||||
self.worker_log_config = config.get("worker_log_config")
|
self.worker_log_config = config.get("worker_log_config")
|
||||||
|
@ -75,23 +78,10 @@ class WorkerConfig(Config):
|
||||||
manhole = config.get("worker_manhole")
|
manhole = config.get("worker_manhole")
|
||||||
if manhole:
|
if manhole:
|
||||||
self.worker_listeners.append(
|
self.worker_listeners.append(
|
||||||
{
|
ListenerConfig(
|
||||||
"port": manhole,
|
port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
|
||||||
"bind_addresses": ["127.0.0.1"],
|
)
|
||||||
"type": "manhole",
|
|
||||||
"tls": False,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.worker_listeners:
|
|
||||||
for listener in self.worker_listeners:
|
|
||||||
bind_address = listener.pop("bind_address", None)
|
|
||||||
bind_addresses = listener.setdefault("bind_addresses", [])
|
|
||||||
|
|
||||||
if bind_address:
|
|
||||||
bind_addresses.append(bind_address)
|
|
||||||
elif not bind_addresses:
|
|
||||||
bind_addresses.append("")
|
|
||||||
|
|
||||||
# A map from instance name to host/port of their HTTP replication endpoint.
|
# A map from instance name to host/port of their HTTP replication endpoint.
|
||||||
instance_map = config.get("instance_map") or {}
|
instance_map = config.get("instance_map") or {}
|
||||||
|
|
|
@ -19,6 +19,7 @@ from typing import Optional
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
from twisted.web.server import Request, Site
|
from twisted.web.server import Request, Site
|
||||||
|
|
||||||
|
from synapse.config.server import ListenerConfig
|
||||||
from synapse.http import redact_uri
|
from synapse.http import redact_uri
|
||||||
from synapse.http.request_metrics import RequestMetrics, requests_counter
|
from synapse.http.request_metrics import RequestMetrics, requests_counter
|
||||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||||
|
@ -350,7 +351,7 @@ class SynapseSite(Site):
|
||||||
self,
|
self,
|
||||||
logger_name,
|
logger_name,
|
||||||
site_tag,
|
site_tag,
|
||||||
config,
|
config: ListenerConfig,
|
||||||
resource,
|
resource,
|
||||||
server_version_string,
|
server_version_string,
|
||||||
*args,
|
*args,
|
||||||
|
@ -360,7 +361,8 @@ class SynapseSite(Site):
|
||||||
|
|
||||||
self.site_tag = site_tag
|
self.site_tag = site_tag
|
||||||
|
|
||||||
proxied = config.get("x_forwarded", False)
|
assert config.http_options is not None
|
||||||
|
proxied = config.http_options.x_forwarded
|
||||||
self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
|
self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
|
||||||
self.access_logger = logging.getLogger(logger_name)
|
self.access_logger = logging.getLogger(logger_name)
|
||||||
self.server_version_string = server_version_string.encode("ascii")
|
self.server_version_string = server_version_string.encode("ascii")
|
||||||
|
|
|
@ -68,9 +68,8 @@ REQUIREMENTS = [
|
||||||
"phonenumbers>=8.2.0",
|
"phonenumbers>=8.2.0",
|
||||||
"six>=1.10",
|
"six>=1.10",
|
||||||
"prometheus_client>=0.0.18,<0.8.0",
|
"prometheus_client>=0.0.18,<0.8.0",
|
||||||
# we use attr.s(slots), which arrived in 16.0.0
|
# we use attr.validators.deep_iterable, which arrived in 19.1.0
|
||||||
# Twisted 18.7.0 requires attrs>=17.4.0
|
"attrs>=19.1.0",
|
||||||
"attrs>=17.4.0",
|
|
||||||
"netaddr>=0.7.18",
|
"netaddr>=0.7.18",
|
||||||
"Jinja2>=2.9",
|
"Jinja2>=2.9",
|
||||||
"bleach>=1.4.3",
|
"bleach>=1.4.3",
|
||||||
|
|
|
@ -30,6 +30,16 @@ class FrontendProxyTests(HomeserverTestCase):
|
||||||
def default_config(self):
|
def default_config(self):
|
||||||
c = super().default_config()
|
c = super().default_config()
|
||||||
c["worker_app"] = "synapse.app.frontend_proxy"
|
c["worker_app"] = "synapse.app.frontend_proxy"
|
||||||
|
|
||||||
|
c["worker_listeners"] = [
|
||||||
|
{
|
||||||
|
"type": "http",
|
||||||
|
"port": 8080,
|
||||||
|
"bind_addresses": ["0.0.0.0"],
|
||||||
|
"resources": [{"names": ["client"]}],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
return c
|
return c
|
||||||
|
|
||||||
def test_listen_http_with_presence_enabled(self):
|
def test_listen_http_with_presence_enabled(self):
|
||||||
|
@ -39,14 +49,8 @@ class FrontendProxyTests(HomeserverTestCase):
|
||||||
# Presence is on
|
# Presence is on
|
||||||
self.hs.config.use_presence = True
|
self.hs.config.use_presence = True
|
||||||
|
|
||||||
config = {
|
|
||||||
"port": 8080,
|
|
||||||
"bind_addresses": ["0.0.0.0"],
|
|
||||||
"resources": [{"names": ["client"]}],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Listen with the config
|
# Listen with the config
|
||||||
self.hs._listen_http(config)
|
self.hs._listen_http(self.hs.config.worker.worker_listeners[0])
|
||||||
|
|
||||||
# Grab the resource from the site that was told to listen
|
# Grab the resource from the site that was told to listen
|
||||||
self.assertEqual(len(self.reactor.tcpServers), 1)
|
self.assertEqual(len(self.reactor.tcpServers), 1)
|
||||||
|
@ -67,14 +71,8 @@ class FrontendProxyTests(HomeserverTestCase):
|
||||||
# Presence is off
|
# Presence is off
|
||||||
self.hs.config.use_presence = False
|
self.hs.config.use_presence = False
|
||||||
|
|
||||||
config = {
|
|
||||||
"port": 8080,
|
|
||||||
"bind_addresses": ["0.0.0.0"],
|
|
||||||
"resources": [{"names": ["client"]}],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Listen with the config
|
# Listen with the config
|
||||||
self.hs._listen_http(config)
|
self.hs._listen_http(self.hs.config.worker.worker_listeners[0])
|
||||||
|
|
||||||
# Grab the resource from the site that was told to listen
|
# Grab the resource from the site that was told to listen
|
||||||
self.assertEqual(len(self.reactor.tcpServers), 1)
|
self.assertEqual(len(self.reactor.tcpServers), 1)
|
||||||
|
|
|
@ -18,6 +18,7 @@ from parameterized import parameterized
|
||||||
|
|
||||||
from synapse.app.generic_worker import GenericWorkerServer
|
from synapse.app.generic_worker import GenericWorkerServer
|
||||||
from synapse.app.homeserver import SynapseHomeServer
|
from synapse.app.homeserver import SynapseHomeServer
|
||||||
|
from synapse.config.server import parse_listener_def
|
||||||
|
|
||||||
from tests.unittest import HomeserverTestCase
|
from tests.unittest import HomeserverTestCase
|
||||||
|
|
||||||
|
@ -35,6 +36,7 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase):
|
||||||
# have to tell the FederationHandler not to try to access stuff that is only
|
# have to tell the FederationHandler not to try to access stuff that is only
|
||||||
# in the primary store.
|
# in the primary store.
|
||||||
conf["worker_app"] = "yes"
|
conf["worker_app"] = "yes"
|
||||||
|
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
@parameterized.expand(
|
@parameterized.expand(
|
||||||
|
@ -53,12 +55,13 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
config = {
|
config = {
|
||||||
"port": 8080,
|
"port": 8080,
|
||||||
|
"type": "http",
|
||||||
"bind_addresses": ["0.0.0.0"],
|
"bind_addresses": ["0.0.0.0"],
|
||||||
"resources": [{"names": names}],
|
"resources": [{"names": names}],
|
||||||
}
|
}
|
||||||
|
|
||||||
# Listen with the config
|
# Listen with the config
|
||||||
self.hs._listen_http(config)
|
self.hs._listen_http(parse_listener_def(config))
|
||||||
|
|
||||||
# Grab the resource from the site that was told to listen
|
# Grab the resource from the site that was told to listen
|
||||||
site = self.reactor.tcpServers[0][1]
|
site = self.reactor.tcpServers[0][1]
|
||||||
|
@ -101,12 +104,13 @@ class SynapseHomeserverOpenIDListenerTests(HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
config = {
|
config = {
|
||||||
"port": 8080,
|
"port": 8080,
|
||||||
|
"type": "http",
|
||||||
"bind_addresses": ["0.0.0.0"],
|
"bind_addresses": ["0.0.0.0"],
|
||||||
"resources": [{"names": names}],
|
"resources": [{"names": names}],
|
||||||
}
|
}
|
||||||
|
|
||||||
# Listen with the config
|
# Listen with the config
|
||||||
self.hs._listener_http(config, config)
|
self.hs._listener_http(self.hs.get_config(), parse_listener_def(config))
|
||||||
|
|
||||||
# Grab the resource from the site that was told to listen
|
# Grab the resource from the site that was told to listen
|
||||||
site = self.reactor.tcpServers[0][1]
|
site = self.reactor.tcpServers[0][1]
|
||||||
|
|
|
@ -24,6 +24,7 @@ from twisted.web.resource import Resource
|
||||||
from twisted.web.server import NOT_DONE_YET
|
from twisted.web.server import NOT_DONE_YET
|
||||||
|
|
||||||
from synapse.api.errors import Codes, RedirectException, SynapseError
|
from synapse.api.errors import Codes, RedirectException, SynapseError
|
||||||
|
from synapse.config.server import parse_listener_def
|
||||||
from synapse.http.server import (
|
from synapse.http.server import (
|
||||||
DirectServeResource,
|
DirectServeResource,
|
||||||
JsonResource,
|
JsonResource,
|
||||||
|
@ -189,7 +190,13 @@ class OptionsResourceTests(unittest.TestCase):
|
||||||
request.prepath = [] # This doesn't get set properly by make_request.
|
request.prepath = [] # This doesn't get set properly by make_request.
|
||||||
|
|
||||||
# Create a site and query for the resource.
|
# Create a site and query for the resource.
|
||||||
site = SynapseSite("test", "site_tag", {}, self.resource, "1.0")
|
site = SynapseSite(
|
||||||
|
"test",
|
||||||
|
"site_tag",
|
||||||
|
parse_listener_def({"type": "http", "port": 0}),
|
||||||
|
self.resource,
|
||||||
|
"1.0",
|
||||||
|
)
|
||||||
request.site = site
|
request.site = site
|
||||||
resource = site.getResourceFor(request)
|
resource = site.getResourceFor(request)
|
||||||
|
|
||||||
|
@ -348,7 +355,9 @@ class SiteTestCase(unittest.HomeserverTestCase):
|
||||||
# time out the request while it's 'processing'
|
# time out the request while it's 'processing'
|
||||||
base_resource = Resource()
|
base_resource = Resource()
|
||||||
base_resource.putChild(b"", HangingResource())
|
base_resource.putChild(b"", HangingResource())
|
||||||
site = SynapseSite("test", "site_tag", {}, base_resource, "1.0")
|
site = SynapseSite(
|
||||||
|
"test", "site_tag", self.hs.config.listeners[0], base_resource, "1.0"
|
||||||
|
)
|
||||||
|
|
||||||
server = site.buildProtocol(None)
|
server = site.buildProtocol(None)
|
||||||
client = AccumulatingProtocol()
|
client = AccumulatingProtocol()
|
||||||
|
|
|
@ -229,7 +229,7 @@ class HomeserverTestCase(TestCase):
|
||||||
self.site = SynapseSite(
|
self.site = SynapseSite(
|
||||||
logger_name="synapse.access.http.fake",
|
logger_name="synapse.access.http.fake",
|
||||||
site_tag="test",
|
site_tag="test",
|
||||||
config={},
|
config=self.hs.config.server.listeners[0],
|
||||||
resource=self.resource,
|
resource=self.resource,
|
||||||
server_version_string="1",
|
server_version_string="1",
|
||||||
)
|
)
|
||||||
|
|
|
@ -168,6 +168,7 @@ def default_config(name, parse=False):
|
||||||
# background, which upsets the test runner.
|
# background, which upsets the test runner.
|
||||||
"update_user_directory": False,
|
"update_user_directory": False,
|
||||||
"caches": {"global_factor": 1},
|
"caches": {"global_factor": 1},
|
||||||
|
"listeners": [{"port": 0, "type": "http"}],
|
||||||
}
|
}
|
||||||
|
|
||||||
if parse:
|
if parse:
|
||||||
|
|
Loading…
Reference in New Issue