# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import functools import gc import logging import os import platform import time import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import REGISTRY, GaugeMetricFamily from twisted.internet import reactor logger = logging.getLogger(__name__) running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] all_gauges = {} HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") class RegistryProxy(object): @staticmethod def collect(): for metric in REGISTRY.collect(): if not metric.name.startswith("__"): yield metric @attr.s(hash=True) class LaterGauge(object): name = attr.ib() desc = attr.ib() labels = attr.ib(hash=False) caller = attr.ib() def collect(self): g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) try: calls = self.caller() except Exception: logger.exception( "Exception running callback for LaterGauge(%s)", self.name, ) yield g return if isinstance(calls, dict): for k, v in calls.items(): g.add_metric(k, v) else: g.add_metric([], calls) yield g def __attrs_post_init__(self): self._register() def _register(self): if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) REGISTRY.register(self) all_gauges[self.name] = self # # Detailed CPU metrics # class CPUMetrics(object): def __init__(self): ticks_per_sec = 100 try: # Try and get the system config ticks_per_sec = os.sysconf('SC_CLK_TCK') except (ValueError, TypeError, AttributeError): pass self.ticks_per_sec = ticks_per_sec def collect(self): if not HAVE_PROC_SELF_STAT: return with open("/proc/self/stat") as s: line = s.read() raw_stats = line.split(") ", 1)[1].split(" ") user = GaugeMetricFamily("process_cpu_user_seconds_total", "") user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec) yield user sys = GaugeMetricFamily("process_cpu_system_seconds_total", "") sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec) yield sys REGISTRY.register(CPUMetrics()) # # Python GC metrics # gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) gc_time = Histogram( "python_gc_time", "Time taken to GC (sec)", ["gen"], buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, 5.00, 7.50, 15.00, 30.00, 45.00, 60.00], ) class GCCounts(object): def collect(self): cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) yield cm if not running_on_pypy: REGISTRY.register(GCCounts()) # # Twisted reactor metrics # tick_time = Histogram( "python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (sec)", buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], ) pending_calls_metric = Histogram( "python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], ) # # Federation Metrics # sent_edus_counter = Counter("synapse_federation_client_sent_edus", "") sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") events_processed_counter = Counter("synapse_federation_client_events_processed", "") event_processing_loop_counter = Counter( "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"], ) event_processing_loop_room_count = Counter( "synapse_event_processing_loop_room_count", "Rooms seen per event processing loop iteration", ["name"], ) # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) # Used to track the current max events stream position event_persisted_position = Gauge("synapse_event_persisted_position", "") # Used to track the received_ts of the last event processed by various # components event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]) # Used to track the lag processing events. This is the time difference # between the last processed event's received_ts and the time it was # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) last_ticked = time.time() class ReactorLastSeenMetric(object): def collect(self): cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", "Seconds since the Twisted reactor was last seen", ) cm.add_metric([], time.time() - last_ticked) yield cm REGISTRY.register(ReactorLastSeenMetric()) def runUntilCurrentTimer(func): @functools.wraps(func) def f(*args, **kwargs): now = reactor.seconds() num_pending = 0 # _newTimedCalls is one long list of *all* pending calls. Below loop # is based off of impl of reactor.runUntilCurrent for delayed_call in reactor._newTimedCalls: if delayed_call.time > now: break if delayed_call.delayed_time > 0: continue num_pending += 1 num_pending += len(reactor.threadCallQueue) start = time.time() ret = func(*args, **kwargs) end = time.time() # record the amount of wallclock time spent running pending calls. # This is a proxy for the actual amount of time between reactor polls, # since about 25% of time is actually spent running things triggered by # I/O events, but that is harder to capture without rewriting half the # reactor. tick_time.observe(end - start) pending_calls_metric.observe(num_pending) # Update the time we last ticked, for the metric to test whether # Synapse's reactor has frozen global last_ticked last_ticked = end if running_on_pypy: return ret # Check if we need to do a manual GC (since its been disabled), and do # one if necessary. threshold = gc.get_threshold() counts = gc.get_count() for i in (2, 1, 0): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) start = time.time() unreachable = gc.collect(i) end = time.time() gc_time.labels(i).observe(end - start) gc_unreachable.labels(i).set(unreachable) return ret return f try: # Ensure the reactor has all the attributes we expect reactor.runUntilCurrent reactor._newTimedCalls reactor.threadCallQueue # runUntilCurrent is called when we have pending calls. It is called once # per iteratation after fd polling. reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent) # We manually run the GC each reactor tick so that we can get some metrics # about time spent doing GC, if not running_on_pypy: gc.disable() except AttributeError: pass