From 1de36407d14d7517af8462b2608dff0ea657262e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2020 14:00:58 +0100 Subject: [PATCH] Add `instance_map` config and route replication calls (#7495) --- changelog.d/7495.feature | 1 + synapse/config/workers.py | 17 +++++++++++++++++ synapse/replication/http/_base.py | 21 +++++++++++++++------ 3 files changed, 33 insertions(+), 6 deletions(-) create mode 100644 changelog.d/7495.feature diff --git a/changelog.d/7495.feature b/changelog.d/7495.feature new file mode 100644 index 0000000000..1150e714bd --- /dev/null +++ b/changelog.d/7495.feature @@ -0,0 +1 @@ +Add `instance_map` config and route replication calls. diff --git a/synapse/config/workers.py b/synapse/config/workers.py index fef72ed974..c80c338584 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,9 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +import attr + from ._base import Config +@attr.s +class InstanceLocationConfig: + """The host and port to talk to an instance via HTTP replication. + """ + + host = attr.ib(type=str) + port = attr.ib(type=int) + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -71,6 +82,12 @@ class WorkerConfig(Config): elif not bind_addresses: bind_addresses.append("") + # A map from instance name to host/port of their HTTP replication endpoint. + instance_map = config.get("instance_map", {}) or {} + self.instance_map = { + name: InstanceLocationConfig(**c) for name, c in instance_map.items() + } + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index f88c80ae84..c3136a4eb9 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -141,17 +141,26 @@ class ReplicationEndpoint(object): Returns a callable that accepts the same parameters as `_serialize_payload`. """ clock = hs.get_clock() - host = hs.config.worker_replication_host - port = hs.config.worker_replication_http_port - client = hs.get_simple_http_client() + master_host = hs.config.worker_replication_host + master_port = hs.config.worker_replication_http_port + + instance_map = hs.config.worker.instance_map + @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): - # Currently we only support sending requests to master process. - if instance_name != "master": - raise Exception("Unknown instance") + if instance_name == "master": + host = master_host + port = master_port + elif instance_name in instance_map: + host = instance_map[instance_name].host + port = instance_map[instance_name].port + else: + raise Exception( + "Instance %r not in 'instance_map' config" % (instance_name,) + ) data = yield cls._serialize_payload(**kwargs)