Remove special logic for adding stream_writers: just make it part of the extra config template

rei/cwas_extension
Olivier Wilkinson (reivilibre) 2023-11-16 15:47:28 +00:00
parent 8b7463957f
commit f38297b619
1 changed file with 53 additions and 67 deletions


@@ -98,7 +98,11 @@ class WorkerTemplate:
 # Stream Writers require "client" and "replication" listeners because they
 # have to attach by instance_map to the master process and have client endpoints.
 WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
-    "pusher": WorkerTemplate(),
+    "pusher": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "pusher_instances": [worker_name],
+        }
+    ),
     "user_dir": WorkerTemplate(
         listener_resources={"client"},
         endpoint_patterns={
@@ -130,7 +134,11 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "notify_appservices_from_worker": worker_name
         },
     ),
-    "federation_sender": WorkerTemplate(),
+    "federation_sender": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "federation_sender_instances": [worker_name],
+        }
+    ),
     "synchrotron": WorkerTemplate(
         listener_resources={"client"},
         endpoint_patterns={
@@ -202,6 +210,9 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
     ),
     "event_persister": WorkerTemplate(
         listener_resources={"replication"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"events": [worker_name]}
+        },
     ),
     "background_worker": WorkerTemplate(
         # This worker cannot be sharded. Therefore, there should only ever be one
@@ -229,10 +240,16 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "^/_matrix/client/(r0|v3|unstable)/.*/tags",
             "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
         },
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"account_data": [worker_name]}
+        },
     ),
     "presence": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"presence": [worker_name]}
+        },
     ),
     "receipts": WorkerTemplate(
         listener_resources={"client", "replication"},
@@ -240,14 +257,23 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
         },
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"receipts": [worker_name]}
+        },
     ),
     "to_device": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"to_device": [worker_name]}
+        },
     ),
     "typing": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"typing": [worker_name]}
+        },
     ),
 }
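
With this change each WORKERS_CONFIG entry is self-describing: shared_extra_conf is a callable from the worker's instance name to the config fragment that instance contributes to the shared config, so the generator no longer needs per-type branches elsewhere. A minimal sketch of evaluating one of these lambdas (the call site that folds the fragment into the shared config is not part of this diff, so the final merge step is an assumption):

    # Evaluate a template's lambda for one instance (illustrative).
    template = WORKERS_CONFIG["event_persister"]
    fragment = template.shared_extra_conf("event_persister1")
    assert fragment == {"stream_writers": {"events": ["event_persister1"]}}
    merge_into(shared_config, fragment)  # assumed consumer of the fragment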
@@ -308,6 +334,14 @@ def merge_into(dest: Any, new: Any) -> None:
         raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}")


+def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Merges `b` into `a` and returns `a`.
+    """
+    merge_into(a, b)
+    return a
+
+
 def convert(src: str, dst: str, **template_vars: object) -> None:
     """Generate a file from a template
@@ -338,7 +372,6 @@ def convert(src: str, dst: str, **template_vars: object) -> None:

 def add_worker_roles_to_shared_config(
     shared_config: dict,
-    worker_types_set: Set[str],
     worker_name: str,
     worker_port: int,
 ) -> None:
@@ -348,8 +381,6 @@ def add_worker_roles_to_shared_config(
     Args:
         shared_config: The config dict that all worker instances share (after being
            converted to YAML)
-        worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
-            This list can be a single worker type or multiple.
         worker_name: The name of the worker instance.
         worker_port: The HTTP replication port that the worker instance is listening on.
     """
@@ -357,61 +388,18 @@
     # streams
     instance_map = shared_config.setdefault("instance_map", {})

-    # This is a list of the stream_writers that there can be only one of. Events can be
-    # sharded, and therefore doesn't belong here.
-    singular_stream_writers = [
-        "account_data",
-        "presence",
-        "receipts",
-        "to_device",
-        "typing",
-    ]
-
-    # Worker-type specific sharding config. Now a single worker can fulfill multiple
-    # roles, check each.
-    if "pusher" in worker_types_set:
-        shared_config.setdefault("pusher_instances", []).append(worker_name)
-
-    if "federation_sender" in worker_types_set:
-        shared_config.setdefault("federation_sender_instances", []).append(worker_name)
-
-    if "event_persister" in worker_types_set:
-        # Event persisters write to the events stream, so we need to update
-        # the list of event stream writers
-        shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
-            worker_name
-        )
-
-        # Map of stream writer instance names to host/ports combos
-        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-            instance_map[worker_name] = {
-                "path": f"/run/worker.{worker_port}",
-            }
-        else:
-            instance_map[worker_name] = {
-                "host": "localhost",
-                "port": worker_port,
-            }
-
-    # Update the list of stream writers. It's convenient that the name of the worker
-    # type is the same as the stream to write. Iterate over the whole list in case there
-    # is more than one.
-    for worker in worker_types_set:
-        if worker in singular_stream_writers:
-            shared_config.setdefault("stream_writers", {}).setdefault(
-                worker, []
-            ).append(worker_name)
-
-            # Map of stream writer instance names to host/ports combos
-            # For now, all stream writers need http replication ports
-            if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-                instance_map[worker_name] = {
-                    "path": f"/run/worker.{worker_port}",
-                }
-            else:
-                instance_map[worker_name] = {
-                    "host": "localhost",
-                    "port": worker_port,
-                }
+    # Add all workers to the `instance_map`
+    # Technically only certain types of workers, such as stream writers, are needed
+    # here but it is simpler just to be consistent.
+    if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+        instance_map[worker_name] = {
+            "path": f"/run/worker.{worker_port}",
+        }
+    else:
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }


 def merge_worker_template_configs(
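
After this refactor add_worker_roles_to_shared_config is responsible only for the instance_map; all sharding lists now come from the templates above. An illustrative call (worker name and port invented):

    shared_config: dict = {}
    add_worker_roles_to_shared_config(shared_config, "event_persister1", 8034)
    # With SYNAPSE_USE_UNIX_SOCKET unset:
    # shared_config == {
    #     "instance_map": {"event_persister1": {"host": "localhost", "port": 8034}}
    # }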
@@ -438,10 +426,10 @@ def merge_worker_template_configs(
     new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns

     # merge dictionaries; the worker name will be replaced later
-    new_template.shared_extra_conf = lambda worker_name: {
-        **new_template.shared_extra_conf(worker_name),
-        **to_be_merged_template.shared_extra_conf(worker_name),
-    }
+    new_template.shared_extra_conf = lambda worker_name: merged(
+        existing_template.shared_extra_conf(worker_name),
+        to_be_merged_template.shared_extra_conf(worker_name),
+    )

     # There is only one worker type that has a 'worker_extra_conf' and it is
     # the media_repo. Since duplicate worker types on the same worker don't
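
This is more than a tidy-up. The old lambda called new_template.shared_extra_conf, the very attribute it was being assigned to, so invoking the merged template would recurse forever; binding the pre-merge callable, existing_template.shared_extra_conf (presumably the function's input template), avoids that. Using merged instead of {**a, **b} also deep-merges nested dicts, assuming merge_into recurses as sketched above, rather than letting the second fragment clobber the first:

    a = {"stream_writers": {"events": ["w1"]}}
    b = {"stream_writers": {"typing": ["w1"]}}
    {**a, **b}    # {"stream_writers": {"typing": ["w1"]}} -- "events" is lost
    merged(a, b)  # assumed: {"stream_writers": {"events": ["w1"], "typing": ["w1"]}}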
@@ -821,9 +809,7 @@ def generate_worker_files(
         healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))

         # Update the shared config with sharding-related options if necessary
-        add_worker_roles_to_shared_config(
-            shared_config, worker_types_set, worker_name, worker_port
-        )
+        add_worker_roles_to_shared_config(shared_config, worker_name, worker_port)

         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)
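
Taken together, the template lambdas plus the simplified helper should produce a shared config like the following for a deployment with one pusher and one event persister (a sketch: worker names and ports are invented, and it assumes the generator merges each worker's shared_extra_conf fragment into shared_config, which is not shown in this diff):

    {
        "pusher_instances": ["pusher1"],
        "stream_writers": {"events": ["event_persister1"]},
        "instance_map": {
            "pusher1": {"host": "localhost", "port": 8081},
            "event_persister1": {"host": "localhost", "port": 8082},
        },
    }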