Use a lambda for the worker name rather than search and replace later

rei/cwas_extension
Olivier Wilkinson (reivilibre) 2023-11-16 15:05:39 +00:00
parent a22eb7dc15
commit ba3b6a4dfd
1 changed files with 29 additions and 32 deletions

View File

@ -59,6 +59,7 @@ from itertools import chain
from pathlib import Path from pathlib import Path
from typing import ( from typing import (
Any, Any,
Callable,
Dict, Dict,
List, List,
Mapping, Mapping,
@ -80,10 +81,6 @@ MAIN_PROCESS_REPLICATION_PORT = 9093
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock" MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener # Watching /_matrix/client needs a "client" listener
@ -95,7 +92,8 @@ WORKER_PLACEHOLDER_NAME = "placeholder_name"
class WorkerTemplate: class WorkerTemplate:
listener_resources: List[str] = field(default_factory=list) listener_resources: List[str] = field(default_factory=list)
endpoint_patterns: List[str] = field(default_factory=list) endpoint_patterns: List[str] = field(default_factory=list)
shared_extra_conf: Dict[str, Any] = field(default_factory=dict) # (worker_name) -> {}
shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
worker_extra_conf: str = "" worker_extra_conf: str = ""
@ -103,7 +101,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"pusher": WorkerTemplate( "pusher": WorkerTemplate(
listener_resources=[], listener_resources=[],
endpoint_patterns=[], endpoint_patterns=[],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"user_dir": WorkerTemplate( "user_dir": WorkerTemplate(
@ -111,8 +109,8 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
endpoint_patterns=[ endpoint_patterns=[
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
], ],
shared_extra_conf={ shared_extra_conf=lambda worker_name: {
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME "update_user_directory_from_worker": worker_name
}, },
worker_extra_conf="", worker_extra_conf="",
), ),
@ -127,22 +125,24 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_synapse/admin/v1/quarantine_media/.*$", "^/_synapse/admin/v1/quarantine_media/.*$",
], ],
# The first configured media worker will run the media background jobs # The first configured media worker will run the media background jobs
shared_extra_conf={ shared_extra_conf=lambda worker_name: {
"enable_media_repo": False, "enable_media_repo": False,
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME, "media_instance_running_background_jobs": worker_name,
}, },
worker_extra_conf="enable_media_repo: true", worker_extra_conf="enable_media_repo: true",
), ),
"appservice": WorkerTemplate( "appservice": WorkerTemplate(
listener_resources=[], listener_resources=[],
endpoint_patterns=[], endpoint_patterns=[],
shared_extra_conf={"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME}, shared_extra_conf=lambda worker_name: {
"notify_appservices_from_worker": worker_name
},
worker_extra_conf="", worker_extra_conf="",
), ),
"federation_sender": WorkerTemplate( "federation_sender": WorkerTemplate(
listener_resources=[], listener_resources=[],
endpoint_patterns=[], endpoint_patterns=[],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"synchrotron": WorkerTemplate( "synchrotron": WorkerTemplate(
@ -153,7 +153,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
], ],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"client_reader": WorkerTemplate( "client_reader": WorkerTemplate(
@ -187,7 +187,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$", "^/_matrix/client/(r0|v3|unstable)/notifications$",
], ],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"federation_reader": WorkerTemplate( "federation_reader": WorkerTemplate(
@ -213,19 +213,19 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
"^/_matrix/key/v2/query", "^/_matrix/key/v2/query",
], ],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"federation_inbound": WorkerTemplate( "federation_inbound": WorkerTemplate(
listener_resources=["federation"], listener_resources=["federation"],
endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"event_persister": WorkerTemplate( "event_persister": WorkerTemplate(
listener_resources=["replication"], listener_resources=["replication"],
endpoint_patterns=[], endpoint_patterns=[],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"background_worker": WorkerTemplate( "background_worker": WorkerTemplate(
@ -233,7 +233,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
endpoint_patterns=[], endpoint_patterns=[],
# This worker cannot be sharded. Therefore, there should only ever be one # This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database. # background worker. This is enforced for the safety of your database.
shared_extra_conf={"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name},
worker_extra_conf="", worker_extra_conf="",
), ),
"event_creator": WorkerTemplate( "event_creator": WorkerTemplate(
@ -246,13 +246,13 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
], ],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"frontend_proxy": WorkerTemplate( "frontend_proxy": WorkerTemplate(
listener_resources=["client", "replication"], listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"account_data": WorkerTemplate( "account_data": WorkerTemplate(
@ -261,13 +261,13 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data", "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
], ],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"presence": WorkerTemplate( "presence": WorkerTemplate(
listener_resources=["client", "replication"], listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"receipts": WorkerTemplate( "receipts": WorkerTemplate(
@ -276,19 +276,19 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
], ],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"to_device": WorkerTemplate( "to_device": WorkerTemplate(
listener_resources=["client", "replication"], listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
"typing": WorkerTemplate( "typing": WorkerTemplate(
listener_resources=["client", "replication"], listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"],
shared_extra_conf={}, shared_extra_conf=lambda _worker_name: {},
worker_extra_conf="", worker_extra_conf="",
), ),
} }
@ -459,9 +459,9 @@ def merge_worker_template_configs(
) )
# merge dictionaries; the worker name will be replaced later # merge dictionaries; the worker name will be replaced later
new_template.shared_extra_conf = { new_template.shared_extra_conf = lambda worker_name: {
**new_template.shared_extra_conf, **new_template.shared_extra_conf(worker_name),
**to_be_merged_template.shared_extra_conf, **to_be_merged_template.shared_extra_conf(worker_name),
} }
# There is only one worker type that has a 'worker_extra_conf' and it is # There is only one worker type that has a 'worker_extra_conf' and it is
@ -485,10 +485,7 @@ def insert_worker_name_for_worker_config(
Returns: Copy of the dict with newly inserted worker name Returns: Copy of the dict with newly inserted worker name
""" """
dict_to_edit = dataclasses.asdict(existing_template) dict_to_edit = dataclasses.asdict(existing_template)
for k, v in dict_to_edit["shared_extra_conf"].items(): dict_to_edit["shared_extra_conf"] = existing_template.shared_extra_conf(worker_name)
# Only proceed if it's the placeholder name string
if v == WORKER_PLACEHOLDER_NAME:
dict_to_edit["shared_extra_conf"][k] = worker_name
return dict_to_edit return dict_to_edit