Merge pull request #751 from matrix-org/markjh/pusher_metrics_manhole
Add a metrics listener and a ssh listener to the pusherpull/752/head
commit
9843f2a657
|
@ -17,19 +17,25 @@
|
||||||
import synapse
|
import synapse
|
||||||
|
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.util.versionstring import get_version_string
|
|
||||||
from synapse.config._base import ConfigError
|
from synapse.config._base import ConfigError
|
||||||
from synapse.config.database import DatabaseConfig
|
from synapse.config.database import DatabaseConfig
|
||||||
from synapse.config.logger import LoggingConfig
|
from synapse.config.logger import LoggingConfig
|
||||||
|
from synapse.http.site import SynapseSite
|
||||||
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.util.async import sleep
|
from synapse.util.async import sleep
|
||||||
from synapse.util.logcontext import (LoggingContext, preserve_fn)
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
|
from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||||
|
from synapse.util.manhole import manhole
|
||||||
|
from synapse.util.rlimit import change_resource_limit
|
||||||
|
from synapse.util.versionstring import get_version_string
|
||||||
|
|
||||||
from twisted.internet import reactor, defer
|
from twisted.internet import reactor, defer
|
||||||
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
|
@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig):
|
||||||
)
|
)
|
||||||
self.user_agent_suffix = None
|
self.user_agent_suffix = None
|
||||||
self.start_pushers = True
|
self.start_pushers = True
|
||||||
|
self.listeners = config["listeners"]
|
||||||
|
self.soft_file_limit = config.get("soft_file_limit")
|
||||||
|
|
||||||
def default_config(self, **kwargs):
|
def default_config(self, **kwargs):
|
||||||
return """\
|
return """\
|
||||||
## Slave ##
|
## Slave ##
|
||||||
|
# The replication listener on the synapse to talk to.
|
||||||
#replication_url: https://localhost:{replication_port}/_synapse/replication
|
#replication_url: https://localhost:{replication_port}/_synapse/replication
|
||||||
|
|
||||||
|
listeners: []
|
||||||
|
# Enable a ssh manhole listener on the pusher.
|
||||||
|
# - type: manhole
|
||||||
|
# port: {manhole_port}
|
||||||
|
# bind_address: 127.0.0.1
|
||||||
|
# Enable a metric listener on the pusher.
|
||||||
|
# - type: http
|
||||||
|
# port: {metrics_port}
|
||||||
|
# bind_address: 127.0.0.1
|
||||||
|
# resources:
|
||||||
|
# - names: ["metrics"],
|
||||||
|
# compress: False
|
||||||
|
|
||||||
report_stats: False
|
report_stats: False
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -100,6 +122,46 @@ class PusherServer(HomeServer):
|
||||||
}]
|
}]
|
||||||
})
|
})
|
||||||
|
|
||||||
|
def _listen_http(self, listener_config):
|
||||||
|
port = listener_config["port"]
|
||||||
|
bind_address = listener_config.get("bind_address", "")
|
||||||
|
site_tag = listener_config.get("tag", port)
|
||||||
|
resources = {}
|
||||||
|
for res in listener_config["resources"]:
|
||||||
|
for name in res["names"]:
|
||||||
|
if name == "metrics":
|
||||||
|
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||||
|
|
||||||
|
root_resource = create_resource_tree(resources, Resource())
|
||||||
|
reactor.listenTCP(
|
||||||
|
port,
|
||||||
|
SynapseSite(
|
||||||
|
"synapse.access.http.%s" % (site_tag,),
|
||||||
|
site_tag,
|
||||||
|
listener_config,
|
||||||
|
root_resource,
|
||||||
|
),
|
||||||
|
interface=bind_address
|
||||||
|
)
|
||||||
|
logger.info("Synapse pusher now listening on port %d", port)
|
||||||
|
|
||||||
|
def start_listening(self):
|
||||||
|
for listener in self.config.listeners:
|
||||||
|
if listener["type"] == "http":
|
||||||
|
self._listen_http(listener)
|
||||||
|
elif listener["type"] == "manhole":
|
||||||
|
reactor.listenTCP(
|
||||||
|
listener["port"],
|
||||||
|
manhole(
|
||||||
|
username="matrix",
|
||||||
|
password="rabbithole",
|
||||||
|
globals={"hs": self},
|
||||||
|
),
|
||||||
|
interface=listener.get("bind_address", '127.0.0.1')
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def replicate(self):
|
def replicate(self):
|
||||||
http_client = self.get_simple_http_client()
|
http_client = self.get_simple_http_client()
|
||||||
|
@ -191,6 +253,9 @@ def setup(config_options):
|
||||||
)
|
)
|
||||||
|
|
||||||
ps.setup()
|
ps.setup()
|
||||||
|
ps.start_listening()
|
||||||
|
|
||||||
|
change_resource_limit(ps.config.soft_file_limit)
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ps.replicate()
|
ps.replicate()
|
||||||
|
|
|
@ -16,6 +16,26 @@ from twisted.conch.manhole import ColoredManhole
|
||||||
from twisted.conch.insults import insults
|
from twisted.conch.insults import insults
|
||||||
from twisted.conch import manhole_ssh
|
from twisted.conch import manhole_ssh
|
||||||
from twisted.cred import checkers, portal
|
from twisted.cred import checkers, portal
|
||||||
|
from twisted.conch.ssh.keys import Key
|
||||||
|
|
||||||
|
PUBLIC_KEY = (
|
||||||
|
"ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az"
|
||||||
|
"64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS"
|
||||||
|
"kbh/C+BR3utDS555mV"
|
||||||
|
)
|
||||||
|
|
||||||
|
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
|
||||||
|
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
|
||||||
|
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
|
||||||
|
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
|
||||||
|
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
|
||||||
|
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
|
||||||
|
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
|
||||||
|
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
|
||||||
|
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
|
||||||
|
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
|
||||||
|
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
|
||||||
|
-----END RSA PRIVATE KEY-----"""
|
||||||
|
|
||||||
|
|
||||||
def manhole(username, password, globals):
|
def manhole(username, password, globals):
|
||||||
|
@ -43,4 +63,8 @@ def manhole(username, password, globals):
|
||||||
dict(globals, __name__="__console__")
|
dict(globals, __name__="__console__")
|
||||||
)
|
)
|
||||||
|
|
||||||
return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
|
factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
|
||||||
|
factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY)
|
||||||
|
factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY)
|
||||||
|
|
||||||
|
return factory
|
||||||
|
|
Loading…
Reference in New Issue