Add `instance_map` config and route replication calls (#7495)
parent
dede23ff1e
commit
1de36407d1
|
@ -0,0 +1 @@
|
||||||
|
Add `instance_map` config and route replication calls.
|
|
@ -13,9 +13,20 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import attr
|
||||||
|
|
||||||
from ._base import Config
|
from ._base import Config
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class InstanceLocationConfig:
|
||||||
|
"""The host and port to talk to an instance via HTTP replication.
|
||||||
|
"""
|
||||||
|
|
||||||
|
host = attr.ib(type=str)
|
||||||
|
port = attr.ib(type=int)
|
||||||
|
|
||||||
|
|
||||||
class WorkerConfig(Config):
|
class WorkerConfig(Config):
|
||||||
"""The workers are processes run separately to the main synapse process.
|
"""The workers are processes run separately to the main synapse process.
|
||||||
They have their own pid_file and listener configuration. They use the
|
They have their own pid_file and listener configuration. They use the
|
||||||
|
@ -71,6 +82,12 @@ class WorkerConfig(Config):
|
||||||
elif not bind_addresses:
|
elif not bind_addresses:
|
||||||
bind_addresses.append("")
|
bind_addresses.append("")
|
||||||
|
|
||||||
|
# A map from instance name to host/port of their HTTP replication endpoint.
|
||||||
|
instance_map = config.get("instance_map", {}) or {}
|
||||||
|
self.instance_map = {
|
||||||
|
name: InstanceLocationConfig(**c) for name, c in instance_map.items()
|
||||||
|
}
|
||||||
|
|
||||||
def read_arguments(self, args):
|
def read_arguments(self, args):
|
||||||
# We support a bunch of command line arguments that override options in
|
# We support a bunch of command line arguments that override options in
|
||||||
# the config. A lot of these options have a worker_* prefix when running
|
# the config. A lot of these options have a worker_* prefix when running
|
||||||
|
|
|
@ -141,17 +141,26 @@ class ReplicationEndpoint(object):
|
||||||
Returns a callable that accepts the same parameters as `_serialize_payload`.
|
Returns a callable that accepts the same parameters as `_serialize_payload`.
|
||||||
"""
|
"""
|
||||||
clock = hs.get_clock()
|
clock = hs.get_clock()
|
||||||
host = hs.config.worker_replication_host
|
|
||||||
port = hs.config.worker_replication_http_port
|
|
||||||
|
|
||||||
client = hs.get_simple_http_client()
|
client = hs.get_simple_http_client()
|
||||||
|
|
||||||
|
master_host = hs.config.worker_replication_host
|
||||||
|
master_port = hs.config.worker_replication_http_port
|
||||||
|
|
||||||
|
instance_map = hs.config.worker.instance_map
|
||||||
|
|
||||||
@trace(opname="outgoing_replication_request")
|
@trace(opname="outgoing_replication_request")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_request(instance_name="master", **kwargs):
|
def send_request(instance_name="master", **kwargs):
|
||||||
# Currently we only support sending requests to master process.
|
if instance_name == "master":
|
||||||
if instance_name != "master":
|
host = master_host
|
||||||
raise Exception("Unknown instance")
|
port = master_port
|
||||||
|
elif instance_name in instance_map:
|
||||||
|
host = instance_map[instance_name].host
|
||||||
|
port = instance_map[instance_name].port
|
||||||
|
else:
|
||||||
|
raise Exception(
|
||||||
|
"Instance %r not in 'instance_map' config" % (instance_name,)
|
||||||
|
)
|
||||||
|
|
||||||
data = yield cls._serialize_payload(**kwargs)
|
data = yield cls._serialize_payload(**kwargs)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue