From c11c8ad39f11a8205778b0bf41bd143e288528f0 Mon Sep 17 00:00:00 2001 From: "Amber H. Brown" Date: Mon, 2 Dec 2019 18:30:11 +1100 Subject: [PATCH] more fixes --- synapse/logging/_terse_json.py | 1 + synmark/__init__.py | 25 ++++++++---- synmark/__main__.py | 71 ++++++++++++++++++++++++++++++---- synmark/suites/__init__.py | 5 +-- synmark/suites/logging.py | 63 +++++++----------------------- 5 files changed, 98 insertions(+), 67 deletions(-) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 05fc64f409..03934956f4 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -256,6 +256,7 @@ class TerseJSONToTCPLogObserver(object): # transport is the same, just trigger a resumeProducing. if self._producer and r.transport is self._producer.transport: self._producer.resumeProducing() + self._connection_waiter = None return # If the producer is still producing, stop it. diff --git a/synmark/__init__.py b/synmark/__init__.py index 64c2744ae7..5cc8ed28ee 100644 --- a/synmark/__init__.py +++ b/synmark/__init__.py @@ -13,21 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet.defer import Deferred +import sys + +from twisted.internet import epollreactor +from twisted.internet.main import installReactor from synapse.config.homeserver import HomeServerConfig from synapse.util import Clock from tests.utils import default_config, setup_test_homeserver, setupdb -DB_SETUP = False - def setup_database(): - global DB_SETUP - if not DB_SETUP: - setupdb() - DB_SETUP = True + setupdb() async def make_homeserver(reactor, config=None): @@ -56,3 +54,16 @@ async def make_homeserver(reactor, config=None): i() return hs, clock.sleep, cleanup + + +def make_reactor(): + """ + Make an install a Twisted reactor. + """ + reactor = epollreactor.EPollReactor() + + if "twisted.internet.reactor" in sys.modules: + del sys.modules["twisted.internet.reactor"] + installReactor(reactor) + + return reactor diff --git a/synmark/__main__.py b/synmark/__main__.py index e208b04a0b..8a8fbdf6c7 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py @@ -13,19 +13,76 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys +from contextlib import redirect_stderr +from io import StringIO + import pyperf - -from twisted.python import reflect - +from synmark import make_reactor, setup_database from synmark.suites import SUITES +from twisted.internet.defer import ensureDeferred +from twisted.logger import globalLogBeginner, textFileLogObserver +from twisted.python.failure import Failure + + +def make_test(main): + """ + Take main, the test function, and wrap it in a reactor start and stop. + """ + + def _main(loops): + + reactor = make_reactor() + + file_out = StringIO() + with redirect_stderr(file_out): + + d = ensureDeferred(main(reactor, loops)) + + def on_done(_): + if isinstance(_, Failure): + _.printTraceback() + print(file_out.getvalue()) + reactor.stop() + return _ + + d.addBoth(on_done) + reactor.run() + + return d.result + + return _main + + if __name__ == "__main__": - runner = pyperf.Runner(processes=5, values=1, warmups=0) + def add_cmdline_args(cmd, args): + if args.log: + cmd.extend(["--log"]) + + runner = pyperf.Runner( + processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args + ) + runner.argparser.add_argument("--log", action="store_true") runner.parse_args() + + orig_loops = runner.args.loops runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] + if runner.args.worker: + if runner.args.log: + globalLogBeginner.beginLoggingTo( + [textFileLogObserver(sys.__stdout__)], redirectStandardIO=False + ) + setup_database() + for suite, loops in SUITES: - print(suite, loops) - runner.args.loops = loops - runner.bench_time_func(suite.__name__ + "_" + str(loops), suite.main) + if loops: + runner.args.loops = loops + else: + runner.args.loops = orig_loops + loops = "auto" + runner.bench_time_func( + suite.__name__ + "_" + str(loops), make_test(suite.main), + ) diff --git a/synmark/suites/__init__.py b/synmark/suites/__init__.py index 412c15f52c..cfa3b0ba38 100644 --- a/synmark/suites/__init__.py +++ b/synmark/suites/__init__.py @@ -1,6 +1,3 @@ from . import logging - -SUITES = [(logging, 1000), (logging, 10000)] - - +SUITES = [(logging, 1000), (logging, 10000), (logging, None)] diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index bc967e97ab..26b78e4569 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -14,21 +14,18 @@ # limitations under the License. import warnings -from contextlib import redirect_stderr from io import StringIO from mock import Mock -import sys from pyperf import perf_counter +from synmark import make_homeserver -from twisted.internet.defer import ensureDeferred +from twisted.internet.defer import Deferred from twisted.internet.protocol import ServerFactory -from twisted.logger import LogBeginner, Logger, LogPublisher, globalLogBeginner, textFileLogObserver +from twisted.logger import LogBeginner, Logger, LogPublisher from twisted.protocols.basic import LineOnlyReceiver -from twisted.python.failure import Failure -from synmark import make_homeserver, setup_database from synapse.logging._structured import setup_structured_logging @@ -43,20 +40,24 @@ class LineCounter(LineOnlyReceiver): def lineReceived(self, line): self.count += 1 + if self.count >= self.factory.wait_for and self.factory.on_done: + on_done = self.factory.on_done + self.factory.on_done = None + on_done.callback(True) -async def _main(reactor, loops): + +async def main(reactor, loops): servers = [] - print("?") - - def protocol(): p = LineCounter() servers.append(p) return p logger_factory = ServerFactory.forProtocol(protocol) + logger_factory.wait_for = loops + logger_factory.on_done = Deferred() port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") hs, wait, cleanup = await make_homeserver(reactor) @@ -81,22 +82,18 @@ async def _main(reactor, loops): } logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) - - start = perf_counter() - logging_system = setup_structured_logging( hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False ) - print("hi") - # Wait for it to connect... await logging_system._observers[0]._service.whenConnected() + start = perf_counter() + # Send a bunch of useful messages for i in range(0, loops): logger.info("test message %s" % (i,)) - print(i) if ( len(logging_system._observers[0]._buffer) @@ -108,9 +105,7 @@ async def _main(reactor, loops): ): await wait(0.01) - while servers[0].count != loops: - print(servers[0].count, loops) - await wait(0.01) + await logger_factory.on_done end = perf_counter() - start @@ -119,33 +114,3 @@ async def _main(reactor, loops): cleanup() return end - - -def main(loops): - - print("hi?") - print(loops) - - setup_database() - - if globalLogBeginner._temporaryObserver: - globalLogBeginner.beginLoggingTo([textFileLogObserver(sys.__stderr__)]) - - file_out = StringIO() - with redirect_stderr(file_out): - - from twisted.internet import epollreactor - - reactor = epollreactor.EPollReactor() - d = ensureDeferred(_main(reactor, loops)) - - def on_done(_): - if isinstance(_, Failure): - _.printTraceback() - reactor.stop() - return _ - - d.addBoth(on_done) - reactor.run() - - return d.result