From 99ab65af2f6976bb9ea15f637bb04b6f6dd7690c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 24 Oct 2019 20:59:06 +0300 Subject: [PATCH] fix up logging to use rapidjson --- synapse/logging/_structured.py | 9 +++- synapse/logging/_terse_json.py | 77 ++++++++++++++++++++++++---------- synapse/python_dependencies.py | 1 + tests/server.py | 2 + tests/unittest.py | 16 ++++--- tox.ini | 9 ++++ 6 files changed, 87 insertions(+), 27 deletions(-) diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 3220e985a9..c6a01577a6 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -261,6 +261,13 @@ def parse_drain_configs( ) +class StoppableLogPublisher(LogPublisher): + def stop(self): + for obs in self._observers: + if hasattr(obs, "stop"): + obs.stop() + + def setup_structured_logging( hs, config, @@ -336,7 +343,7 @@ def setup_structured_logging( # We should never get here, but, just in case, throw an error. raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - publisher = LogPublisher(*observers) + publisher = StoppableLogPublisher(*observers) log_filter = LogLevelFilterPredicate() for namespace, namespace_config in log_config.get( diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 0ebbde06f2..f3b27d5bfd 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -24,7 +24,7 @@ from math import floor from typing import IO import attr -from simplejson import dumps +from rapidjson import dumps from zope.interface import implementer from twisted.application.internet import ClientService @@ -33,9 +33,9 @@ from twisted.internet.endpoints import ( TCP4ClientEndpoint, TCP6ClientEndpoint, ) +from twisted.internet.interfaces import IPushProducer from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger -from twisted.python.failure import Failure def flatten_event(event: dict, metadata: dict, include_time: bool = False): @@ -141,11 +141,40 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" + return dumps(flattened, ensure_ascii=False) + "\n" return FileLogObserver(outFile, formatEvent) +@attr.s +@implementer(IPushProducer) +class LogProducer(object): + + _buffer = attr.ib() + _transport = attr.ib() + paused = attr.ib(default=False) + + def pauseProducing(self): + self.paused = True + + def stopProducing(self): + self.paused = True + self._buffer = None + + def resumeProducing(self): + self.paused = False + + while self.paused is False and (self._buffer and self._transport.connected): + try: + event = self._buffer.popleft() + self._transport.write(dumps(event, ensure_ascii=False).encode("utf8")) + self._transport.write(b"\n") + except Exception: + import traceback + + traceback.print_exc(file=sys.__stderr__) + + @attr.s @implementer(ILogObserver) class TerseJSONToTCPLogObserver(object): @@ -167,6 +196,7 @@ class TerseJSONToTCPLogObserver(object): _buffer = attr.ib(default=attr.Factory(deque), type=deque) _writer = attr.ib(default=None) _logger = attr.ib(default=attr.Factory(Logger)) + _producer = attr.ib(default=None) def start(self) -> None: @@ -188,6 +218,9 @@ class TerseJSONToTCPLogObserver(object): self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service.startService() + def stop(self): + self._service.stopService() + def _write_loop(self) -> None: """ Implement the write loop. @@ -195,27 +228,29 @@ class TerseJSONToTCPLogObserver(object): if self._writer: return + if self._producer and self._producer._transport.connected: + self._producer.resumeProducing() + return + self._writer = self._service.whenConnected() - @self._writer.addBoth - def writer(r): - if isinstance(r, Failure): - r.printTraceback(file=sys.__stderr__) - self._writer = None - self.hs.get_reactor().callLater(1, self._write_loop) - return + @self._writer.addErrback + def fail(r): + r.printTraceback(file=sys.__stderr__) + self._writer = None + self.hs.get_reactor().callLater(1, self._write_loop) + return - try: - for event in self._buffer: - r.transport.write( - dumps(event, ensure_ascii=False, separators=(",", ":")).encode( - "utf8" - ) - ) - r.transport.write(b"\n") - self._buffer.clear() - except Exception as e: - sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),)) + @self._writer.addCallback + def writer(r): + def connectionLost(_self, reason): + self._producer.pauseProducing() + self._producer = None + self.hs.get_reactor().callLater(1, self._write_loop) + + self._producer = LogProducer(self._buffer, r.transport) + r.transport.registerProducer(self._producer, True) + self._producer.resumeProducing() self._writer = False self.hs.get_reactor().callLater(1, self._write_loop) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index aa7da1c543..3c0040482c 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -74,6 +74,7 @@ REQUIREMENTS = [ "Jinja2>=2.9", "bleach>=1.4.3", "typing-extensions>=3.7.4", + "python-rapidjson>=0.4" ] CONDITIONAL_REQUIREMENTS = { diff --git a/tests/server.py b/tests/server.py index e397ebe8fa..bd647906ba 100644 --- a/tests/server.py +++ b/tests/server.py @@ -375,6 +375,7 @@ class FakeTransport(object): disconnecting = False disconnected = False + connected = True buffer = attr.ib(default=b"") producer = attr.ib(default=None) autoflush = attr.ib(default=True) @@ -392,6 +393,7 @@ class FakeTransport(object): if self._protocol: self._protocol.connectionLost(reason) self.disconnected = True + self.connected = False def abortConnection(self): logger.info("FakeTransport: abortConnection()") diff --git a/tests/unittest.py b/tests/unittest.py index 561cebc223..dc251b1702 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -178,11 +178,14 @@ class HomeserverTestCase(TestCase): needs_threadpool = False def __init__(self, methodName, *args, **kwargs): - super().__init__(methodName, *args, **kwargs) + if methodName: + super().__init__(methodName, *args, **kwargs) - # see if we have any additional config for this test - method = getattr(self, methodName) - self._extra_config = getattr(method, "_extra_config", None) + # see if we have any additional config for this test + method = getattr(self, methodName) + self._extra_config = getattr(method, "_extra_config", None) + else: + self._extra_config = None def setUp(self): """ @@ -190,7 +193,7 @@ class HomeserverTestCase(TestCase): hijacking the authentication system to return a fixed user, and then calling the prepare function. """ - self.reactor, self.clock = get_clock() + self.reactor, self.clock = self.get_clock() self._hs_args = {"clock": self.clock, "reactor": self.reactor} self.hs = self.make_homeserver(self.reactor, self.clock) @@ -240,6 +243,9 @@ class HomeserverTestCase(TestCase): if hasattr(self, "prepare"): self.prepare(self.reactor, self.clock, self.hs) + def get_clock(self): + return get_clock() + def wait_on_thread(self, deferred, timeout=10): """ Wait until a Deferred is done, where it's waiting on a real thread. diff --git a/tox.ini b/tox.ini index 3cd2c5e633..4249f9df5a 100644 --- a/tox.ini +++ b/tox.ini @@ -102,6 +102,15 @@ commands = {envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} +[testenv:benchmark] +deps = + {[base]deps} + pyperf +setenv = + SYNAPSE_POSTGRES = 1 +commands = + python -m synapse.benchmarks {posargs:} + [testenv:packaging] skip_install=True deps =