Convert worker templates into a dataclass

rei/cwas_extension
Olivier Wilkinson (reivilibre) 2023-11-16 15:00:48 +00:00
parent b39a50a43b
commit a22eb7dc15
1 changed file with 162 additions and 150 deletions

@@ -47,12 +47,14 @@
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.
import dataclasses
import os
import platform
import re
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import chain
from pathlib import Path
from typing import (
@@ -82,32 +84,41 @@ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"
# Workers with exposed endpoints need either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
# Watching /_matrix/federation needs a "federation" listener
# Watching /_matrix/media and related needs a "media" listener
# Stream Writers require "client" and "replication" listeners because they
# have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"user_dir": {
"listener_resources": ["client"],
"endpoint_patterns": [
@dataclass
class WorkerTemplate:
listener_resources: List[str] = field(default_factory=list)
endpoint_patterns: List[str] = field(default_factory=list)
shared_extra_conf: Dict[str, Any] = field(default_factory=dict)
worker_extra_conf: str = ""
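# Note on the field(default_factory=...) defaults above: dataclasses reject
# mutable defaults such as a bare [] or {}, and the factory gives each
# WorkerTemplate instance its own fresh list/dict. A minimal sketch
# (illustrative only, not part of this change):
#
#     a = WorkerTemplate()
#     b = WorkerTemplate()
#     a.listener_resources.append("client")
#     assert b.listener_resources == []  # no state shared between instances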
WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"pusher": WorkerTemplate(
listener_resources=[],
endpoint_patterns=[],
shared_extra_conf={},
worker_extra_conf="",
),
"user_dir": WorkerTemplate(
listener_resources=["client"],
endpoint_patterns=[
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {
shared_extra_conf={
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"media_repository": {
"listener_resources": ["media"],
"endpoint_patterns": [
worker_extra_conf="",
),
"media_repository": WorkerTemplate(
listener_resources=["media"],
endpoint_patterns=[
"^/_matrix/media/",
"^/_synapse/admin/v1/purge_media_cache$",
"^/_synapse/admin/v1/room/.*/media.*$",
@@ -116,40 +127,38 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_synapse/admin/v1/quarantine_media/.*$",
],
# The first configured media worker will run the media background jobs
"shared_extra_conf": {
shared_extra_conf={
"enable_media_repo": False,
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
},
"worker_extra_conf": "enable_media_repo: true",
},
"appservice": {
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"federation_sender": {
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"synchrotron": {
"listener_resources": ["client"],
"endpoint_patterns": [
worker_extra_conf="enable_media_repo: true",
),
"appservice": WorkerTemplate(
listener_resources=[],
endpoint_patterns=[],
shared_extra_conf={"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME},
worker_extra_conf="",
),
"federation_sender": WorkerTemplate(
listener_resources=[],
endpoint_patterns=[],
shared_extra_conf={},
worker_extra_conf="",
),
"synchrotron": WorkerTemplate(
listener_resources=["client"],
endpoint_patterns=[
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"client_reader": {
"listener_resources": ["client"],
"endpoint_patterns": [
shared_extra_conf={},
worker_extra_conf="",
),
"client_reader": WorkerTemplate(
listener_resources=["client"],
endpoint_patterns=[
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
@@ -178,12 +187,12 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_reader": {
"listener_resources": ["federation"],
"endpoint_patterns": [
shared_extra_conf={},
worker_extra_conf="",
),
"federation_reader": WorkerTemplate(
listener_resources=["federation"],
endpoint_patterns=[
"^/_matrix/federation/(v1|v2)/event/",
"^/_matrix/federation/(v1|v2)/state/",
"^/_matrix/federation/(v1|v2)/state_ids/",
@@ -204,32 +213,32 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
"^/_matrix/key/v2/query",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_inbound": {
"listener_resources": ["federation"],
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"event_persister": {
"listener_resources": ["replication"],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"background_worker": {
"listener_resources": [],
"endpoint_patterns": [],
shared_extra_conf={},
worker_extra_conf="",
),
"federation_inbound": WorkerTemplate(
listener_resources=["federation"],
endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"],
shared_extra_conf={},
worker_extra_conf="",
),
"event_persister": WorkerTemplate(
listener_resources=["replication"],
endpoint_patterns=[],
shared_extra_conf={},
worker_extra_conf="",
),
"background_worker": WorkerTemplate(
listener_resources=[],
endpoint_patterns=[],
# This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database.
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
"worker_extra_conf": "",
},
"event_creator": {
"listener_resources": ["client"],
"endpoint_patterns": [
shared_extra_conf={"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
worker_extra_conf="",
),
"event_creator": WorkerTemplate(
listener_resources=["client"],
endpoint_patterns=[
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
@@ -237,53 +246,51 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"frontend_proxy": {
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"account_data": {
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
shared_extra_conf={},
worker_extra_conf="",
),
"frontend_proxy": WorkerTemplate(
listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
shared_extra_conf={},
worker_extra_conf="",
),
"account_data": WorkerTemplate(
listener_resources=["client", "replication"],
endpoint_patterns=[
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"presence": {
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"receipts": {
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
shared_extra_conf={},
worker_extra_conf="",
),
"presence": WorkerTemplate(
listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
shared_extra_conf={},
worker_extra_conf="",
),
"receipts": WorkerTemplate(
listener_resources=["client", "replication"],
endpoint_patterns=[
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"to_device": {
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"typing": {
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
shared_extra_conf={},
worker_extra_conf="",
),
"to_device": WorkerTemplate(
listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
shared_extra_conf={},
worker_extra_conf="",
),
"typing": WorkerTemplate(
listener_resources=["client", "replication"],
endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"],
shared_extra_conf={},
worker_extra_conf="",
),
}
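# With WORKERS_CONFIG now holding WorkerTemplate values, lookups move from
# string-keyed subscripting to attribute access that type checkers can verify.
# A hypothetical before/after (illustrative only):
#
#     WORKERS_CONFIG["user_dir"]["listener_resources"]  # old: untyped dict access
#     WORKERS_CONFIG["user_dir"].listener_resources     # new: typed attribute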
# Templates for sections that may be inserted multiple times in config files
@@ -425,54 +432,59 @@ def add_worker_roles_to_shared_config(
def merge_worker_template_configs(
existing_dict: Optional[Dict[str, Any]],
to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
existing_template: WorkerTemplate,
to_be_merged_template: WorkerTemplate,
) -> WorkerTemplate:
"""When given an existing dict of worker template configuration consisting with both
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
return new dict.
Args:
existing_dict: Either an existing worker template or a fresh blank one.
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
existing_template: Either an existing worker template or a fresh blank one.
to_be_merged_template: The template from WORKERS_CONFIG to be merged into
existing_template.
Returns: The newly merged WorkerTemplate.
"""
new_dict: Dict[str, Any] = {}
if not existing_dict:
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
new_dict = to_be_merged_dict.copy()
else:
for i in to_be_merged_dict.keys():
if (i == "endpoint_patterns") or (i == "listener_resources"):
# merge the two lists, remove duplicates
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
elif i == "shared_extra_conf":
# merge dictionary's, the worker name will be replaced later
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
elif i == "worker_extra_conf":
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
else:
# Everything else should be identical, like "app", which only works
# because all apps are now generic_workers.
new_dict[i] = to_be_merged_dict[i]
return new_dict
# Copy existing_template: dataclasses.replace with no overrides returns a
# shallow copy, which is safe here because every mutable field is reassigned below.
new_template: WorkerTemplate = dataclasses.replace(existing_template)
# merge the two lists, remove duplicates
new_template.listener_resources = list(
set(new_template.listener_resources + to_be_merged_template.listener_resources)
)
# merge the two lists, remove duplicates
new_template.endpoint_patterns = list(
set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns)
)
# merge dictionaries; the worker name will be replaced later
new_template.shared_extra_conf = {
**new_template.shared_extra_conf,
**to_be_merged_template.shared_extra_conf,
}
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_template.worker_extra_conf = (
new_template.worker_extra_conf + to_be_merged_template.worker_extra_conf
)
return new_template
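# A sketch of the merge semantics (illustrative only): combining two templates
# unions their listener/endpoint lists, overlays shared_extra_conf, and
# concatenates worker_extra_conf. Note that deduplicating via set() does not
# preserve list order.
#
#     merged = merge_worker_template_configs(
#         WORKERS_CONFIG["client_reader"], WORKERS_CONFIG["federation_reader"]
#     )
#     assert set(merged.listener_resources) == {"client", "federation"}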
def insert_worker_name_for_worker_config(
existing_dict: Dict[str, Any], worker_name: str
existing_template: WorkerTemplate, worker_name: str
) -> Dict[str, Any]:
"""Insert a given worker name into the worker's configuration dict.
Args:
existing_dict: The worker_config dict that is imported into shared_config.
existing_template: The WorkerTemplate that is imported into shared_config.
worker_name: The name of the worker to insert.
Returns: A dict built from the template, with the worker name inserted.
"""
dict_to_edit = existing_dict.copy()
dict_to_edit = dataclasses.asdict(existing_template)
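# (dataclasses.asdict recursively deep-copies field values into plain dicts
# and lists, so mutating dict_to_edit below cannot affect the shared template)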
for k, v in dict_to_edit["shared_extra_conf"].items():
# Only proceed if it's the placeholder name string
if v == WORKER_PLACEHOLDER_NAME:
@@ -793,27 +805,27 @@ def generate_worker_files(
# Map locations to upstreams (corresponding to worker types) in Nginx
# but only if we use the appropriate worker type
for worker_type in all_worker_types_in_use:
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns:
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
# For each worker type specified by the user, create config values and write its
# yaml config file
for worker_name, worker_types_set in requested_worker_types.items():
# The collected and processed data will live here.
worker_config: Dict[str, Any] = {}
worker_template: WorkerTemplate = WorkerTemplate()
# Merge all worker config templates for this worker into a single config
for worker_type in worker_types_set:
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
# Merge worker type template configuration data. It's a combination of lists
# and dicts, so use this helper.
worker_config = merge_worker_template_configs(
worker_config, copy_of_template_config
worker_template = merge_worker_template_configs(
worker_template, WORKERS_CONFIG[worker_type]
)
# Replace placeholder names in the config template with the actual worker name.
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
worker_config: Dict[str, Any] = insert_worker_name_for_worker_config(
worker_template, worker_name
)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}