1030 lines
		
	
	
		
			41 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			1030 lines
		
	
	
		
			41 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python
 | |
| # Copyright 2021 The Matrix.org Foundation C.I.C.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| # This script reads environment variables and generates a shared Synapse worker,
 | |
| # nginx and supervisord configs depending on the workers requested.
 | |
| #
 | |
| # The environment variables it reads are:
 | |
| #   * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
 | |
| #   * SYNAPSE_REPORT_STATS: Whether to report stats.
 | |
| #   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
 | |
| #         below. Leave empty for no workers. Add a ':' and a number at the end to
 | |
| #         multiply that worker. Append multiple worker types with '+' to merge the
 | |
| #         worker types into a single worker. Add a name and a '=' to the front of a
 | |
| #         worker type to give this instance a name in logs and nginx.
 | |
| #         Examples:
 | |
| #         SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
 | |
| #         SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
 | |
| #         SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
 | |
| #   * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
 | |
| #         will be treated as Application Service registration files.
 | |
| #   * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
 | |
| #   * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
 | |
| #         Nginx will be configured to serve TLS on port 8448.
 | |
| #   * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
 | |
| #         only intended for usage in Complement at the moment.
 | |
| #         No stability guarantees are provided.
 | |
| #   * SYNAPSE_LOG_LEVEL: Set this to DEBUG, INFO, WARNING or ERROR to change the
 | |
| #         log level. INFO is the default.
 | |
| #   * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
 | |
| #         regardless of the SYNAPSE_LOG_LEVEL setting.
 | |
| #
 | |
| # NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
 | |
| # in the project's README), this script may be run multiple times, and functionality should
 | |
| # continue to work if so.
 | |
| 
 | |
| import os
 | |
| import platform
 | |
| import re
 | |
| import subprocess
 | |
| import sys
 | |
| from collections import defaultdict
 | |
| from itertools import chain
 | |
| from pathlib import Path
 | |
| from typing import (
 | |
|     Any,
 | |
|     Dict,
 | |
|     List,
 | |
|     Mapping,
 | |
|     MutableMapping,
 | |
|     NoReturn,
 | |
|     Optional,
 | |
|     Set,
 | |
|     SupportsIndex,
 | |
| )
 | |
| 
 | |
| import yaml
 | |
| from jinja2 import Environment, FileSystemLoader
 | |
| 
 | |
# Settings describing the main (non-worker) Synapse process. The HTTP listener port
# is handed to start.py as SYNAPSE_HTTP_PORT when the base config is generated.
MAIN_PROCESS_HTTP_LISTENER_PORT: int = 8080
MAIN_PROCESS_INSTANCE_NAME: str = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS: str = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT: int = 9093

# Marker value used inside the "shared_extra_conf" templates of WORKERS_CONFIG below.
# While the worker configs are being generated, every occurrence is swapped for the
# name of the concrete worker instance.
WORKER_PLACEHOLDER_NAME: str = "placeholder_name"
 | |
| 
 | |
# Templates for every supported worker type, keyed by worker type name. Each template
# has the same five keys:
#   * "app": The Synapse application module the worker runs. Every type uses
#     synapse.app.generic_worker, which is what allows several types to be merged
#     into a single worker (see merge_worker_template_configs).
#   * "listener_resources": Resources the worker's HTTP listener has to serve.
#   * "endpoint_patterns": Request path regexes that should be routed to this worker
#     (presumably emitted into nginx via NGINX_LOCATION_CONFIG_BLOCK — confirm in
#     generate_worker_files).
#   * "shared_extra_conf": Options for the config shared by all workers. Any value
#     equal to WORKER_PLACEHOLDER_NAME is replaced with the worker's actual name.
#   * "worker_extra_conf": A YAML snippet, presumably for this worker's own config
#     only (e.g. media_repository re-enables the media repo that its
#     shared_extra_conf disables).
#
# Workers with exposed endpoints need either a "client", "federation", or "media"
# listener resource:
#   Watching /_matrix/client needs a "client" listener.
#   Watching /_matrix/federation needs a "federation" listener.
#   Watching /_matrix/media and related paths needs a "media" listener.
# Stream writers require "client" and "replication" listeners because they
#   have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
    "pusher": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "user_dir": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
        ],
        "shared_extra_conf": {
            "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
        },
        "worker_extra_conf": "",
    },
    "media_repository": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["media"],
        "endpoint_patterns": [
            "^/_matrix/media/",
            "^/_synapse/admin/v1/purge_media_cache$",
            "^/_synapse/admin/v1/room/.*/media.*$",
            "^/_synapse/admin/v1/user/.*/media.*$",
            "^/_synapse/admin/v1/media/.*$",
            "^/_synapse/admin/v1/quarantine_media/.*$",
        ],
        # The first configured media worker will run the media background jobs
        "shared_extra_conf": {
            "enable_media_repo": False,
            "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
        },
        "worker_extra_conf": "enable_media_repo: true",
    },
    "appservice": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {
            "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
        },
        "worker_extra_conf": "",
    },
    "federation_sender": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "synchrotron": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
            "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
            "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
            "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "client_reader": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
            "^/_matrix/client/v1/rooms/.*/hierarchy$",
            "^/_matrix/client/(v1|unstable)/rooms/.*/relations/",
            "^/_matrix/client/v1/rooms/.*/threads$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
            "^/_matrix/client/versions$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
            "^/_matrix/client/(r0|v3|unstable)/register$",
            "^/_matrix/client/(r0|v3|unstable)/register/available$",
            "^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
            "^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
            "^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/search",
            "^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)",
            "^/_matrix/client/(r0|v3|unstable)/password_policy$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
            "^/_matrix/client/(r0|v3|unstable)/capabilities$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "federation_reader": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["federation"],
        "endpoint_patterns": [
            "^/_matrix/federation/(v1|v2)/event/",
            "^/_matrix/federation/(v1|v2)/state/",
            "^/_matrix/federation/(v1|v2)/state_ids/",
            "^/_matrix/federation/(v1|v2)/backfill/",
            "^/_matrix/federation/(v1|v2)/get_missing_events/",
            "^/_matrix/federation/(v1|v2)/publicRooms",
            "^/_matrix/federation/(v1|v2)/query/",
            "^/_matrix/federation/(v1|v2)/make_join/",
            "^/_matrix/federation/(v1|v2)/make_leave/",
            "^/_matrix/federation/(v1|v2)/send_join/",
            "^/_matrix/federation/(v1|v2)/send_leave/",
            "^/_matrix/federation/(v1|v2)/invite/",
            "^/_matrix/federation/(v1|v2)/query_auth/",
            "^/_matrix/federation/(v1|v2)/event_auth/",
            "^/_matrix/federation/v1/timestamp_to_event/",
            "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
            "^/_matrix/federation/(v1|v2)/user/devices/",
            "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
            "^/_matrix/key/v2/query",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "federation_inbound": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["federation"],
        # NOTE(review): unlike every other pattern in this table, this one has no
        # leading "^" anchor, so it matches the path anywhere in the URI. Confirm
        # whether that is intended.
        "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "event_persister": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["replication"],
        "endpoint_patterns": [],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "background_worker": {
        "app": "synapse.app.generic_worker",
        "listener_resources": [],
        "endpoint_patterns": [],
        # This worker cannot be sharded. Therefore, there should only ever be one
        # background worker. This is enforced for the safety of your database.
        "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
        "worker_extra_conf": "",
    },
    "event_creator": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
            "^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "frontend_proxy": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "account_data": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": [
            "^/_matrix/client/(r0|v3|unstable)/.*/tags",
            "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "presence": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "receipts": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": [
            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "to_device": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
    "typing": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client", "replication"],
        "endpoint_patterns": [
            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
    },
}
 | |
| 
 | |
# Templates for sections that may be inserted multiple times in config files.
# The doubled braces ("{{" / "}}") are str.format() escapes for literal nginx braces,
# so these templates are presumably filled in with str.format().

# One nginx regex location. Request paths matching {endpoint} (case-insensitively,
# because of "~*") are proxied to {upstream}, with the client address, scheme and
# Host header forwarded.
NGINX_LOCATION_CONFIG_BLOCK = """
    location ~* {endpoint} {{
        proxy_pass {upstream};
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Host $host;
    }}
"""

# An nginx upstream group named {upstream_worker_base_name}. {body} presumably holds
# the group's `server ...;` lines — confirm at the call site.
NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_base_name} {{
{body}
}}
"""
 | |
| 
 | |
| 
 | |
# Utility functions
def log(txt: str) -> None:
    """Write ``txt`` to standard output, terminated by a newline."""
    print(txt, end="\n")
 | |
| 
 | |
| 
 | |
def error(txt: str) -> NoReturn:
    """Report ``txt`` on standard error and abort the script with exit status 2."""
    print(txt, file=sys.stderr)
    # Equivalent to sys.exit(2): both raise SystemExit carrying the status code.
    raise SystemExit(2)
 | |
| 
 | |
| 
 | |
def flush_buffers() -> None:
    """Push out anything still buffered on stdout, then on stderr."""
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
 | |
| 
 | |
| 
 | |
def convert(src: str, dst: str, **template_vars: object) -> None:
    """Render a Jinja2 template and append the result to a file.

    Args:
        src: Path to the input template file.
        dst: Path of the file to append the rendered output to.
        template_vars: The arguments to replace placeholder variables in the template with.
    """
    # Load the template from its own directory. Autoescaping is disabled so that
    # template variables are inserted verbatim; the outputs are config files, not HTML.
    env = Environment(loader=FileSystemLoader(os.path.dirname(src)), autoescape=False)
    template = env.get_template(os.path.basename(src))

    # Generate a string from the template.
    rendered = template.render(**template_vars)

    # Append rather than overwrite, in case the file has already been written to by
    # something else (for instance, as part of the instructions in a dockerfile).
    #
    # Write UTF-8 explicitly. FileSystemLoader decodes templates as UTF-8 by default,
    # whereas open() without an encoding uses the locale's encoding. Under a
    # non-UTF-8 locale that could fail on (or mis-encode) non-ASCII template content.
    with open(dst, "a", encoding="utf-8") as outfile:
        # Start on a fresh line in case the existing file doesn't end with a newline.
        outfile.write("\n")

        outfile.write(rendered)
 | |
| 
 | |
| 
 | |
def add_worker_roles_to_shared_config(
    shared_config: dict,
    worker_types_set: Set[str],
    worker_name: str,
    worker_port: int,
) -> None:
    """Record one worker instance's roles in the config shared by all workers.

    ``shared_config`` is updated in place:

    * ``instance_map`` is always created if missing. It gains an entry mapping
      ``worker_name`` to ``{"host": "localhost", "port": worker_port}`` whenever the
      worker writes to any stream.
    * ``pusher_instances`` / ``federation_sender_instances`` get ``worker_name``
      appended when the worker has the matching type.
    * ``stream_writers["events"]`` gets ``worker_name`` for an event_persister, and
      ``stream_writers[<type>]`` gets it for each singular stream writer type.

    Args:
        shared_config: The config dict that all worker instances share (after being
            converted to YAML).
        worker_types_set: The worker types (as defined in WORKERS_CONFIG) fulfilled by
            this worker; a single type or several.
        worker_name: The name of the worker instance.
        worker_port: The HTTP replication port that the worker instance is listening on.
    """
    # Stream writers that there can be only one of. Each of these worker types is
    # conveniently named after the stream it writes. Events can be sharded, so the
    # events stream is handled separately.
    singular_stream_writers = (
        "account_data",
        "presence",
        "receipts",
        "to_device",
        "typing",
    )

    # Marks the workers that write to the various replication streams.
    instance_map = shared_config.setdefault("instance_map", {})

    def register_stream_writer(stream: str) -> None:
        # List this worker as a writer of `stream` and record how to reach it; every
        # stream writer currently needs an HTTP replication port.
        writers_by_stream = shared_config.setdefault("stream_writers", {})
        writers_by_stream.setdefault(stream, []).append(worker_name)
        instance_map[worker_name] = {"host": "localhost", "port": worker_port}

    # Roles that only need the worker's name listed under a shared-config key. One
    # worker may fulfil several roles, so each is checked independently.
    for role, instances_key in (
        ("pusher", "pusher_instances"),
        ("federation_sender", "federation_sender_instances"),
    ):
        if role in worker_types_set:
            shared_config.setdefault(instances_key, []).append(worker_name)

    if "event_persister" in worker_types_set:
        register_stream_writer("events")

    for role in worker_types_set:
        if role in singular_stream_writers:
            register_stream_writer(role)
 | |
| 
 | |
| 
 | |
def merge_worker_template_configs(
    existing_dict: Optional[Dict[str, Any]],
    to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge a worker template from WORKERS_CONFIG into a worker's accumulated config.

    A single worker may fulfil several worker types. Their templates, which mix lists
    and dicts, are combined here one at a time.

    Args:
        existing_dict: The config accumulated so far, or ``None``/empty when this is
            the first template for the worker.
        to_be_merged_dict: The template from WORKERS_CONFIG to be merged into
            existing_dict. It is not modified.

    Returns:
        A new dict with the merged values. Its list and dict values are fresh objects,
        so mutating them affects neither argument (in particular not the module-level
        WORKERS_CONFIG templates).
    """
    if not existing_dict:
        # Nothing accumulated yet: start from a copy of the template. The nested
        # lists/dicts are copied too. A plain dict.copy() would share them with the
        # template, so later in-place edits (such as replacing the worker placeholder
        # name in "shared_extra_conf") would leak back into WORKERS_CONFIG.
        fresh: Dict[str, Any] = {}
        for key, value in to_be_merged_dict.items():
            if isinstance(value, list):
                value = list(value)
            elif isinstance(value, dict):
                value = dict(value)
            fresh[key] = value
        return fresh

    new_dict: Dict[str, Any] = {}
    for key, new_value in to_be_merged_dict.items():
        if key in ("endpoint_patterns", "listener_resources"):
            # Concatenate and remove duplicates, keeping first-seen order. A set's
            # iteration order depends on per-process string hashing, which would make
            # the generated configs differ between otherwise identical runs.
            new_dict[key] = list(dict.fromkeys(existing_dict[key] + new_value))
        elif key == "shared_extra_conf":
            # Merge the dictionaries; the worker name is substituted later.
            new_dict[key] = {**existing_dict[key], **new_value}
        elif key == "worker_extra_conf":
            # These are YAML snippets. Put non-empty ones on separate lines, because
            # gluing two snippets onto one line would produce invalid YAML.
            new_dict[key] = "\n".join(
                snippet for snippet in (existing_dict[key], new_value) if snippet
            )
        else:
            # Everything else should be identical, like "app", which only works
            # because all apps are now generic_workers.
            new_dict[key] = new_value
    return new_dict
 | |
| 
 | |
| 
 | |
def insert_worker_name_for_worker_config(
    existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
    """Insert a given worker name into the worker's configuration dict.

    Every value in ``existing_dict["shared_extra_conf"]`` equal to
    WORKER_PLACEHOLDER_NAME is replaced with ``worker_name``. All other values are
    kept unchanged.

    Args:
        existing_dict: The worker_config dict that is imported into shared_config.
            It must contain a "shared_extra_conf" dict. It is not modified.
        worker_name: The name of the worker to insert.
    Returns: Copy of the dict with newly inserted worker name
    """
    dict_to_edit = existing_dict.copy()
    # Build a new "shared_extra_conf" rather than overwriting entries in place. The
    # shallow copy above still shares the nested dict with the caller. That dict can
    # be the WORKERS_CONFIG template's own dict (merge_worker_template_configs only
    # shallow-copies a template), so an in-place write would replace the template's
    # placeholder for every later worker.
    dict_to_edit["shared_extra_conf"] = {
        key: worker_name if value == WORKER_PLACEHOLDER_NAME else value
        for key, value in existing_dict["shared_extra_conf"].items()
    }
    return dict_to_edit
 | |
| 
 | |
| 
 | |
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
    """
    Apply multiplier(if found) by returning a new expanded list with some basic error
    checking.

    Args:
        worker_types: The unprocessed List of requested workers

    Returns:
        A new list with all requested workers expanded.
    """
    # Checking performed:
    # 1. if worker:2 or more is declared, it will create additional workers up to number
    # 2. if worker:1, it will create a single copy of this worker as if no number was
    #   given
    # 3. if worker:0 is declared, this worker will be ignored. This is to allow for
    #   scripting and automated expansion and is intended behaviour.
    # 4. if worker:NaN or is a negative number, it will error and log it.
    new_worker_types: List[str] = []
    for worker_type in worker_types:
        if ":" not in worker_type:
            # If it's not a real worker_type, it will error out later.
            new_worker_types.append(worker_type)
            continue

        # Split on the first ":" only. There should be 2 components: a type of
        # worker(s) and an integer as a string.
        worker_type_components = split_and_strip_string(worker_type, ":", 1)
        try:
            worker_count = int(worker_type_components[1])
        except ValueError:
            error(
                f"Bad number in worker count for '{worker_type}': "
                f"'{worker_type_components[1]}' is not an integer"
            )

        if worker_count < 0:
            # range() of a negative number is simply empty. Without this check a
            # negative count would silently behave like 0 instead of being rejected
            # as rule 4 above promises.
            error(
                f"Bad number in worker count for '{worker_type}': "
                f"'{worker_type_components[1]}' must not be negative"
            )

        # Add the worker once per requested instance (nothing for a count of 0).
        new_worker_types.extend([worker_type_components[0]] * worker_count)

    return new_worker_types
 | |
| 
 | |
| 
 | |
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
    """Tell whether more than one worker may be configured with ``worker_type``.

    Args:
        worker_type: The type of worker to check against.
    Returns: True if allowed, False if not
    """
    # The background worker and the singular stream writers must each live on
    # exactly one worker.
    single_instance_types = {
        "background_worker",
        "account_data",
        "presence",
        "receipts",
        "typing",
        "to_device",
    }
    return worker_type not in single_instance_types
 | |
| 
 | |
| 
 | |
def split_and_strip_string(
    given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]:
    """
    Split ``given_string`` on ``split_char`` and trim surrounding whitespace from
    every resulting piece.

    Args:
        given_string: The string to split
        split_char: The character to split the string on
        max_split: Passed to str.split() as its maxsplit argument. The default of -1
            places no limit on the number of splits.
    Returns:
        A List of strings
    """
    pieces = given_string.split(split_char, max_split)
    return list(map(str.strip, pieces))
 | |
| 
 | |
| 
 | |
def generate_base_homeserver_config() -> None:
    """Produce a basic homeserver config by delegating to start.py.

    The resulting config is modified afterwards to add worker support.

    Raises: CalledProcessError if calling start.py returned a non-zero exit code.
    """
    # start.py (copied in by the official, monolith dockerfile) already does this job.
    # The port is exported through os.environ, which the child process inherits;
    # presumably start.py reads SYNAPSE_HTTP_PORT for the main HTTP listener.
    os.environ.update(SYNAPSE_HTTP_PORT=str(MAIN_PROCESS_HTTP_LISTENER_PORT))
    start_script_argv = ["/usr/local/bin/python", "/start.py", "migrate_config"]
    subprocess.run(start_script_argv, check=True)
 | |
| 
 | |
| 
 | |
def parse_worker_types(
    requested_worker_types: List[str],
) -> Dict[str, Set[str]]:
    """Turn the requested worker specifications into a map of worker names to types.

    After any ":<count>" multipliers have been expanded, each request takes one of
    these forms:

      - ``requested_name=type1+type2+type3``
      - ``synchrotron``
      - ``event_creator+event_persister``

    The script is aborted via ``error`` in any of these cases:
      - a request contains more than one '=';
      - a requested name is invalid;
      - a worker type is unknown;
      - a non-shardable type is requested more than once;
      - a base name is reused for a different set of worker types.

    Args:
        requested_worker_types: The list formed from the split environment variable
            containing the unprocessed requests for workers.

    Returns: A dict of worker names to set of worker types. Format:
        {'worker_name':
            {'worker_type', 'worker_type2'}
        }
    """
    # Instances handed out so far per base name. The running count becomes the
    # numeric suffix of each worker's name.
    instances_per_base_name: Dict[str, int] = defaultdict(int)

    # Workers claiming each individual worker type, used to reject a second worker
    # for types where multiples would be bad (e.g. presence).
    claims_per_worker_type: Dict[str, int] = defaultdict(int)

    workers: Dict[str, Set[str]] = {}

    for spec in apply_requested_multiplier_for_worker(requested_worker_types):
        # Use the requested name if there is one; otherwise one is generated below.
        base_name = ""
        if "=" in spec:
            name_and_types = split_and_strip_string(spec, "=")
            if len(name_and_types) > 2:
                error(
                    "There should only be one '=' in the worker type string. "
                    f"Please fix: {spec}"
                )

            base_name = name_and_types[0]

            # Keep names to characters that are safe in file paths and nginx
            # configurations. A trailing digit is forbidden because an instance
            # number is appended below.
            if not re.match(r"^[a-zA-Z0-9_+-]*[a-zA-Z_+-]$", base_name):
                error(
                    "Invalid worker name; please choose a name consisting of "
                    "alphanumeric letters, _ + -, but not ending with a digit: "
                    f"{base_name!r}"
                )

            # The rest of the request, without the name override.
            spec = name_and_types[1]

        # Split on "+", trim whitespace, and deduplicate.
        types_for_worker: Set[str] = set(split_and_strip_string(spec, "+"))

        # No explicit name: derive one deterministically from the sorted types.
        base_name = base_name or "+".join(sorted(types_for_worker))

        # Validate every type and refuse a second worker for non-shardable types.
        for worker_type in types_for_worker:
            if worker_type not in WORKERS_CONFIG:
                error(
                    f"{worker_type} is an unknown worker type! Was found in "
                    f"'{spec}'. Please fix!"
                )

            seen_before = worker_type in claims_per_worker_type
            if seen_before and not is_sharding_allowed_for_worker_type(worker_type):
                error(
                    f"There can be only a single worker with {worker_type} "
                    "type. Please recount and remove."
                )
            claims_per_worker_type[worker_type] += 1

        instances_per_base_name[base_name] += 1
        instance_number = instances_per_base_name[base_name]
        worker_name = f"{base_name}{instance_number}"

        # Later instances of a base name must serve exactly the same worker types
        # as the first one, otherwise the shared name would be confusing.
        if instance_number > 1:
            first_types = workers[f"{base_name}1"]
            if first_types != types_for_worker:
                error(
                    f"Can not use worker_name: '{worker_name}' for worker_type(s): "
                    f"{types_for_worker!r}. It is already in use by "
                    f"worker_type(s): {first_types!r}"
                )

        workers[worker_name] = types_for_worker

    return workers
 | |
| 
 | |
| 
 | |
def generate_worker_files(
    environ: Mapping[str, str],
    config_path: str,
    data_dir: str,
    requested_worker_types: Dict[str, Set[str]],
) -> None:
    """Read the desired workers (if any) that are passed in and generate shared
    homeserver, nginx, supervisord and healthcheck configs.

    Args:
        environ: The environment mapping to read settings from (normally
            os.environ). All environment lookups in this function go through it.
        config_path: The location of the generated Synapse main worker config file.
        data_dir: The location of the synapse data directory. Where log and
            user-facing config files live.
        requested_worker_types: A Dict containing requested workers in the format of
            {'worker_name1': {'worker_type', ...}}
    """
    # Note that yaml cares about indentation, so care should be taken to insert lines
    # into files at the correct indentation below.

    # First read the original config file and extract the listeners block. Then we'll
    # add another listener for replication. Later we'll write out the result to the
    # shared config file.
    listeners = [
        {
            "port": MAIN_PROCESS_REPLICATION_PORT,
            "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
            "type": "http",
            "resources": [{"names": ["replication"]}],
        }
    ]
    with open(config_path) as file_stream:
        original_config = yaml.safe_load(file_stream)
        original_listeners = original_config.get("listeners")
        if original_listeners:
            listeners += original_listeners

    # The shared homeserver config. The contents of which will be inserted into the
    # base shared worker jinja2 template. This config file will be passed to all
    # workers, including Synapse's main process. It is intended mainly for disabling
    # functionality when certain workers are spun up, and adding a replication listener.
    shared_config: Dict[str, Any] = {"listeners": listeners}

    # List of dicts that describe workers.
    # We pass this to the Supervisor template later to generate the appropriate
    # program blocks.
    worker_descriptors: List[Dict[str, Any]] = []

    # Upstreams for load-balancing purposes. This dict maps each worker type to the
    # ports of the workers serving it. For example:
    # {
    #   worker_type: {1234, 1235, ...}
    # }
    # and will be used to construct 'upstream' nginx directives.
    nginx_upstreams: Dict[str, Set[int]] = {}

    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
    # will be placed after the proxy_pass directive. The main benefit to representing
    # this data as a dict over a str is that we can easily deduplicate endpoints
    # across multiple instances of the same worker. The final rendering will be combined
    # with nginx_upstreams and placed in /etc/nginx/conf.d.
    nginx_locations: Dict[str, str] = {}

    # Create the worker configuration directory if it doesn't already exist
    os.makedirs("/conf/workers", exist_ok=True)

    # Start worker ports from this arbitrary port
    worker_port = 18009

    # A list of internal endpoints to healthcheck, starting with the main process
    # which exists even if no workers do.
    healthcheck_urls = ["http://localhost:8080/health"]

    # Get the set of all worker types that we have configured
    all_worker_types_in_use = set(chain(*requested_worker_types.values()))
    # Map locations to upstreams (corresponding to worker types) in Nginx
    # but only if we use the appropriate worker type.
    # Iterate in sorted order: the iteration order of a set of strings depends on
    # hash randomization, and it decides both the order of the generated location
    # blocks and, if two worker types share an endpoint pattern, which upstream
    # wins (the last assignment below). Sorting makes the output reproducible.
    for worker_type in sorted(all_worker_types_in_use):
        for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
            nginx_locations[endpoint_pattern] = f"http://{worker_type}"

    # For each worker type specified by the user, create config values and write its
    # yaml config file
    for worker_name, worker_types_set in requested_worker_types.items():
        # Sorted for the same reproducibility reason as above: it fixes the order in
        # which the per-type templates are merged.
        sorted_worker_types = sorted(worker_types_set)

        # The collected and processed data will live here.
        worker_config: Dict[str, Any] = {}

        # Merge all worker config templates for this worker into a single config
        for worker_type in sorted_worker_types:
            # NOTE(review): dict.copy() is shallow, so nested values (e.g.
            # "shared_extra_conf") may still alias WORKERS_CONFIG unless
            # merge_worker_template_configs copies them — confirm before relying
            # on WORKERS_CONFIG staying pristine.
            copy_of_template_config = WORKERS_CONFIG[worker_type].copy()

            # Merge worker type template configuration data. It's a combination of lists
            # and dicts, so use this helper.
            worker_config = merge_worker_template_configs(
                worker_config, copy_of_template_config
            )

        # Replace placeholder names in the config template with the actual worker name.
        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)

        worker_config.update(
            {"name": worker_name, "port": str(worker_port), "config_path": config_path}
        )

        # Update the shared config with any worker_type specific options. The first of a
        # given worker_type needs to stay assigned and not be replaced: values already
        # in shared_config take precedence over this worker's extras.
        worker_config["shared_extra_conf"].update(shared_config)
        shared_config = worker_config["shared_extra_conf"]

        healthcheck_urls.append(f"http://localhost:{worker_port}/health")

        # Update the shared config with sharding-related options if necessary
        add_worker_roles_to_shared_config(
            shared_config, worker_types_set, worker_name, worker_port
        )

        # Enable the worker in supervisord
        worker_descriptors.append(worker_config)

        # Write out the worker's logging config file
        log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)

        # Then a worker config file
        convert(
            "/conf/worker.yaml.j2",
            f"/conf/workers/{worker_name}.yaml",
            **worker_config,
            worker_log_config_filepath=log_config_filepath,
        )

        # Save this worker's port number to the correct nginx upstreams
        for worker_type in sorted_worker_types:
            nginx_upstreams.setdefault(worker_type, set()).add(worker_port)

        worker_port += 1

    # Build the nginx location config blocks
    nginx_location_config = "".join(
        NGINX_LOCATION_CONFIG_BLOCK.format(endpoint=endpoint, upstream=upstream)
        for endpoint, upstream in nginx_locations.items()
    )

    # Determine the load-balancing upstreams to configure. Ports are sorted so the
    # generated server lines appear in a stable, ascending order.
    nginx_upstream_config = "".join(
        NGINX_UPSTREAM_CONFIG_BLOCK.format(
            upstream_worker_base_name=upstream_worker_base_name,
            body="".join(
                f"    server localhost:{port};\n"
                for port in sorted(upstream_worker_ports)
            ),
        )
        for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items()
    )

    # Finally, we'll write out the config files.

    # log config for the master process
    master_log_config = generate_worker_log_config(environ, "master", data_dir)
    shared_config["log_config"] = master_log_config

    # Find application service registrations
    appservice_registrations = None
    appservice_registration_dir = environ.get("SYNAPSE_AS_REGISTRATION_DIR")
    if appservice_registration_dir:
        # Scan for all YAML files that should be application service registrations.
        # Path.iterdir() yields entries in arbitrary order, so sort for stable output.
        appservice_registrations = sorted(
            str(reg_path.resolve())
            for reg_path in Path(appservice_registration_dir).iterdir()
            if reg_path.suffix.lower() in (".yaml", ".yml")
        )

    workers_in_use = len(requested_worker_types) > 0

    # If there are workers, add the main process to the instance_map too.
    if workers_in_use:
        instance_map = shared_config.setdefault("instance_map", {})
        instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
            "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
            "port": MAIN_PROCESS_REPLICATION_PORT,
        }

    # Shared homeserver config
    convert(
        "/conf/shared.yaml.j2",
        "/conf/workers/shared.yaml",
        shared_worker_config=yaml.dump(shared_config),
        appservice_registrations=appservice_registrations,
        enable_redis=workers_in_use,
        workers_in_use=workers_in_use,
    )

    # Nginx config
    convert(
        "/conf/nginx.conf.j2",
        "/etc/nginx/conf.d/matrix-synapse.conf",
        worker_locations=nginx_location_config,
        upstream_directives=nginx_upstream_config,
        tls_cert_path=environ.get("SYNAPSE_TLS_CERT"),
        tls_key_path=environ.get("SYNAPSE_TLS_KEY"),
    )

    # Supervisord config
    os.makedirs("/etc/supervisor", exist_ok=True)
    convert(
        "/conf/supervisord.conf.j2",
        "/etc/supervisor/supervisord.conf",
        main_config_path=config_path,
        enable_redis=workers_in_use,
    )

    convert(
        "/conf/synapse.supervisord.conf.j2",
        "/etc/supervisor/conf.d/synapse.conf",
        workers=worker_descriptors,
        main_config_path=config_path,
        use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
    )

    # healthcheck config
    convert(
        "/conf/healthcheck.sh.j2",
        "/healthcheck.sh",
        healthcheck_urls=healthcheck_urls,
    )

    # Ensure the logging directory exists. makedirs(exist_ok=True) avoids the
    # check-then-create race of exists() followed by mkdir().
    log_dir = data_dir + "/logs"
    os.makedirs(log_dir, exist_ok=True)
 | |
| 
 | |
| 
 | |
def generate_worker_log_config(
    environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str:
    """Render the logging config for a single worker into /conf/workers.

    Args:
        environ: mapping that logging-related settings are read from.
        worker_name: name of the worker; used in the output filename and passed
            through to the template.
        data_dir: directory whose "logs" subdirectory holds the on-disk log file
            when SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK is set.

    Returns: the path to the generated file
    """
    output_path = f"/conf/workers/{worker_name}.log.config"

    template_vars: Dict[str, Optional[str]] = {}
    # Writing worker logs to disk (in addition to the console) is opt-in.
    if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
        template_vars["LOG_FILE_PATH"] = f"{data_dir}/logs/{worker_name}.log"
    # These are forwarded verbatim and are None when the variable is unset.
    for env_key in ("SYNAPSE_LOG_LEVEL", "SYNAPSE_LOG_SENSITIVE"):
        template_vars[env_key] = environ.get(env_key)

    forking_launcher = environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER")
    convert(
        "/conf/log.config",
        output_path,
        worker_name=worker_name,
        **template_vars,
        include_worker_name_in_log_line=forking_launcher,
    )
    return output_path
 | |
| 
 | |
| 
 | |
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
    """Generate the homeserver and worker configs (once), then exec supervisord.

    Args:
        args: command-line arguments (not read by this function).
        environ: the process environment. It is modified in place
            (SYNAPSE_NO_TLS, and LD_PRELOAD when jemalloc is found) and then passed
            to supervisord as its environment via os.execle, which does not return.
    """
    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")

    # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
    # this needs to be handled by a frontend proxy
    environ["SYNAPSE_NO_TLS"] = "yes"

    # Generate the base homeserver config if one does not yet exist
    if not os.path.exists(config_path):
        log("Generating base homeserver config")
        generate_base_homeserver_config()
    else:
        log("Base homeserver config exists—not regenerating")

    # This script may be run multiple times (mostly by Complement, see note at top of
    # file). Don't re-configure workers in this instance: the marker file below is
    # written once the worker configs have been generated.
    mark_filepath = "/conf/workers_have_been_configured"
    if not os.path.exists(mark_filepath):
        # Read the desired worker configuration from the environment. An empty or
        # unset SYNAPSE_WORKER_TYPES means no workers, just the main process.
        worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
        requested_worker_types: Dict[str, Set[str]] = {}
        if worker_types_env:
            # Split type names by comma, ignoring whitespace, then collect and
            # validate the worker_type requests.
            requested_worker_types = parse_worker_types(
                split_and_strip_string(worker_types_env, ",")
            )

        # Generate the config files (only on this first, unmarked run).
        log("Generating worker config files")
        generate_worker_files(environ, config_path, data_dir, requested_worker_types)

        # Mark workers as being configured
        with open(mark_filepath, "w") as f:
            f.write("")
    else:
        log("Worker config exists—not regenerating")

    # Lifted right out of start.py: preload jemalloc when the library for this
    # machine architecture is present.
    jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)

    if os.path.isfile(jemallocpath):
        environ["LD_PRELOAD"] = jemallocpath
    else:
        log(f"Could not find {jemallocpath}, will not use")

    # Start supervisord, which will start Synapse, all of the configured worker
    # processes, redis, nginx etc. according to the config we created above.
    log("Starting supervisord")
    flush_buffers()
    os.execle(
        "/usr/local/bin/supervisord",
        "supervisord",
        "-c",
        "/etc/supervisor/supervisord.conf",
        environ,
    )
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Script entry point: run with the real argv and the live process environment.
    main(args=sys.argv, environ=os.environ)
 |