Make inflight background metrics more efficient. (#7597)

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
pull/7599/head
Erik Johnston 2020-05-29 13:25:32 +01:00 committed by GitHub
parent 47db2c3673
commit f5353eff21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 34 deletions

1
changelog.d/7597.bugfix Normal file
View File

@ -0,0 +1 @@
Fix metrics failing when there is a large number of active background processes.

View File

@ -17,16 +17,18 @@ import logging
import threading import threading
from asyncio import iscoroutine from asyncio import iscoroutine
from functools import wraps from functools import wraps
from typing import Dict, Set from typing import TYPE_CHECKING, Dict, Optional, Set
import six from prometheus_client.core import REGISTRY, Counter, Gauge
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
from twisted.internet import defer from twisted.internet import defer
from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.logging.context import LoggingContext, PreserveLoggingContext
if TYPE_CHECKING:
import resource
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -36,6 +38,12 @@ _background_process_start_count = Counter(
["name"], ["name"],
) )
_background_process_in_flight_count = Gauge(
"synapse_background_process_in_flight_count",
"Number of background processes in flight",
labelnames=["name"],
)
# we set registry=None in all of these to stop them getting registered with # we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the CustomCollector, # the default registry. Instead we collect them all via the CustomCollector,
# which ensures that we can update them before they are collected. # which ensures that we can update them before they are collected.
@ -83,13 +91,17 @@ _background_process_db_sched_duration = Counter(
# it's much simpler to do so than to try to combine them.) # it's much simpler to do so than to try to combine them.)
_background_process_counts = {} # type: Dict[str, int] _background_process_counts = {} # type: Dict[str, int]
# map from description to the currently running background processes. # Set of all running background processes that became active active since the
# last time metrics were scraped (i.e. background processes that performed some
# work since the last scrape.)
# #
# it's kept as a dict of sets rather than a big set so that we can keep track # We do it like this to handle the case where we have a large number of
# of process descriptions that no longer have any active processes. # background processes stacking up behind a lock or linearizer, where we then
_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]] # only need to iterate over and update metrics for the process that have
# actually been active and can ignore the idle ones.
_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess]
# A lock that covers the above dicts # A lock that covers the above set and dict
_bg_metrics_lock = threading.Lock() _bg_metrics_lock = threading.Lock()
@ -101,25 +113,16 @@ class _Collector(object):
""" """
def collect(self): def collect(self):
background_process_in_flight_count = GaugeMetricFamily( global _background_processes_active_since_last_scrape
"synapse_background_process_in_flight_count",
"Number of background processes in flight",
labels=["name"],
)
# We copy the dict so that it doesn't change from underneath us. # We swap out the _background_processes set with an empty one so that
# We also copy the process lists as that can also change # we can safely iterate over the set without holding the lock.
with _bg_metrics_lock: with _bg_metrics_lock:
_background_processes_copy = { _background_processes_copy = _background_processes_active_since_last_scrape
k: list(v) for k, v in six.iteritems(_background_processes) _background_processes_active_since_last_scrape = set()
}
for desc, processes in six.iteritems(_background_processes_copy): for process in _background_processes_copy:
background_process_in_flight_count.add_metric((desc,), len(processes)) process.update_metrics()
for process in processes:
process.update_metrics()
yield background_process_in_flight_count
# now we need to run collect() over each of the static Counters, and # now we need to run collect() over each of the static Counters, and
# yield each metric they return. # yield each metric they return.
@ -191,13 +194,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
_background_process_counts[desc] = count + 1 _background_process_counts[desc] = count + 1
_background_process_start_count.labels(desc).inc() _background_process_start_count.labels(desc).inc()
_background_process_in_flight_count.labels(desc).inc()
with LoggingContext(desc) as context: with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count) context.request = "%s-%i" % (desc, count)
proc = _BackgroundProcess(desc, context)
with _bg_metrics_lock:
_background_processes.setdefault(desc, set()).add(proc)
try: try:
result = func(*args, **kwargs) result = func(*args, **kwargs)
@ -214,10 +214,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
except Exception: except Exception:
logger.exception("Background process '%s' threw an exception", desc) logger.exception("Background process '%s' threw an exception", desc)
finally: finally:
proc.update_metrics() _background_process_in_flight_count.labels(desc).dec()
with _bg_metrics_lock:
_background_processes[desc].remove(proc)
with PreserveLoggingContext(): with PreserveLoggingContext():
return run() return run()
@ -238,3 +235,42 @@ def wrap_as_background_process(desc):
return wrap_as_background_process_inner_2 return wrap_as_background_process_inner_2
return wrap_as_background_process_inner return wrap_as_background_process_inner
class BackgroundProcessLoggingContext(LoggingContext):
"""A logging context that tracks in flight metrics for background
processes.
"""
__slots__ = ["_proc"]
def __init__(self, name: str):
super().__init__(name)
self._proc = _BackgroundProcess(name, self)
def start(self, rusage: "Optional[resource._RUsage]"):
"""Log context has started running (again).
"""
super().start(rusage)
# We've become active again so we make sure we're in the list of active
# procs. (Note that "start" here means we've become active, as opposed
# to starting for the first time.)
with _bg_metrics_lock:
_background_processes_active_since_last_scrape.add(self._proc)
def __exit__(self, type, value, traceback) -> None:
"""Log context has finished.
"""
super().__exit__(type, value, traceback)
# The background process has finished. We explictly remove and manually
# update the metrics here so that if nothing is scraping metrics the set
# doesn't infinitely grow.
with _bg_metrics_lock:
_background_processes_active_since_last_scrape.discard(self._proc)
self._proc.update_metrics()