Make the in flight background process metrics thread safe
parent
324525f40c
commit
1058d14127
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import six
|
||||
import threading
|
||||
|
||||
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
|
||||
|
||||
|
@ -78,6 +79,9 @@ _background_process_counts = dict() # type: dict[str, int]
|
|||
# of process descriptions that no longer have any active processes.
|
||||
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
|
||||
|
||||
# A lock that covers the above dicts
|
||||
_bg_metrics_lock = threading.Lock()
|
||||
|
||||
|
||||
class _Collector(object):
|
||||
"""A custom metrics collector for the background process metrics.
|
||||
|
@ -92,7 +96,11 @@ class _Collector(object):
|
|||
labels=["name"],
|
||||
)
|
||||
|
||||
for desc, processes in six.iteritems(_background_processes):
|
||||
# We copy the dict so that it doesn't change from underneath us
|
||||
with _bg_metrics_lock:
|
||||
_background_processes_copy = dict(_background_processes)
|
||||
|
||||
for desc, processes in six.iteritems(_background_processes_copy):
|
||||
background_process_in_flight_count.add_metric(
|
||||
(desc,), len(processes),
|
||||
)
|
||||
|
@ -167,19 +175,26 @@ def run_as_background_process(desc, func, *args, **kwargs):
|
|||
"""
|
||||
@defer.inlineCallbacks
|
||||
def run():
|
||||
count = _background_process_counts.get(desc, 0)
|
||||
_background_process_counts[desc] = count + 1
|
||||
with _bg_metrics_lock:
|
||||
count = _background_process_counts.get(desc, 0)
|
||||
_background_process_counts[desc] = count + 1
|
||||
|
||||
_background_process_start_count.labels(desc).inc()
|
||||
|
||||
with LoggingContext(desc) as context:
|
||||
context.request = "%s-%i" % (desc, count)
|
||||
proc = _BackgroundProcess(desc, context)
|
||||
_background_processes.setdefault(desc, set()).add(proc)
|
||||
|
||||
with _bg_metrics_lock:
|
||||
_background_processes.setdefault(desc, set()).add(proc)
|
||||
|
||||
try:
|
||||
yield func(*args, **kwargs)
|
||||
finally:
|
||||
proc.update_metrics()
|
||||
_background_processes[desc].remove(proc)
|
||||
|
||||
with _bg_metrics_lock:
|
||||
_background_processes[desc].remove(proc)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
return run()
|
||||
|
|
Loading…
Reference in New Issue