Merge branch 'erikj/reactor_metrics' into erikj/dictionary_cache

pull/219/head
Erik Johnston 2015-08-13 11:39:38 +01:00
commit c044aca1fd
5 changed files with 60 additions and 56 deletions

View File

@ -23,7 +23,6 @@ for port in 8080 8081 8082; do
#rm $DIR/etc/$port.config #rm $DIR/etc/$port.config
python -m synapse.app.homeserver \ python -m synapse.app.homeserver \
--generate-config \ --generate-config \
--enable_registration \
-H "localhost:$https_port" \ -H "localhost:$https_port" \
--config-path "$DIR/etc/$port.config" \ --config-path "$DIR/etc/$port.config" \
@ -36,6 +35,8 @@ for port in 8080 8081 8082; do
fi fi
fi fi
perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
python -m synapse.app.homeserver \ python -m synapse.app.homeserver \
--config-path "$DIR/etc/$port.config" \ --config-path "$DIR/etc/$port.config" \
-D \ -D \

View File

@ -48,7 +48,7 @@ setup(
description="Reference Synapse Home Server", description="Reference Synapse Home Server",
install_requires=dependencies['requirements'](include_conditional=True).keys(), install_requires=dependencies['requirements'](include_conditional=True).keys(),
setup_requires=[ setup_requires=[
"Twisted==14.0.2", # Here to override setuptools_trial's dependency on Twisted>=2.4.0 "Twisted>=15.1.0", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
"setuptools_trial", "setuptools_trial",
"mock" "mock"
], ],

View File

@ -16,7 +16,7 @@
from twisted.internet import defer, reactor, protocol from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone from twisted.web._newclient import ResponseDone
@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter(
) )
class MatrixFederationHttpAgent(_AgentBase): class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.tls_context_factory = hs.tls_context_factory
def __init__(self, reactor, pool=None): def endpointForURI(self, uri):
_AgentBase.__init__(self, reactor, pool) destination = uri.netloc
def request(self, destination, endpoint, method, path, params, query, return matrix_federation_endpoint(
headers, body_producer): reactor, destination, timeout=10,
ssl_context_factory=self.tls_context_factory
outgoing_requests_counter.inc(method) )
host = b""
port = 0
fragment = b""
parsed_URI = _URI(b"http", destination, host, port, path, params,
query, fragment)
# Set the connection pool key to be the destination.
key = destination
d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
return failure
d.addCallbacks(_cb, _eb)
return d
class MatrixFederationHttpClient(object): class MatrixFederationHttpClient(object):
@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object):
self.server_name = hs.hostname self.server_name = hs.hostname
pool = HTTPConnectionPool(reactor) pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10 pool.maxPersistentPerHost = 10
self.agent = MatrixFederationHttpAgent(reactor, pool=pool) self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.version_string = hs.version_string self.version_string = hs.version_string
self._next_id = 1 self._next_id = 1
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
return urlparse.urlunparse(
("matrix", destination, path_bytes, param_bytes, query_bytes, "")
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"", body_callback, headers_dict={}, param_bytes=b"",
@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"User-Agent"] = [self.version_string]
headers_dict[b"Host"] = [destination] headers_dict[b"Host"] = [destination]
url_bytes = urlparse.urlunparse( url_bytes = self._create_url(
("", "", path_bytes, param_bytes, query_bytes, "",) destination, path_bytes, param_bytes, query_bytes
) )
txn_id = "%s-O-%s" % (method, self._next_id) txn_id = "%s-O-%s" % (method, self._next_id)
@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object):
# (once we have reliable transactions in place) # (once we have reliable transactions in place)
retries_left = 5 retries_left = 5
endpoint = preserve_context_over_fn( http_url_bytes = urlparse.urlunparse(
self._getEndpoint, reactor, destination ("", "", path_bytes, param_bytes, query_bytes, "")
) )
log_result = None log_result = None
@ -148,21 +130,19 @@ class MatrixFederationHttpClient(object):
while True: while True:
producer = None producer = None
if body_callback: if body_callback:
producer = body_callback(method, url_bytes, headers_dict) producer = body_callback(method, http_url_bytes, headers_dict)
try: try:
def send_request(): def send_request():
request_deferred = self.agent.request( request_deferred = preserve_context_over_fn(
destination, self.agent.request,
endpoint,
method, method,
path_bytes, url_bytes,
param_bytes,
query_bytes,
Headers(headers_dict), Headers(headers_dict),
producer producer
) )
return self.clock.time_bound_deferred( return self.clock.time_bound_deferred(
request_deferred, request_deferred,
time_out=timeout/1000. if timeout else 60, time_out=timeout/1000. if timeout else 60,
@ -452,12 +432,6 @@ class MatrixFederationHttpClient(object):
defer.returnValue((length, headers)) defer.returnValue((length, headers))
def _getEndpoint(self, reactor, destination):
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.hs.tls_context_factory
)
class _ReadBodyToFileProtocol(protocol.Protocol): class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size): def __init__(self, stream, deferred, max_size):

View File

@ -18,8 +18,12 @@ from __future__ import absolute_import
import logging import logging
from resource import getrusage, getpagesize, RUSAGE_SELF from resource import getrusage, getpagesize, RUSAGE_SELF
import functools
import os import os
import stat import stat
import time
from twisted.internet import reactor
from .metric import ( from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@ -144,3 +148,28 @@ def _process_fds():
return counts return counts
get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
reactor_metrics = get_metrics_for("reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
def runUntilCurrentTimer(func):
@functools.wraps(func)
def f(*args, **kwargs):
start = time.time() * 1000
pending_calls = len(reactor.getDelayedCalls())
ret = func(*args, **kwargs)
end = time.time() * 1000
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(pending_calls)
return ret
return f
if hasattr(reactor, "runUntilCurrent"):
# runUntilCurrent is called when we have pending calls. It is called once
# per iteratation after fd polling.
reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)

View File

@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
REQUIREMENTS = { REQUIREMENTS = {
"syutil>=0.0.7": ["syutil>=0.0.7"], "syutil>=0.0.7": ["syutil>=0.0.7"],
"Twisted==14.0.2": ["twisted==14.0.2"], "Twisted>=15.1.0": ["twisted>=15.1.0"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"], "service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"], "pyyaml": ["yaml"],