fix up logging to use rapidjson

pull/6266/head
Amber Brown 2019-10-24 20:59:06 +03:00
parent 92e88a71d3
commit 99ab65af2f
6 changed files with 87 additions and 27 deletions

synapse/logging/_structured.py

@@ -261,6 +261,13 @@ def parse_drain_configs(
         )


+class StoppableLogPublisher(LogPublisher):
+    def stop(self):
+        for obs in self._observers:
+            if hasattr(obs, "stop"):
+                obs.stop()
+
+
 def setup_structured_logging(
     hs,
     config,
@@ -336,7 +343,7 @@ def setup_structured_logging(
             # We should never get here, but, just in case, throw an error.
            raise ConfigError("%s drain type cannot be configured" % (observer.type,))

-    publisher = LogPublisher(*observers)
+    publisher = StoppableLogPublisher(*observers)
     log_filter = LogLevelFilterPredicate()

     for namespace, namespace_config in log_config.get(
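
Note: a rough sketch, not part of this commit, of what StoppableLogPublisher enables at shutdown: any observer holding an external connection gets a chance to tear it down before the process exits. The observer class below is invented for illustration; the import path is the module this hunk edits.

from synapse.logging._structured import StoppableLogPublisher


class FakeRemoteObserver:
    # Hypothetical observer that forwards events to a network drain.
    def __call__(self, event):
        pass

    def stop(self):
        print("closing remote log connection")


publisher = StoppableLogPublisher(FakeRemoteObserver())
publisher.stop()  # prints: closing remote log connection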

synapse/logging/_terse_json.py

@@ -24,7 +24,7 @@ from math import floor
 from typing import IO

 import attr
-from simplejson import dumps
+from rapidjson import dumps
 from zope.interface import implementer

 from twisted.application.internet import ClientService
@@ -33,9 +33,9 @@ from twisted.internet.endpoints import (
     TCP4ClientEndpoint,
     TCP6ClientEndpoint,
 )
+from twisted.internet.interfaces import IPushProducer
 from twisted.internet.protocol import Factory, Protocol
 from twisted.logger import FileLogObserver, ILogObserver, Logger
 from twisted.python.failure import Failure


 def flatten_event(event: dict, metadata: dict, include_time: bool = False):
@@ -141,11 +141,40 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver:
     def formatEvent(_event: dict) -> str:
         flattened = flatten_event(_event, metadata)
-        return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
+        return dumps(flattened, ensure_ascii=False) + "\n"

     return FileLogObserver(outFile, formatEvent)
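
Note: worth spelling out why separators=(",", ":") disappears here. python-rapidjson's dumps does not accept a separators argument, and it emits compact JSON by default, so nothing is lost by dropping it. A quick illustration, assuming both libraries are installed:

import rapidjson
import simplejson

event = {"log": "hello", "time": 1571947146}
compact = simplejson.dumps(event, ensure_ascii=False, separators=(",", ":"))
fast = rapidjson.dumps(event, ensure_ascii=False)
assert compact == fast  # both yield {"log":"hello","time":1571947146}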
+@attr.s
+@implementer(IPushProducer)
+class LogProducer(object):
+    _buffer = attr.ib()
+    _transport = attr.ib()
+    paused = attr.ib(default=False)
+
+    def pauseProducing(self):
+        self.paused = True
+
+    def stopProducing(self):
+        self.paused = True
+        self._buffer = None
+
+    def resumeProducing(self):
+        self.paused = False
+
+        while self.paused is False and (self._buffer and self._transport.connected):
+            try:
+                event = self._buffer.popleft()
+                self._transport.write(dumps(event, ensure_ascii=False).encode("utf8"))
+                self._transport.write(b"\n")
+            except Exception:
+                import traceback
+
+                traceback.print_exc(file=sys.__stderr__)
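
Note for reviewers unfamiliar with the IPushProducer contract: the transport calls pauseProducing() when its send buffer fills and resumeProducing() once it drains, so LogProducer only dequeues log events while the connection can absorb them. A toy demonstration, with a made-up transport (the import path is the module this hunk edits):

from collections import deque

from synapse.logging._terse_json import LogProducer


class ToyTransport:
    # Stand-in transport that just records writes; a real transport also
    # implements the producer-registration API.
    connected = True

    def __init__(self):
        self.written = b""

    def write(self, data):
        self.written += data


transport = ToyTransport()
producer = LogProducer(deque([{"log": "a"}, {"log": "b"}]), transport)
producer.resumeProducing()  # drains both events into transport.written
producer.pauseProducing()   # back-pressure: stop dequeueing until resumed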
 @attr.s
 @implementer(ILogObserver)
 class TerseJSONToTCPLogObserver(object):
@@ -167,6 +196,7 @@ class TerseJSONToTCPLogObserver(object):
     _buffer = attr.ib(default=attr.Factory(deque), type=deque)
     _writer = attr.ib(default=None)
     _logger = attr.ib(default=attr.Factory(Logger))
+    _producer = attr.ib(default=None)

     def start(self) -> None:
@@ -188,6 +218,9 @@ class TerseJSONToTCPLogObserver(object):
         self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
         self._service.startService()

+    def stop(self):
+        self._service.stopService()
+
     def _write_loop(self) -> None:
         """
         Implement the write loop.
@@ -195,27 +228,29 @@
         if self._writer:
             return

+        if self._producer and self._producer._transport.connected:
+            self._producer.resumeProducing()
+            return
+
         self._writer = self._service.whenConnected()

-        @self._writer.addErrback
-        def fail(r):
-            r.printTraceback(file=sys.__stderr__)
-            self._writer = None
-            self.hs.get_reactor().callLater(1, self._write_loop)
-            return
-
-        @self._writer.addCallback
-        def writer(r):
-            try:
-                for event in self._buffer:
-                    r.transport.write(
-                        dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
-                            "utf8"
-                        )
-                    )
-                    r.transport.write(b"\n")
-                self._buffer.clear()
-            except Exception as e:
-                sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
+        @self._writer.addBoth
+        def writer(r):
+            if isinstance(r, Failure):
+                r.printTraceback(file=sys.__stderr__)
+                self._writer = None
+                self.hs.get_reactor().callLater(1, self._write_loop)
+                return
+
+            def connectionLost(_self, reason):
+                self._producer.pauseProducing()
+                self._producer = None
+                self.hs.get_reactor().callLater(1, self._write_loop)
+
+            self._producer = LogProducer(self._buffer, r.transport)
+            r.transport.registerProducer(self._producer, True)
+            self._producer.resumeProducing()

             self._writer = False
             self.hs.get_reactor().callLater(1, self._write_loop)
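
Note: the move from a separate addErrback/addCallback pair to a single addBoth is the usual Twisted idiom when one handler wants to see either outcome: it receives the connected protocol on success or a Failure on error, which is why writer() now starts with an isinstance check. A minimal, self-contained illustration (independent of this diff):

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

d = Deferred()


@d.addBoth
def handle(result):
    if isinstance(result, Failure):
        result.printTraceback()
    else:
        print("connected:", result)


d.callback("protocol")  # success path: prints "connected: protocol"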

synapse/python_dependencies.py

@@ -74,6 +74,7 @@ REQUIREMENTS = [
     "Jinja2>=2.9",
     "bleach>=1.4.3",
     "typing-extensions>=3.7.4",
+    "python-rapidjson>=0.4",
 ]

 CONDITIONAL_REQUIREMENTS = {
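
Note: a quick local sanity check for the new dependency (illustrative, not part of the commit): python-rapidjson exposes the dumps/loads surface the logger relies on, including ensure_ascii.

import rapidjson

event = {"log": "héllo"}
wire = rapidjson.dumps(event, ensure_ascii=False)
assert wire == '{"log":"héllo"}'
assert rapidjson.loads(wire) == event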

tests/server.py

@@ -375,6 +375,7 @@ class FakeTransport(object):
     disconnecting = False
     disconnected = False
+    connected = True

     buffer = attr.ib(default=b"")
     producer = attr.ib(default=None)
     autoflush = attr.ib(default=True)
@@ -392,6 +393,7 @@
         if self._protocol:
             self._protocol.connectionLost(reason)
         self.disconnected = True
+        self.connected = False

     def abortConnection(self):
         logger.info("FakeTransport: abortConnection()")
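
Note: the new connected flag is what makes LogProducer testable. Its resumeProducing() loop runs only while self._transport.connected is true, so a FakeTransport that never cleared the flag would keep draining events after a simulated disconnect. Clearing it alongside disconnected keeps the fake honest about connection state.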

tests/unittest.py

@@ -178,11 +178,14 @@ class HomeserverTestCase(TestCase):
     needs_threadpool = False

     def __init__(self, methodName, *args, **kwargs):
-        super().__init__(methodName, *args, **kwargs)
+        if methodName:
+            super().__init__(methodName, *args, **kwargs)

-        # see if we have any additional config for this test
-        method = getattr(self, methodName)
-        self._extra_config = getattr(method, "_extra_config", None)
+            # see if we have any additional config for this test
+            method = getattr(self, methodName)
+            self._extra_config = getattr(method, "_extra_config", None)
+        else:
+            self._extra_config = None
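
Note: presumably (the diff does not say) this is so the new benchmark entry point can instantiate HomeserverTestCase subclasses directly, outside a test runner, where there is no method name to look up. A hedged sketch of the call pattern this change permits:

from tests.unittest import HomeserverTestCase

case = HomeserverTestCase(methodName=None)
assert case._extra_config is None  # no per-method config without a method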
     def setUp(self):
         """
@@ -190,7 +193,7 @@ class HomeserverTestCase(TestCase):
         hijacking the authentication system to return a fixed user, and then
         calling the prepare function.
         """
-        self.reactor, self.clock = get_clock()
+        self.reactor, self.clock = self.get_clock()
         self._hs_args = {"clock": self.clock, "reactor": self.reactor}
         self.hs = self.make_homeserver(self.reactor, self.clock)
@@ -240,6 +243,9 @@
         if hasattr(self, "prepare"):
             self.prepare(self.reactor, self.clock, self.hs)

+    def get_clock(self):
+        return get_clock()
+
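
Note: routing the clock through an overridable method means a subclass, such as a benchmark harness, can substitute its own reactor/clock pair without monkeypatching. A hypothetical override, assuming get_clock is importable from tests.server as it is in this file:

from tests.server import get_clock
from tests.unittest import HomeserverTestCase


class SharedReactorCase(HomeserverTestCase):
    # Build one (reactor, clock) pair and reuse it across instances.
    shared = get_clock()

    def get_clock(self):
        return self.shared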
     def wait_on_thread(self, deferred, timeout=10):
         """
         Wait until a Deferred is done, where it's waiting on a real thread.

tox.ini

@@ -102,6 +102,15 @@ commands =
     {envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}

+[testenv:benchmark]
+deps =
+    {[base]deps}
+    pyperf
+setenv =
+    SYNAPSE_POSTGRES = 1
+commands =
+    python -m synapse.benchmarks {posargs:}
+
 [testenv:packaging]
 skip_install=True
 deps =
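
Note: with this env in place, the benchmark suite should be runnable as `tox -e benchmark`, with any extra arguments passed through to the `python -m synapse.benchmarks` entry point via {posargs:}. SYNAPSE_POSTGRES=1 is exported for the run, so a reachable Postgres instance is assumed.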