Give pushers their own background logcontext

Each pusher has its own loop which runs for as long as it has work to do. This
should run in its own background thread with its own logcontext, as other
similar loops elsewhere in the system do - which means that CPU usage is
consistently attributed to that loop, rather than to whatever request happened
to start the loop.
pull/4075/head
Richard van der Hoff 2018-10-22 16:12:11 +01:00
parent 5110f4e425
commit c7273c11bc
2 changed files with 54 additions and 58 deletions

View File

@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@ -80,7 +79,7 @@ class EmailPusher(object):
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
yield self._process()
self._start_processing()
except Exception:
logger.exception("Error starting email pusher")
@ -92,10 +91,10 @@ class EmailPusher(object):
pass
self.timed_call = None
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
yield self._process()
self._start_processing()
return defer.succeed(None)
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
@ -103,32 +102,33 @@ class EmailPusher(object):
# timer fire
return defer.succeed(None)
@defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
yield self._process()
self._start_processing()
@defer.inlineCallbacks
def _process(self):
def _start_processing(self):
if self.processing:
return
with LoggingContext("emailpush._process"):
with Measure(self.clock, "emailpush._process"):
run_as_background_process("emailpush.process", self._process)
@defer.inlineCallbacks
def _process(self):
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
@defer.inlineCallbacks
def _unsafe_process(self):

View File

@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@ -92,34 +91,30 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
@defer.inlineCallbacks
def on_started(self):
try:
yield self._process()
except Exception:
logger.exception("Error starting http pusher")
self._start_processing()
return defer.succeed(None)
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
yield self._process()
self._start_processing()
return defer.suceed(None)
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
with LoggingContext("push.on_new_receipts"):
with Measure(self.clock, "push.on_new_receipts"):
badge = yield push_tools.get_badge_count(
self.hs.get_datastore(), self.user_id
)
yield self._send_badge(badge)
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
return defer.succeed(None)
@defer.inlineCallbacks
def _update_badge(self):
badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
yield self._send_badge(badge)
def on_timer(self):
yield self._process()
self._start_processing()
def on_stop(self):
if self.timed_call:
@ -129,27 +124,28 @@ class HttpPusher(object):
pass
self.timed_call = None
@defer.inlineCallbacks
def _process(self):
def _start_processing(self):
if self.processing:
return
with LoggingContext("push._process"):
with Measure(self.clock, "push._process"):
run_as_background_process("httppush.process", self._process)
@defer.inlineCallbacks
def _process(self):
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
@defer.inlineCallbacks
def _unsafe_process(self):