pull/16650/merge
reivilibre 2023-12-13 10:14:16 -08:00 committed by GitHub
commit a5bc7d1fac
4 changed files with 260 additions and 295 deletions

changelog.d/16650.misc

@@ -0,0 +1 @@
Refactor the `configure_workers_and_start.py` script used internally by Complement.


@@ -6,7 +6,7 @@ command=/usr/local/bin/python -m synapse.app.complement_fork_starter
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
{%- for worker in workers %}
-- {{ worker.app }}
-- synapse.app.generic_worker
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
@@ -36,7 +36,7 @@ exitcodes=0
{% for worker in workers %}
[program:synapse_{{ worker.name }}]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.generic_worker
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
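For illustration (worker name hypothetical), a worker called event_persister1 now yields a program entry of roughly this shape, so every worker runs synapse.app.generic_worker and differs only in the per-worker YAML it loads:

[program:synapse_event_persister1]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.generic_worker
  --config-path="{{ main_config_path }}"
  --config-path=/conf/workers/shared.yaml
  --config-path=/conf/workers/event_persister1.yaml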


@@ -3,7 +3,7 @@
# Values will change depending on whichever workers are selected when
# running that image.
worker_app: "{{ app }}"
worker_app: "synapse.app.generic_worker"
worker_name: "{{ name }}"
worker_listeners:


@@ -47,16 +47,20 @@
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.
import dataclasses
import os
import platform
import re
import subprocess
import sys
from argparse import ArgumentParser
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import chain
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
@@ -78,9 +82,30 @@ MAIN_PROCESS_REPLICATION_PORT = 9093
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"
# We place a file at this path to indicate that the script has already been
# run and should not be run again.
MARKER_FILE_PATH = "/conf/workers_have_been_configured"
@dataclass
class WorkerTemplate:
"""
A definition of individual settings for a specific worker type.
A worker name can be fed into the template in order to generate a config.
These worker templates can be merged with `merge_worker_template_configs`
in order for a single worker to be made from multiple templates.
"""
listener_resources: Set[str] = field(default_factory=set)
endpoint_patterns: Set[str] = field(default_factory=set)
# (worker_name) -> {config}
shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
worker_extra_conf: str = ""
# True if and only if multiple of this worker type are allowed.
sharding_allowed: bool = True
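A minimal sketch (worker name hypothetical, assuming the dataclass above is in scope) of how the shared_extra_conf callable is used: it maps a worker's name to the fragment of shared config that the worker contributes.

pusher_template = WorkerTemplate(
    shared_extra_conf=lambda worker_name: {"pusher_instances": [worker_name]},
)
assert pusher_template.shared_extra_conf("pusher1") == {"pusher_instances": ["pusher1"]}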
# Workers with exposed endpoints need either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
@@ -88,75 +113,60 @@ WORKER_PLACEHOLDER_NAME = "placeholder_name"
# Watching /_matrix/media and related needs a "media" listener
# Stream Writers require "client" and "replication" listeners because they
# have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"user_dir": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"pusher": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"pusher_instances": [worker_name],
}
),
"user_dir": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"media_repository": {
"app": "synapse.app.generic_worker",
"listener_resources": ["media"],
"endpoint_patterns": [
shared_extra_conf=lambda worker_name: {
"update_user_directory_from_worker": worker_name
},
),
"media_repository": WorkerTemplate(
listener_resources={"media"},
endpoint_patterns={
"^/_matrix/media/",
"^/_synapse/admin/v1/purge_media_cache$",
"^/_synapse/admin/v1/room/.*/media.*$",
"^/_synapse/admin/v1/user/.*/media.*$",
"^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$",
],
},
# The first configured media worker will run the media background jobs
"shared_extra_conf": {
shared_extra_conf=lambda worker_name: {
"enable_media_repo": False,
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
"media_instance_running_background_jobs": worker_name,
},
"worker_extra_conf": "enable_media_repo: true",
},
"appservice": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
worker_extra_conf="enable_media_repo: true",
),
"appservice": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"notify_appservices_from_worker": worker_name
},
"worker_extra_conf": "",
},
"federation_sender": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"synchrotron": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
),
"federation_sender": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"federation_sender_instances": [worker_name],
}
),
"synchrotron": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"client_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
},
),
"client_reader": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
@@ -184,14 +194,11 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
"endpoint_patterns": [
},
),
"federation_reader": WorkerTemplate(
listener_resources={"federation"},
endpoint_patterns={
"^/_matrix/federation/(v1|v2)/event/",
"^/_matrix/federation/(v1|v2)/state/",
"^/_matrix/federation/(v1|v2)/state_ids/",
@@ -211,97 +218,85 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/federation/(v1|v2)/user/devices/",
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
"^/_matrix/key/v2/query",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_inbound": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"event_persister": {
"app": "synapse.app.generic_worker",
"listener_resources": ["replication"],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"background_worker": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
},
),
"federation_inbound": WorkerTemplate(
listener_resources={"federation"},
endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"},
),
"event_persister": WorkerTemplate(
listener_resources={"replication"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"events": [worker_name]}
},
),
"background_worker": WorkerTemplate(
# This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database.
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
"worker_extra_conf": "",
},
"event_creator": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name},
sharding_allowed=False,
),
"event_creator": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"frontend_proxy": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"account_data": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
},
),
"frontend_proxy": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"},
),
"account_data": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"presence": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"receipts": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"account_data": [worker_name]}
},
sharding_allowed=False,
),
"presence": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"presence": [worker_name]}
},
sharding_allowed=False,
),
"receipts": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"to_device": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"typing": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"receipts": [worker_name]}
},
sharding_allowed=False,
),
"to_device": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"to_device": [worker_name]}
},
sharding_allowed=False,
),
"typing": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"typing": [worker_name]}
},
sharding_allowed=False,
),
}
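Since the sharding rule is now a field on each template (replacing the is_sharding_allowed_for_worker_type helper removed further down), checking it is a plain attribute lookup; a minimal sketch assuming the dict above:

assert WORKERS_CONFIG["background_worker"].sharding_allowed is False
assert WORKERS_CONFIG["synchrotron"].sharding_allowed is True  # default value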
# Templates for sections that may be inserted multiple times in config files
@@ -336,6 +331,40 @@ def flush_buffers() -> None:
sys.stderr.flush()
def merge_into(dest: Any, new: Any) -> None:
"""
Merges `new` into `dest` with the following rules:
- dicts: values with the same key will be merged recursively
- lists: `new` will be appended to `dest`
- primitives: they will be checked for equality; an unequal pair results
in a ValueError
It is an error for `dest` and `new` to be of different types.
"""
if isinstance(dest, dict) and isinstance(new, dict):
for k, v in new.items():
if k in dest:
merge_into(dest[k], v)
else:
dest[k] = v
elif isinstance(dest, list) and isinstance(new, list):
dest.extend(new)
elif type(dest) != type(new):
raise TypeError(f"Cannot merge {type(dest).__name__} and {type(new).__name__}")
elif dest != new:
raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}")
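A short sketch of these merge rules (dictionary contents hypothetical), assuming merge_into as defined above:

dest = {"stream_writers": {"events": ["worker1"]}, "pusher_instances": ["p1"]}
new = {"stream_writers": {"typing": ["worker2"]}, "pusher_instances": ["p2"]}
merge_into(dest, new)
# dest == {"stream_writers": {"events": ["worker1"], "typing": ["worker2"]},
#          "pusher_instances": ["p1", "p2"]}
merge_into({"port": 8080}, {"port": 9090})  # raises ValueError: unequal primitives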
def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
"""
Merges `b` into `a` and returns `a`. Here because we can't use `merge_into`
in a lambda conveniently.
"""
merge_into(a, b)
return a
def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template
@@ -364,138 +393,79 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
outfile.write(rendered)
def add_worker_roles_to_shared_config(
def add_worker_to_instance_map(
shared_config: dict,
worker_types_set: Set[str],
worker_name: str,
worker_port: int,
) -> None:
"""Given a dictionary representing a config file shared across all workers,
append appropriate worker information to it for the current worker_type instance.
"""
Update the shared config to add the worker to the `instance_map`.
Args:
shared_config: The config dict that all worker instances share (after being
converted to YAML)
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication
# streams
instance_map = shared_config.setdefault("instance_map", {})
# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
"account_data",
"presence",
"receipts",
"to_device",
"typing",
]
# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
if "pusher" in worker_types_set:
shared_config.setdefault("pusher_instances", []).append(worker_name)
if "federation_sender" in worker_types_set:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
worker_name
)
# Map of stream writer instance names to host/ports combos
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_types_set:
if worker in singular_stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
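A brief sketch of the effect (worker name and port hypothetical; SYNAPSE_USE_UNIX_SOCKET assumed unset):

shared_config: dict = {}
add_worker_to_instance_map(shared_config, "event_persister1", 18009)
# shared_config == {
#     "instance_map": {"event_persister1": {"host": "localhost", "port": 18009}}
# }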
def merge_worker_template_configs(
existing_dict: Optional[Dict[str, Any]],
to_be_merged_dict: Dict[str, Any],
left: WorkerTemplate,
right: WorkerTemplate,
) -> WorkerTemplate:
"""Merges two templates together, returning a new template that includes
the listeners, endpoint patterns and configuration from both.
Does not mutate the input templates.
"""
return WorkerTemplate(
# include listener resources from both
listener_resources=left.listener_resources | right.listener_resources,
# include endpoint patterns from both
endpoint_patterns=left.endpoint_patterns | right.endpoint_patterns,
# merge shared config dictionaries; the worker name will be replaced later
shared_extra_conf=lambda worker_name: merged(
left.shared_extra_conf(worker_name),
right.shared_extra_conf(worker_name),
),
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
worker_extra_conf=(left.worker_extra_conf + right.worker_extra_conf),
# (This is unused, but in principle sharding this hybrid worker type
# would be allowed if both constituent types are shardable)
sharding_allowed=left.sharding_allowed and right.sharding_allowed,
)
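For example (assuming the templates defined above), merging two stream-writer templates unions their listener resources and composes their shared-config callables:

hybrid = merge_worker_template_configs(
    WORKERS_CONFIG["account_data"], WORKERS_CONFIG["presence"]
)
# hybrid.listener_resources == {"client", "replication"}
# hybrid.shared_extra_conf("worker1") == {
#     "stream_writers": {"account_data": ["worker1"], "presence": ["worker1"]}
# }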
def instantiate_worker_template(
template: WorkerTemplate, worker_name: str
) -> Dict[str, Any]:
"""When given an existing dict of worker template configuration consisting with both
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
return new dict.
"""Given a worker template, instantiate it into a worker configuration
(which is currently represented as a dictionary).
Args:
existing_dict: Either an existing worker template or a fresh blank one.
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
existing_dict.
Returns: The newly merged together dict values.
template: The WorkerTemplate to instantiate.
worker_name: The name of the worker to use.
Returns: worker configuration dictionary
"""
new_dict: Dict[str, Any] = {}
if not existing_dict:
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
new_dict = to_be_merged_dict.copy()
else:
for i in to_be_merged_dict.keys():
if (i == "endpoint_patterns") or (i == "listener_resources"):
# merge the two lists, remove duplicates
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
elif i == "shared_extra_conf":
# merge dictionary's, the worker name will be replaced later
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
elif i == "worker_extra_conf":
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
else:
# Everything else should be identical, like "app", which only works
# because all apps are now generic_workers.
new_dict[i] = to_be_merged_dict[i]
return new_dict
def insert_worker_name_for_worker_config(
existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
"""Insert a given worker name into the worker's configuration dict.
Args:
existing_dict: The worker_config dict that is imported into shared_config.
worker_name: The name of the worker to insert.
Returns: Copy of the dict with newly inserted worker name
"""
dict_to_edit = existing_dict.copy()
for k, v in dict_to_edit["shared_extra_conf"].items():
# Only proceed if it's the placeholder name string
if v == WORKER_PLACEHOLDER_NAME:
dict_to_edit["shared_extra_conf"][k] = worker_name
return dict_to_edit
worker_config_dict = dataclasses.asdict(template)
worker_config_dict["shared_extra_conf"] = template.shared_extra_conf(worker_name)
worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns)
worker_config_dict["listener_resources"] = sorted(template.listener_resources)
return worker_config_dict
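A small sketch of the resulting dictionary (worker name hypothetical), assuming the templates above:

cfg = instantiate_worker_template(WORKERS_CONFIG["typing"], "typing1")
# cfg["listener_resources"] == ["client", "replication"]  # sorted list
# cfg["endpoint_patterns"] == ["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"]
# cfg["shared_extra_conf"] == {"stream_writers": {"typing": ["typing1"]}}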
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
@@ -540,23 +510,6 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
return new_worker_types
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"""Helper to check to make sure worker types that cannot have multiples do not.
Args:
worker_type: The type of worker to check against.
Returns: True if allowed, False if not
"""
return worker_type not in [
"background_worker",
"account_data",
"presence",
"receipts",
"typing",
"to_device",
]
def split_and_strip_string(
given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]:
@@ -682,7 +635,7 @@ def parse_worker_types(
)
if worker_type in worker_type_shard_counter:
if not is_sharding_allowed_for_worker_type(worker_type):
if not WORKERS_CONFIG[worker_type].sharding_allowed:
error(
f"There can be only a single worker with {worker_type} "
"type. Please recount and remove."
@@ -811,36 +764,35 @@ def generate_worker_files(
# Map locations to upstreams (corresponding to worker types) in Nginx
# but only if we use the appropriate worker type
for worker_type in all_worker_types_in_use:
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
for endpoint_pattern in sorted(WORKERS_CONFIG[worker_type].endpoint_patterns):
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
# For each worker type specified by the user, create config values and write its
# yaml config file
for worker_name, worker_types_set in requested_worker_types.items():
# The collected and processed data will live here.
worker_config: Dict[str, Any] = {}
worker_template: WorkerTemplate = WorkerTemplate()
# Merge all worker config templates for this worker into a single config
for worker_type in worker_types_set:
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
# Merge worker type template configuration data. It's a combination of lists
# and dicts, so use this helper.
worker_config = merge_worker_template_configs(
worker_config, copy_of_template_config
worker_template = merge_worker_template_configs(
worker_template, WORKERS_CONFIG[worker_type]
)
# Replace placeholder names in the config template with the actual worker name.
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
worker_config: Dict[str, Any] = instantiate_worker_template(
worker_template, worker_name
)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any worker_type specific options. The first of a
# given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config)
shared_config = worker_config["shared_extra_conf"]
# Update the shared config with any options needed to enable this worker.
merge_into(shared_config, worker_config["shared_extra_conf"])
if using_unix_sockets:
healthcheck_urls.append(
f"--unix-socket /run/worker.{worker_port} http://localhost/health"
@@ -848,10 +800,10 @@ def generate_worker_files(
else:
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_types_set, worker_name, worker_port
)
# Add all workers to the `instance_map`
# Technically only certain types of workers, such as stream writers, are needed
# here but it is simpler just to be consistent.
add_worker_to_instance_map(shared_config, worker_name, worker_port)
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
@@ -1018,6 +970,14 @@ def generate_worker_log_config(
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
parser = ArgumentParser()
parser.add_argument(
"--generate-only",
action="store_true",
help="Only generate configuration; don't run Synapse.",
)
opts = parser.parse_args(args)
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
@@ -1034,8 +994,8 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
log("Base homeserver config exists—not regenerating")
# This script may be run multiple times (mostly by Complement, see note at top of
# file). Don't re-configure workers in this instance.
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
if not os.path.exists(MARKER_FILE_PATH):
# Collect and validate worker_type requests
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
@@ -1054,11 +1014,15 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
# Mark workers as being configured
with open(mark_filepath, "w") as f:
with open(MARKER_FILE_PATH, "w") as f:
f.write("")
else:
log("Worker config exists—not regenerating")
if opts.generate_only:
log("--generate-only: won't run Synapse")
return
# Lifted right out of start.py
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
@@ -1081,4 +1045,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if __name__ == "__main__":
main(sys.argv, os.environ)
main(sys.argv[1:], os.environ)
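A hedged usage sketch of the new flag (environment values hypothetical, and assuming the usual SYNAPSE_* settings are otherwise present): generate the worker configuration and the marker file, then return before Synapse is started.

import os
main(["--generate-only"], {**os.environ, "SYNAPSE_WORKER_TYPES": "event_persister"})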