more fixes

pull/6266/head
Amber H. Brown 2019-12-02 18:30:11 +11:00
parent d70be1871c
commit c11c8ad39f
5 changed files with 98 additions and 67 deletions

View File

@ -256,6 +256,7 @@ class TerseJSONToTCPLogObserver(object):
# transport is the same, just trigger a resumeProducing. # transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport: if self._producer and r.transport is self._producer.transport:
self._producer.resumeProducing() self._producer.resumeProducing()
self._connection_waiter = None
return return
# If the producer is still producing, stop it. # If the producer is still producing, stop it.

View File

@ -13,21 +13,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet.defer import Deferred import sys
from twisted.internet import epollreactor
from twisted.internet.main import installReactor
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.util import Clock from synapse.util import Clock
from tests.utils import default_config, setup_test_homeserver, setupdb from tests.utils import default_config, setup_test_homeserver, setupdb
DB_SETUP = False
def setup_database(): def setup_database():
global DB_SETUP setupdb()
if not DB_SETUP:
setupdb()
DB_SETUP = True
async def make_homeserver(reactor, config=None): async def make_homeserver(reactor, config=None):
@ -56,3 +54,16 @@ async def make_homeserver(reactor, config=None):
i() i()
return hs, clock.sleep, cleanup return hs, clock.sleep, cleanup
def make_reactor():
"""
Make an install a Twisted reactor.
"""
reactor = epollreactor.EPollReactor()
if "twisted.internet.reactor" in sys.modules:
del sys.modules["twisted.internet.reactor"]
installReactor(reactor)
return reactor

View File

@ -13,19 +13,76 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import sys
from contextlib import redirect_stderr
from io import StringIO
import pyperf import pyperf
from synmark import make_reactor, setup_database
from twisted.python import reflect
from synmark.suites import SUITES from synmark.suites import SUITES
from twisted.internet.defer import ensureDeferred
from twisted.logger import globalLogBeginner, textFileLogObserver
from twisted.python.failure import Failure
def make_test(main):
"""
Take main, the test function, and wrap it in a reactor start and stop.
"""
def _main(loops):
reactor = make_reactor()
file_out = StringIO()
with redirect_stderr(file_out):
d = ensureDeferred(main(reactor, loops))
def on_done(_):
if isinstance(_, Failure):
_.printTraceback()
print(file_out.getvalue())
reactor.stop()
return _
d.addBoth(on_done)
reactor.run()
return d.result
return _main
if __name__ == "__main__": if __name__ == "__main__":
runner = pyperf.Runner(processes=5, values=1, warmups=0) def add_cmdline_args(cmd, args):
if args.log:
cmd.extend(["--log"])
runner = pyperf.Runner(
processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args
)
runner.argparser.add_argument("--log", action="store_true")
runner.parse_args() runner.parse_args()
orig_loops = runner.args.loops
runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] runner.args.inherit_environ = ["SYNAPSE_POSTGRES"]
if runner.args.worker:
if runner.args.log:
globalLogBeginner.beginLoggingTo(
[textFileLogObserver(sys.__stdout__)], redirectStandardIO=False
)
setup_database()
for suite, loops in SUITES: for suite, loops in SUITES:
print(suite, loops) if loops:
runner.args.loops = loops runner.args.loops = loops
runner.bench_time_func(suite.__name__ + "_" + str(loops), suite.main) else:
runner.args.loops = orig_loops
loops = "auto"
runner.bench_time_func(
suite.__name__ + "_" + str(loops), make_test(suite.main),
)

View File

@ -1,6 +1,3 @@
from . import logging from . import logging
SUITES = [(logging, 1000), (logging, 10000), (logging, None)]
SUITES = [(logging, 1000), (logging, 10000)]

View File

@ -14,21 +14,18 @@
# limitations under the License. # limitations under the License.
import warnings import warnings
from contextlib import redirect_stderr
from io import StringIO from io import StringIO
from mock import Mock from mock import Mock
import sys
from pyperf import perf_counter from pyperf import perf_counter
from synmark import make_homeserver
from twisted.internet.defer import ensureDeferred from twisted.internet.defer import Deferred
from twisted.internet.protocol import ServerFactory from twisted.internet.protocol import ServerFactory
from twisted.logger import LogBeginner, Logger, LogPublisher, globalLogBeginner, textFileLogObserver from twisted.logger import LogBeginner, Logger, LogPublisher
from twisted.protocols.basic import LineOnlyReceiver from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
from synmark import make_homeserver, setup_database
from synapse.logging._structured import setup_structured_logging from synapse.logging._structured import setup_structured_logging
@ -43,20 +40,24 @@ class LineCounter(LineOnlyReceiver):
def lineReceived(self, line): def lineReceived(self, line):
self.count += 1 self.count += 1
if self.count >= self.factory.wait_for and self.factory.on_done:
on_done = self.factory.on_done
self.factory.on_done = None
on_done.callback(True)
async def _main(reactor, loops):
async def main(reactor, loops):
servers = [] servers = []
print("?")
def protocol(): def protocol():
p = LineCounter() p = LineCounter()
servers.append(p) servers.append(p)
return p return p
logger_factory = ServerFactory.forProtocol(protocol) logger_factory = ServerFactory.forProtocol(protocol)
logger_factory.wait_for = loops
logger_factory.on_done = Deferred()
port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1")
hs, wait, cleanup = await make_homeserver(reactor) hs, wait, cleanup = await make_homeserver(reactor)
@ -81,22 +82,18 @@ async def _main(reactor, loops):
} }
logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher)
start = perf_counter()
logging_system = setup_structured_logging( logging_system = setup_structured_logging(
hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False
) )
print("hi")
# Wait for it to connect... # Wait for it to connect...
await logging_system._observers[0]._service.whenConnected() await logging_system._observers[0]._service.whenConnected()
start = perf_counter()
# Send a bunch of useful messages # Send a bunch of useful messages
for i in range(0, loops): for i in range(0, loops):
logger.info("test message %s" % (i,)) logger.info("test message %s" % (i,))
print(i)
if ( if (
len(logging_system._observers[0]._buffer) len(logging_system._observers[0]._buffer)
@ -108,9 +105,7 @@ async def _main(reactor, loops):
): ):
await wait(0.01) await wait(0.01)
while servers[0].count != loops: await logger_factory.on_done
print(servers[0].count, loops)
await wait(0.01)
end = perf_counter() - start end = perf_counter() - start
@ -119,33 +114,3 @@ async def _main(reactor, loops):
cleanup() cleanup()
return end return end
def main(loops):
print("hi?")
print(loops)
setup_database()
if globalLogBeginner._temporaryObserver:
globalLogBeginner.beginLoggingTo([textFileLogObserver(sys.__stderr__)])
file_out = StringIO()
with redirect_stderr(file_out):
from twisted.internet import epollreactor
reactor = epollreactor.EPollReactor()
d = ensureDeferred(_main(reactor, loops))
def on_done(_):
if isinstance(_, Failure):
_.printTraceback()
reactor.stop()
return _
d.addBoth(on_done)
reactor.run()
return d.result