From 99ab65af2f6976bb9ea15f637bb04b6f6dd7690c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 24 Oct 2019 20:59:06 +0300 Subject: [PATCH 01/11] fix up logging to use rapidjson --- synapse/logging/_structured.py | 9 +++- synapse/logging/_terse_json.py | 77 ++++++++++++++++++++++++---------- synapse/python_dependencies.py | 1 + tests/server.py | 2 + tests/unittest.py | 16 ++++--- tox.ini | 9 ++++ 6 files changed, 87 insertions(+), 27 deletions(-) diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 3220e985a9..c6a01577a6 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -261,6 +261,13 @@ def parse_drain_configs( ) +class StoppableLogPublisher(LogPublisher): + def stop(self): + for obs in self._observers: + if hasattr(obs, "stop"): + obs.stop() + + def setup_structured_logging( hs, config, @@ -336,7 +343,7 @@ def setup_structured_logging( # We should never get here, but, just in case, throw an error. raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - publisher = LogPublisher(*observers) + publisher = StoppableLogPublisher(*observers) log_filter = LogLevelFilterPredicate() for namespace, namespace_config in log_config.get( diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 0ebbde06f2..f3b27d5bfd 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -24,7 +24,7 @@ from math import floor from typing import IO import attr -from simplejson import dumps +from rapidjson import dumps from zope.interface import implementer from twisted.application.internet import ClientService @@ -33,9 +33,9 @@ from twisted.internet.endpoints import ( TCP4ClientEndpoint, TCP6ClientEndpoint, ) +from twisted.internet.interfaces import IPushProducer from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger -from twisted.python.failure import Failure def flatten_event(event: dict, metadata: dict, include_time: bool = False): @@ -141,11 +141,40 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" + return dumps(flattened, ensure_ascii=False) + "\n" return FileLogObserver(outFile, formatEvent) +@attr.s +@implementer(IPushProducer) +class LogProducer(object): + + _buffer = attr.ib() + _transport = attr.ib() + paused = attr.ib(default=False) + + def pauseProducing(self): + self.paused = True + + def stopProducing(self): + self.paused = True + self._buffer = None + + def resumeProducing(self): + self.paused = False + + while self.paused is False and (self._buffer and self._transport.connected): + try: + event = self._buffer.popleft() + self._transport.write(dumps(event, ensure_ascii=False).encode("utf8")) + self._transport.write(b"\n") + except Exception: + import traceback + + traceback.print_exc(file=sys.__stderr__) + + @attr.s @implementer(ILogObserver) class TerseJSONToTCPLogObserver(object): @@ -167,6 +196,7 @@ class TerseJSONToTCPLogObserver(object): _buffer = attr.ib(default=attr.Factory(deque), type=deque) _writer = attr.ib(default=None) _logger = attr.ib(default=attr.Factory(Logger)) + _producer = attr.ib(default=None) def start(self) -> None: @@ -188,6 +218,9 @@ class TerseJSONToTCPLogObserver(object): self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service.startService() + def stop(self): + self._service.stopService() + def _write_loop(self) -> None: """ Implement the write loop. @@ -195,27 +228,29 @@ class TerseJSONToTCPLogObserver(object): if self._writer: return + if self._producer and self._producer._transport.connected: + self._producer.resumeProducing() + return + self._writer = self._service.whenConnected() - @self._writer.addBoth - def writer(r): - if isinstance(r, Failure): - r.printTraceback(file=sys.__stderr__) - self._writer = None - self.hs.get_reactor().callLater(1, self._write_loop) - return + @self._writer.addErrback + def fail(r): + r.printTraceback(file=sys.__stderr__) + self._writer = None + self.hs.get_reactor().callLater(1, self._write_loop) + return - try: - for event in self._buffer: - r.transport.write( - dumps(event, ensure_ascii=False, separators=(",", ":")).encode( - "utf8" - ) - ) - r.transport.write(b"\n") - self._buffer.clear() - except Exception as e: - sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),)) + @self._writer.addCallback + def writer(r): + def connectionLost(_self, reason): + self._producer.pauseProducing() + self._producer = None + self.hs.get_reactor().callLater(1, self._write_loop) + + self._producer = LogProducer(self._buffer, r.transport) + r.transport.registerProducer(self._producer, True) + self._producer.resumeProducing() self._writer = False self.hs.get_reactor().callLater(1, self._write_loop) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index aa7da1c543..3c0040482c 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -74,6 +74,7 @@ REQUIREMENTS = [ "Jinja2>=2.9", "bleach>=1.4.3", "typing-extensions>=3.7.4", + "python-rapidjson>=0.4" ] CONDITIONAL_REQUIREMENTS = { diff --git a/tests/server.py b/tests/server.py index e397ebe8fa..bd647906ba 100644 --- a/tests/server.py +++ b/tests/server.py @@ -375,6 +375,7 @@ class FakeTransport(object): disconnecting = False disconnected = False + connected = True buffer = attr.ib(default=b"") producer = attr.ib(default=None) autoflush = attr.ib(default=True) @@ -392,6 +393,7 @@ class FakeTransport(object): if self._protocol: self._protocol.connectionLost(reason) self.disconnected = True + self.connected = False def abortConnection(self): logger.info("FakeTransport: abortConnection()") diff --git a/tests/unittest.py b/tests/unittest.py index 561cebc223..dc251b1702 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -178,11 +178,14 @@ class HomeserverTestCase(TestCase): needs_threadpool = False def __init__(self, methodName, *args, **kwargs): - super().__init__(methodName, *args, **kwargs) + if methodName: + super().__init__(methodName, *args, **kwargs) - # see if we have any additional config for this test - method = getattr(self, methodName) - self._extra_config = getattr(method, "_extra_config", None) + # see if we have any additional config for this test + method = getattr(self, methodName) + self._extra_config = getattr(method, "_extra_config", None) + else: + self._extra_config = None def setUp(self): """ @@ -190,7 +193,7 @@ class HomeserverTestCase(TestCase): hijacking the authentication system to return a fixed user, and then calling the prepare function. """ - self.reactor, self.clock = get_clock() + self.reactor, self.clock = self.get_clock() self._hs_args = {"clock": self.clock, "reactor": self.reactor} self.hs = self.make_homeserver(self.reactor, self.clock) @@ -240,6 +243,9 @@ class HomeserverTestCase(TestCase): if hasattr(self, "prepare"): self.prepare(self.reactor, self.clock, self.hs) + def get_clock(self): + return get_clock() + def wait_on_thread(self, deferred, timeout=10): """ Wait until a Deferred is done, where it's waiting on a real thread. diff --git a/tox.ini b/tox.ini index 3cd2c5e633..4249f9df5a 100644 --- a/tox.ini +++ b/tox.ini @@ -102,6 +102,15 @@ commands = {envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} +[testenv:benchmark] +deps = + {[base]deps} + pyperf +setenv = + SYNAPSE_POSTGRES = 1 +commands = + python -m synapse.benchmarks {posargs:} + [testenv:packaging] skip_install=True deps = From d684ec8a2bbb682fde86d3b17c4897d40ac71b58 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 24 Oct 2019 22:09:43 +0300 Subject: [PATCH 02/11] benchmarks --- synapse/benchmarks/__init__.py | 63 +++++++++++ synapse/benchmarks/__main__.py | 33 ++++++ synapse/benchmarks/suites/__init__.py | 1 + synapse/benchmarks/suites/logging.py | 147 ++++++++++++++++++++++++++ synapse/logging/_terse_json.py | 8 +- synapse/python_dependencies.py | 1 - 6 files changed, 249 insertions(+), 4 deletions(-) create mode 100644 synapse/benchmarks/__init__.py create mode 100644 synapse/benchmarks/__main__.py create mode 100644 synapse/benchmarks/suites/__init__.py create mode 100644 synapse/benchmarks/suites/logging.py diff --git a/synapse/benchmarks/__init__.py b/synapse/benchmarks/__init__.py new file mode 100644 index 0000000000..d0450c4d9d --- /dev/null +++ b/synapse/benchmarks/__init__.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet.defer import Deferred + +from synapse.config.homeserver import HomeServerConfig +from synapse.util import Clock + +from tests.utils import default_config, setup_test_homeserver, setupdb + +DB_SETUP = False + + +def setup_database(): + global DB_SETUP + if not DB_SETUP: + setupdb() + DB_SETUP = True + + +async def make_homeserver(reactor, config=None): + def wait(time): + d = Deferred() + reactor.callLater(time, d.callback, True) + return d + + cleanup_tasks = [] + + clock = Clock(reactor) + + if not config: + config = default_config("test") + + config_obj = HomeServerConfig() + config_obj.parse_config_dict(config, "", "") + + hs = await setup_test_homeserver( + cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock + ) + stor = hs.get_datastore() + + # Run the database background updates. + if hasattr(stor, "do_next_background_update"): + while not await stor.has_completed_background_updates(): + await stor.do_next_background_update(1) + + def cleanup(): + for i in cleanup_tasks: + i() + + return hs, wait, cleanup diff --git a/synapse/benchmarks/__main__.py b/synapse/benchmarks/__main__.py new file mode 100644 index 0000000000..d67a1cf43a --- /dev/null +++ b/synapse/benchmarks/__main__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pyperf + +from twisted.python import reflect + +from synapse.benchmarks import setupdb +from synapse.benchmarks.suites import SUITES + +if __name__ == "__main__": + + runner = pyperf.Runner(processes=5, values=1, warmups=0) + runner.parse_args() + runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] + + for suite, loops in SUITES: + + func = reflect.namedAny("synapse.benchmarks.suites.%s.main" % (suite.lower(),)) + runner.args.loops = loops + runner.bench_time_func(suite + "_" + str(loops), func) diff --git a/synapse/benchmarks/suites/__init__.py b/synapse/benchmarks/suites/__init__.py new file mode 100644 index 0000000000..641e3299c2 --- /dev/null +++ b/synapse/benchmarks/suites/__init__.py @@ -0,0 +1 @@ +SUITES = [("LOGGING", 1000), ("LOGGING", 10000)] diff --git a/synapse/benchmarks/suites/logging.py b/synapse/benchmarks/suites/logging.py new file mode 100644 index 0000000000..37e7229b6c --- /dev/null +++ b/synapse/benchmarks/suites/logging.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import warnings +from contextlib import redirect_stderr +from io import StringIO + +from mock import Mock + +from pyperf import perf_counter + +from twisted.internet.defer import ensureDeferred +from twisted.internet.protocol import ServerFactory +from twisted.logger import ( + LogBeginner, + Logger, + LogPublisher, + globalLogBeginner, + textFileLogObserver, +) +from twisted.protocols.basic import LineOnlyReceiver +from twisted.python.failure import Failure + +from synapse.benchmarks import make_homeserver, setup_database +from synapse.logging._structured import setup_structured_logging + + +class LineCounter(LineOnlyReceiver): + + delimiter = b"\n" + + def __init__(self, *args, **kwargs): + self.count = 0 + super().__init__(*args, **kwargs) + + def lineReceived(self, line): + self.count += 1 + + +async def _main(reactor, loops): + + servers = [] + + def protocol(): + p = LineCounter() + servers.append(p) + return p + + logger_factory = ServerFactory.forProtocol(protocol) + port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") + + hs, wait, cleanup = await make_homeserver(reactor) + + errors = StringIO() + publisher = LogPublisher() + mock_sys = Mock() + beginner = LogBeginner( + publisher, errors, mock_sys, warnings, initialBufferSize=loops + ) + + log_config = { + "loggers": {"synapse": {"level": "DEBUG"}}, + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": port.getHost().port, + "maximum_buffer": 100, + } + }, + } + + logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) + + start = perf_counter() + + logging_system = setup_structured_logging( + hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False + ) + + # Wait for it to connect... + await logging_system._observers[0]._service.whenConnected() + + # Send a bunch of useful messages + for i in range(0, loops): + logger.info("test message %s" % (i,)) + + if ( + len(logging_system._observers[0]._buffer) + == logging_system._observers[0].maximum_buffer + ): + while ( + len(logging_system._observers[0]._buffer) + > logging_system._observers[0].maximum_buffer / 2 + ): + await wait(0.01) + + while servers[0].count != loops: + await wait(0.01) + + end = perf_counter() - start + + logging_system.stop() + port.stopListening() + cleanup() + + return end + + +def main(loops): + + setup_database() + + if globalLogBeginner._temporaryObserver: + globalLogBeginner.beginLoggingTo([lambda event: None]) + + file_out = StringIO() + with redirect_stderr(file_out): + + from twisted.internet import epollreactor + + reactor = epollreactor.EPollReactor() + d = ensureDeferred(_main(reactor, loops)) + + def on_done(_): + if isinstance(_, Failure): + _.printTraceback() + reactor.stop() + return _ + + d.addBoth(on_done) + reactor.run() + + return d.result diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index f3b27d5bfd..b8e8ea5003 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -17,6 +17,7 @@ Log formatters that output terse JSON. """ +import json import sys from collections import deque from ipaddress import IPv4Address, IPv6Address, ip_address @@ -24,7 +25,6 @@ from math import floor from typing import IO import attr -from rapidjson import dumps from zope.interface import implementer from twisted.application.internet import ClientService @@ -37,6 +37,8 @@ from twisted.internet.interfaces import IPushProducer from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger +_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) + def flatten_event(event: dict, metadata: dict, include_time: bool = False): """ @@ -141,7 +143,7 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False) + "\n" + return _encoder.encode(flattened) + "\n" return FileLogObserver(outFile, formatEvent) @@ -167,7 +169,7 @@ class LogProducer(object): while self.paused is False and (self._buffer and self._transport.connected): try: event = self._buffer.popleft() - self._transport.write(dumps(event, ensure_ascii=False).encode("utf8")) + self._transport.write(_encoder.encode(event).encode("utf8")) self._transport.write(b"\n") except Exception: import traceback diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 3c0040482c..aa7da1c543 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -74,7 +74,6 @@ REQUIREMENTS = [ "Jinja2>=2.9", "bleach>=1.4.3", "typing-extensions>=3.7.4", - "python-rapidjson>=0.4" ] CONDITIONAL_REQUIREMENTS = { From 135fdaae0d0ff62d295a9a3fbe2f89e19890e269 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Thu, 31 Oct 2019 21:01:37 +1100 Subject: [PATCH 03/11] update linting --- changelog.d/6266.misc | 1 + synapse/benchmarks/__main__.py | 1 - synapse/benchmarks/suites/logging.py | 2 -- 3 files changed, 1 insertion(+), 3 deletions(-) create mode 100644 changelog.d/6266.misc diff --git a/changelog.d/6266.misc b/changelog.d/6266.misc new file mode 100644 index 0000000000..634e421a79 --- /dev/null +++ b/changelog.d/6266.misc @@ -0,0 +1 @@ +Add benchmarks for structured logging and improve output performance. diff --git a/synapse/benchmarks/__main__.py b/synapse/benchmarks/__main__.py index d67a1cf43a..0d490791fc 100644 --- a/synapse/benchmarks/__main__.py +++ b/synapse/benchmarks/__main__.py @@ -17,7 +17,6 @@ import pyperf from twisted.python import reflect -from synapse.benchmarks import setupdb from synapse.benchmarks.suites import SUITES if __name__ == "__main__": diff --git a/synapse/benchmarks/suites/logging.py b/synapse/benchmarks/suites/logging.py index 37e7229b6c..8756682106 100644 --- a/synapse/benchmarks/suites/logging.py +++ b/synapse/benchmarks/suites/logging.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys import warnings from contextlib import redirect_stderr from io import StringIO @@ -29,7 +28,6 @@ from twisted.logger import ( Logger, LogPublisher, globalLogBeginner, - textFileLogObserver, ) from twisted.protocols.basic import LineOnlyReceiver from twisted.python.failure import Failure From 73cfdebfec0ed341048e2963497e35801009f8c7 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Thu, 31 Oct 2019 21:15:07 +1100 Subject: [PATCH 04/11] fix --- synapse/benchmarks/suites/logging.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/benchmarks/suites/logging.py b/synapse/benchmarks/suites/logging.py index 8756682106..6dd63c69a2 100644 --- a/synapse/benchmarks/suites/logging.py +++ b/synapse/benchmarks/suites/logging.py @@ -23,12 +23,7 @@ from pyperf import perf_counter from twisted.internet.defer import ensureDeferred from twisted.internet.protocol import ServerFactory -from twisted.logger import ( - LogBeginner, - Logger, - LogPublisher, - globalLogBeginner, -) +from twisted.logger import LogBeginner, Logger, LogPublisher, globalLogBeginner from twisted.protocols.basic import LineOnlyReceiver from twisted.python.failure import Failure From c580eb32ba1be71abf0ade53fc11bf5044908f29 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 5 Nov 2019 00:51:51 +1100 Subject: [PATCH 05/11] revert this --- tests/unittest.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index dc251b1702..561cebc223 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -178,14 +178,11 @@ class HomeserverTestCase(TestCase): needs_threadpool = False def __init__(self, methodName, *args, **kwargs): - if methodName: - super().__init__(methodName, *args, **kwargs) + super().__init__(methodName, *args, **kwargs) - # see if we have any additional config for this test - method = getattr(self, methodName) - self._extra_config = getattr(method, "_extra_config", None) - else: - self._extra_config = None + # see if we have any additional config for this test + method = getattr(self, methodName) + self._extra_config = getattr(method, "_extra_config", None) def setUp(self): """ @@ -193,7 +190,7 @@ class HomeserverTestCase(TestCase): hijacking the authentication system to return a fixed user, and then calling the prepare function. """ - self.reactor, self.clock = self.get_clock() + self.reactor, self.clock = get_clock() self._hs_args = {"clock": self.clock, "reactor": self.reactor} self.hs = self.make_homeserver(self.reactor, self.clock) @@ -243,9 +240,6 @@ class HomeserverTestCase(TestCase): if hasattr(self, "prepare"): self.prepare(self.reactor, self.clock, self.hs) - def get_clock(self): - return get_clock() - def wait_on_thread(self, deferred, timeout=10): """ Wait until a Deferred is done, where it's waiting on a real thread. From 92d4d1342a060e82359299fa83570f9420fac033 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Mon, 2 Dec 2019 17:42:24 +1100 Subject: [PATCH 06/11] fixes --- synapse/benchmarks/suites/__init__.py | 1 - {synapse/benchmarks => synmark}/__init__.py | 7 +------ {synapse/benchmarks => synmark}/__main__.py | 7 +++---- synmark/suites/__init__.py | 6 ++++++ .../benchmarks => synmark}/suites/logging.py | 17 ++++++++++++++--- tox.ini | 2 +- 6 files changed, 25 insertions(+), 15 deletions(-) delete mode 100644 synapse/benchmarks/suites/__init__.py rename {synapse/benchmarks => synmark}/__init__.py (92%) rename {synapse/benchmarks => synmark}/__main__.py (82%) create mode 100644 synmark/suites/__init__.py rename {synapse/benchmarks => synmark}/suites/logging.py (92%) diff --git a/synapse/benchmarks/suites/__init__.py b/synapse/benchmarks/suites/__init__.py deleted file mode 100644 index 641e3299c2..0000000000 --- a/synapse/benchmarks/suites/__init__.py +++ /dev/null @@ -1 +0,0 @@ -SUITES = [("LOGGING", 1000), ("LOGGING", 10000)] diff --git a/synapse/benchmarks/__init__.py b/synmark/__init__.py similarity index 92% rename from synapse/benchmarks/__init__.py rename to synmark/__init__.py index d0450c4d9d..64c2744ae7 100644 --- a/synapse/benchmarks/__init__.py +++ b/synmark/__init__.py @@ -31,13 +31,8 @@ def setup_database(): async def make_homeserver(reactor, config=None): - def wait(time): - d = Deferred() - reactor.callLater(time, d.callback, True) - return d cleanup_tasks = [] - clock = Clock(reactor) if not config: @@ -60,4 +55,4 @@ async def make_homeserver(reactor, config=None): for i in cleanup_tasks: i() - return hs, wait, cleanup + return hs, clock.sleep, cleanup diff --git a/synapse/benchmarks/__main__.py b/synmark/__main__.py similarity index 82% rename from synapse/benchmarks/__main__.py rename to synmark/__main__.py index 0d490791fc..e208b04a0b 100644 --- a/synapse/benchmarks/__main__.py +++ b/synmark/__main__.py @@ -17,7 +17,7 @@ import pyperf from twisted.python import reflect -from synapse.benchmarks.suites import SUITES +from synmark.suites import SUITES if __name__ == "__main__": @@ -26,7 +26,6 @@ if __name__ == "__main__": runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] for suite, loops in SUITES: - - func = reflect.namedAny("synapse.benchmarks.suites.%s.main" % (suite.lower(),)) + print(suite, loops) runner.args.loops = loops - runner.bench_time_func(suite + "_" + str(loops), func) + runner.bench_time_func(suite.__name__ + "_" + str(loops), suite.main) diff --git a/synmark/suites/__init__.py b/synmark/suites/__init__.py new file mode 100644 index 0000000000..412c15f52c --- /dev/null +++ b/synmark/suites/__init__.py @@ -0,0 +1,6 @@ +from . import logging + + +SUITES = [(logging, 1000), (logging, 10000)] + + diff --git a/synapse/benchmarks/suites/logging.py b/synmark/suites/logging.py similarity index 92% rename from synapse/benchmarks/suites/logging.py rename to synmark/suites/logging.py index 6dd63c69a2..bc967e97ab 100644 --- a/synapse/benchmarks/suites/logging.py +++ b/synmark/suites/logging.py @@ -18,16 +18,17 @@ from contextlib import redirect_stderr from io import StringIO from mock import Mock +import sys from pyperf import perf_counter from twisted.internet.defer import ensureDeferred from twisted.internet.protocol import ServerFactory -from twisted.logger import LogBeginner, Logger, LogPublisher, globalLogBeginner +from twisted.logger import LogBeginner, Logger, LogPublisher, globalLogBeginner, textFileLogObserver from twisted.protocols.basic import LineOnlyReceiver from twisted.python.failure import Failure -from synapse.benchmarks import make_homeserver, setup_database +from synmark import make_homeserver, setup_database from synapse.logging._structured import setup_structured_logging @@ -47,6 +48,9 @@ async def _main(reactor, loops): servers = [] + print("?") + + def protocol(): p = LineCounter() servers.append(p) @@ -84,12 +88,15 @@ async def _main(reactor, loops): hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False ) + print("hi") + # Wait for it to connect... await logging_system._observers[0]._service.whenConnected() # Send a bunch of useful messages for i in range(0, loops): logger.info("test message %s" % (i,)) + print(i) if ( len(logging_system._observers[0]._buffer) @@ -102,6 +109,7 @@ async def _main(reactor, loops): await wait(0.01) while servers[0].count != loops: + print(servers[0].count, loops) await wait(0.01) end = perf_counter() - start @@ -115,10 +123,13 @@ async def _main(reactor, loops): def main(loops): + print("hi?") + print(loops) + setup_database() if globalLogBeginner._temporaryObserver: - globalLogBeginner.beginLoggingTo([lambda event: None]) + globalLogBeginner.beginLoggingTo([textFileLogObserver(sys.__stderr__)]) file_out = StringIO() with redirect_stderr(file_out): diff --git a/tox.ini b/tox.ini index 7e981bbc68..903a245fb0 100644 --- a/tox.ini +++ b/tox.ini @@ -109,7 +109,7 @@ deps = setenv = SYNAPSE_POSTGRES = 1 commands = - python -m synapse.benchmarks {posargs:} + python -m synmark {posargs:} [testenv:packaging] skip_install=True From c11c8ad39f11a8205778b0bf41bd143e288528f0 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Mon, 2 Dec 2019 18:30:11 +1100 Subject: [PATCH 07/11] more fixes --- synapse/logging/_terse_json.py | 1 + synmark/__init__.py | 25 ++++++++---- synmark/__main__.py | 71 ++++++++++++++++++++++++++++++---- synmark/suites/__init__.py | 5 +-- synmark/suites/logging.py | 63 +++++++----------------------- 5 files changed, 98 insertions(+), 67 deletions(-) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 05fc64f409..03934956f4 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -256,6 +256,7 @@ class TerseJSONToTCPLogObserver(object): # transport is the same, just trigger a resumeProducing. if self._producer and r.transport is self._producer.transport: self._producer.resumeProducing() + self._connection_waiter = None return # If the producer is still producing, stop it. diff --git a/synmark/__init__.py b/synmark/__init__.py index 64c2744ae7..5cc8ed28ee 100644 --- a/synmark/__init__.py +++ b/synmark/__init__.py @@ -13,21 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet.defer import Deferred +import sys + +from twisted.internet import epollreactor +from twisted.internet.main import installReactor from synapse.config.homeserver import HomeServerConfig from synapse.util import Clock from tests.utils import default_config, setup_test_homeserver, setupdb -DB_SETUP = False - def setup_database(): - global DB_SETUP - if not DB_SETUP: - setupdb() - DB_SETUP = True + setupdb() async def make_homeserver(reactor, config=None): @@ -56,3 +54,16 @@ async def make_homeserver(reactor, config=None): i() return hs, clock.sleep, cleanup + + +def make_reactor(): + """ + Make an install a Twisted reactor. + """ + reactor = epollreactor.EPollReactor() + + if "twisted.internet.reactor" in sys.modules: + del sys.modules["twisted.internet.reactor"] + installReactor(reactor) + + return reactor diff --git a/synmark/__main__.py b/synmark/__main__.py index e208b04a0b..8a8fbdf6c7 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py @@ -13,19 +13,76 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys +from contextlib import redirect_stderr +from io import StringIO + import pyperf - -from twisted.python import reflect - +from synmark import make_reactor, setup_database from synmark.suites import SUITES +from twisted.internet.defer import ensureDeferred +from twisted.logger import globalLogBeginner, textFileLogObserver +from twisted.python.failure import Failure + + +def make_test(main): + """ + Take main, the test function, and wrap it in a reactor start and stop. + """ + + def _main(loops): + + reactor = make_reactor() + + file_out = StringIO() + with redirect_stderr(file_out): + + d = ensureDeferred(main(reactor, loops)) + + def on_done(_): + if isinstance(_, Failure): + _.printTraceback() + print(file_out.getvalue()) + reactor.stop() + return _ + + d.addBoth(on_done) + reactor.run() + + return d.result + + return _main + + if __name__ == "__main__": - runner = pyperf.Runner(processes=5, values=1, warmups=0) + def add_cmdline_args(cmd, args): + if args.log: + cmd.extend(["--log"]) + + runner = pyperf.Runner( + processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args + ) + runner.argparser.add_argument("--log", action="store_true") runner.parse_args() + + orig_loops = runner.args.loops runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] + if runner.args.worker: + if runner.args.log: + globalLogBeginner.beginLoggingTo( + [textFileLogObserver(sys.__stdout__)], redirectStandardIO=False + ) + setup_database() + for suite, loops in SUITES: - print(suite, loops) - runner.args.loops = loops - runner.bench_time_func(suite.__name__ + "_" + str(loops), suite.main) + if loops: + runner.args.loops = loops + else: + runner.args.loops = orig_loops + loops = "auto" + runner.bench_time_func( + suite.__name__ + "_" + str(loops), make_test(suite.main), + ) diff --git a/synmark/suites/__init__.py b/synmark/suites/__init__.py index 412c15f52c..cfa3b0ba38 100644 --- a/synmark/suites/__init__.py +++ b/synmark/suites/__init__.py @@ -1,6 +1,3 @@ from . import logging - -SUITES = [(logging, 1000), (logging, 10000)] - - +SUITES = [(logging, 1000), (logging, 10000), (logging, None)] diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index bc967e97ab..26b78e4569 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -14,21 +14,18 @@ # limitations under the License. import warnings -from contextlib import redirect_stderr from io import StringIO from mock import Mock -import sys from pyperf import perf_counter +from synmark import make_homeserver -from twisted.internet.defer import ensureDeferred +from twisted.internet.defer import Deferred from twisted.internet.protocol import ServerFactory -from twisted.logger import LogBeginner, Logger, LogPublisher, globalLogBeginner, textFileLogObserver +from twisted.logger import LogBeginner, Logger, LogPublisher from twisted.protocols.basic import LineOnlyReceiver -from twisted.python.failure import Failure -from synmark import make_homeserver, setup_database from synapse.logging._structured import setup_structured_logging @@ -43,20 +40,24 @@ class LineCounter(LineOnlyReceiver): def lineReceived(self, line): self.count += 1 + if self.count >= self.factory.wait_for and self.factory.on_done: + on_done = self.factory.on_done + self.factory.on_done = None + on_done.callback(True) -async def _main(reactor, loops): + +async def main(reactor, loops): servers = [] - print("?") - - def protocol(): p = LineCounter() servers.append(p) return p logger_factory = ServerFactory.forProtocol(protocol) + logger_factory.wait_for = loops + logger_factory.on_done = Deferred() port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") hs, wait, cleanup = await make_homeserver(reactor) @@ -81,22 +82,18 @@ async def _main(reactor, loops): } logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) - - start = perf_counter() - logging_system = setup_structured_logging( hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False ) - print("hi") - # Wait for it to connect... await logging_system._observers[0]._service.whenConnected() + start = perf_counter() + # Send a bunch of useful messages for i in range(0, loops): logger.info("test message %s" % (i,)) - print(i) if ( len(logging_system._observers[0]._buffer) @@ -108,9 +105,7 @@ async def _main(reactor, loops): ): await wait(0.01) - while servers[0].count != loops: - print(servers[0].count, loops) - await wait(0.01) + await logger_factory.on_done end = perf_counter() - start @@ -119,33 +114,3 @@ async def _main(reactor, loops): cleanup() return end - - -def main(loops): - - print("hi?") - print(loops) - - setup_database() - - if globalLogBeginner._temporaryObserver: - globalLogBeginner.beginLoggingTo([textFileLogObserver(sys.__stderr__)]) - - file_out = StringIO() - with redirect_stderr(file_out): - - from twisted.internet import epollreactor - - reactor = epollreactor.EPollReactor() - d = ensureDeferred(_main(reactor, loops)) - - def on_done(_): - if isinstance(_, Failure): - _.printTraceback() - reactor.stop() - return _ - - d.addBoth(on_done) - reactor.run() - - return d.result From 0e368eec58d87143be8e4b19a0faf12fd6d1e3a0 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Mon, 2 Dec 2019 19:17:19 +1100 Subject: [PATCH 08/11] more fixes --- synmark/__init__.py | 15 +++++++++------ synmark/__main__.py | 8 +++++--- synmark/suites/logging.py | 4 +++- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/synmark/__init__.py b/synmark/__init__.py index 5cc8ed28ee..570eb818d9 100644 --- a/synmark/__init__.py +++ b/synmark/__init__.py @@ -21,15 +21,17 @@ from twisted.internet.main import installReactor from synapse.config.homeserver import HomeServerConfig from synapse.util import Clock -from tests.utils import default_config, setup_test_homeserver, setupdb - - -def setup_database(): - setupdb() +from tests.utils import default_config, setup_test_homeserver async def make_homeserver(reactor, config=None): + """ + Make a Homeserver suitable for running benchmarks against. + Args: + reactor: A Twisted reactor to run under. + config: A HomeServerConfig to use, or None. + """ cleanup_tasks = [] clock = Clock(reactor) @@ -58,7 +60,8 @@ async def make_homeserver(reactor, config=None): def make_reactor(): """ - Make an install a Twisted reactor. + Instantiate and install a Twisted reactor suitable for testing (i.e. not the + default global one). """ reactor = epollreactor.EPollReactor() diff --git a/synmark/__main__.py b/synmark/__main__.py index 8a8fbdf6c7..ac59befbd4 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py @@ -18,17 +18,19 @@ from contextlib import redirect_stderr from io import StringIO import pyperf -from synmark import make_reactor, setup_database +from synmark import make_reactor from synmark.suites import SUITES from twisted.internet.defer import ensureDeferred from twisted.logger import globalLogBeginner, textFileLogObserver from twisted.python.failure import Failure +from tests.utils import setupdb + def make_test(main): """ - Take main, the test function, and wrap it in a reactor start and stop. + Take a benchmark function and wrap it in a reactor start and stop. """ def _main(loops): @@ -75,7 +77,7 @@ if __name__ == "__main__": globalLogBeginner.beginLoggingTo( [textFileLogObserver(sys.__stdout__)], redirectStandardIO=False ) - setup_database() + setupdb() for suite, loops in SUITES: if loops: diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index 26b78e4569..d8e4c7d58f 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -47,7 +47,9 @@ class LineCounter(LineOnlyReceiver): async def main(reactor, loops): - + """ + Benchmark how long it takes to send `loops` messages. + """ servers = [] def protocol(): From f7ec52670b0399ebcc3349932fac867a55e78e7a Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Mon, 2 Dec 2019 19:45:20 +1100 Subject: [PATCH 09/11] add some LruCache benchmarks --- synmark/__main__.py | 14 ++++++++--- synmark/suites/__init__.py | 4 +-- synmark/suites/lrucache.py | 41 +++++++++++++++++++++++++++++++ synmark/suites/lrucache_evict.py | 42 ++++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 synmark/suites/lrucache.py create mode 100644 synmark/suites/lrucache_evict.py diff --git a/synmark/__main__.py b/synmark/__main__.py index ac59befbd4..d3f4b5ba05 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py @@ -18,10 +18,11 @@ from contextlib import redirect_stderr from io import StringIO import pyperf +from argparse import REMAINDER from synmark import make_reactor from synmark.suites import SUITES -from twisted.internet.defer import ensureDeferred +from twisted.internet.defer import ensureDeferred, Deferred from twisted.logger import globalLogBeginner, textFileLogObserver from twisted.python.failure import Failure @@ -40,7 +41,8 @@ def make_test(main): file_out = StringIO() with redirect_stderr(file_out): - d = ensureDeferred(main(reactor, loops)) + d = Deferred() + d.addCallback(lambda _: ensureDeferred(main(reactor, loops))) def on_done(_): if isinstance(_, Failure): @@ -50,6 +52,7 @@ def make_test(main): return _ d.addBoth(on_done) + reactor.callWhenRunning(lambda: d.callback(True)) reactor.run() return d.result @@ -62,11 +65,13 @@ if __name__ == "__main__": def add_cmdline_args(cmd, args): if args.log: cmd.extend(["--log"]) + cmd.extend(args.tests) runner = pyperf.Runner( - processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args + processes=3, min_time=1.5, show_name=True, add_cmdline_args=add_cmdline_args ) runner.argparser.add_argument("--log", action="store_true") + runner.argparser.add_argument("tests", nargs=REMAINDER) runner.parse_args() orig_loops = runner.args.loops @@ -79,6 +84,9 @@ if __name__ == "__main__": ) setupdb() + if runner.args.tests: + SUITES = list(filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES)) + for suite, loops in SUITES: if loops: runner.args.loops = loops diff --git a/synmark/suites/__init__.py b/synmark/suites/__init__.py index cfa3b0ba38..39b872762e 100644 --- a/synmark/suites/__init__.py +++ b/synmark/suites/__init__.py @@ -1,3 +1,3 @@ -from . import logging +from . import logging, lrucache, lrucache_evict -SUITES = [(logging, 1000), (logging, 10000), (logging, None)] +SUITES = [(logging, 1000), (logging, 10000), (logging, None), (lrucache, None), (lrucache_evict, None)] diff --git a/synmark/suites/lrucache.py b/synmark/suites/lrucache.py new file mode 100644 index 0000000000..b7a96132c7 --- /dev/null +++ b/synmark/suites/lrucache.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings +from io import StringIO + +from mock import Mock + +from pyperf import perf_counter +from synmark import make_homeserver + +from synapse.util.caches.lrucache import LruCache +from synapse.logging._structured import setup_structured_logging + + +async def main(reactor, loops): + """ + Benchmark `loops` number of insertions into LruCache without eviction. + """ + cache = LruCache(loops) + + start = perf_counter() + + for i in range(loops): + cache[i] = True + + end = perf_counter() - start + + return end diff --git a/synmark/suites/lrucache_evict.py b/synmark/suites/lrucache_evict.py new file mode 100644 index 0000000000..c88ff5a3f8 --- /dev/null +++ b/synmark/suites/lrucache_evict.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings +from io import StringIO + +from mock import Mock + +from pyperf import perf_counter +from synmark import make_homeserver + +from synapse.util.caches.lrucache import LruCache +from synapse.logging._structured import setup_structured_logging + + +async def main(reactor, loops): + """ + Benchmark `loops` number of insertions into LruCache where half of them are + evicted. + """ + cache = LruCache(loops // 2) + + start = perf_counter() + + for i in range(loops): + cache[i] = True + + end = perf_counter() - start + + return end From e174b2d19c5d703cd01240e2a31a96f0c89d4bc0 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Mon, 2 Dec 2019 19:49:13 +1100 Subject: [PATCH 10/11] fix style --- synmark/__main__.py | 8 +++++--- synmark/suites/__init__.py | 8 +++++++- synmark/suites/lrucache.py | 7 ------- synmark/suites/lrucache_evict.py | 7 ------- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/synmark/__main__.py b/synmark/__main__.py index d3f4b5ba05..17df9ddeb7 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py @@ -14,15 +14,15 @@ # limitations under the License. import sys +from argparse import REMAINDER from contextlib import redirect_stderr from io import StringIO import pyperf -from argparse import REMAINDER from synmark import make_reactor from synmark.suites import SUITES -from twisted.internet.defer import ensureDeferred, Deferred +from twisted.internet.defer import Deferred, ensureDeferred from twisted.logger import globalLogBeginner, textFileLogObserver from twisted.python.failure import Failure @@ -85,7 +85,9 @@ if __name__ == "__main__": setupdb() if runner.args.tests: - SUITES = list(filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES)) + SUITES = list( + filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES) + ) for suite, loops in SUITES: if loops: diff --git a/synmark/suites/__init__.py b/synmark/suites/__init__.py index 39b872762e..d8445fc3df 100644 --- a/synmark/suites/__init__.py +++ b/synmark/suites/__init__.py @@ -1,3 +1,9 @@ from . import logging, lrucache, lrucache_evict -SUITES = [(logging, 1000), (logging, 10000), (logging, None), (lrucache, None), (lrucache_evict, None)] +SUITES = [ + (logging, 1000), + (logging, 10000), + (logging, None), + (lrucache, None), + (lrucache_evict, None), +] diff --git a/synmark/suites/lrucache.py b/synmark/suites/lrucache.py index b7a96132c7..69ab042ccc 100644 --- a/synmark/suites/lrucache.py +++ b/synmark/suites/lrucache.py @@ -13,16 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import warnings -from io import StringIO - -from mock import Mock - from pyperf import perf_counter -from synmark import make_homeserver from synapse.util.caches.lrucache import LruCache -from synapse.logging._structured import setup_structured_logging async def main(reactor, loops): diff --git a/synmark/suites/lrucache_evict.py b/synmark/suites/lrucache_evict.py index c88ff5a3f8..532b1cc702 100644 --- a/synmark/suites/lrucache_evict.py +++ b/synmark/suites/lrucache_evict.py @@ -13,16 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import warnings -from io import StringIO - -from mock import Mock - from pyperf import perf_counter -from synmark import make_homeserver from synapse.util.caches.lrucache import LruCache -from synapse.logging._structured import setup_structured_logging async def main(reactor, loops): From 9735a08f04b02683f1ee6c86c59d83a9370cba36 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Tue, 3 Dec 2019 20:24:43 +1100 Subject: [PATCH 11/11] newsfile --- changelog.d/6446.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6446.misc diff --git a/changelog.d/6446.misc b/changelog.d/6446.misc new file mode 100644 index 0000000000..c42df16f1a --- /dev/null +++ b/changelog.d/6446.misc @@ -0,0 +1 @@ +Add benchmarks for LruCache.