285 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			285 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright 2015, 2016 OpenMarket Ltd
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import functools
 | |
| import gc
 | |
| import logging
 | |
| import os
 | |
| import platform
 | |
| import time
 | |
| 
 | |
| import attr
 | |
| from prometheus_client import Counter, Gauge, Histogram
 | |
| from prometheus_client.core import REGISTRY, GaugeMetricFamily
 | |
| 
 | |
| from twisted.internet import reactor
 | |
| 
 | |
# Module-level logger used by the collectors and reactor wrapper below.
logger = logging.getLogger(__name__)

# Whether /proc/self/stat existed when this module was imported; CPUMetrics
# yields nothing when it did not.
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

# True when the running interpreter identifies itself as PyPy.
running_on_pypy = platform.python_implementation() == "PyPy"

# Module-level registries. `all_gauges` maps a gauge name to the LaterGauge
# currently registered under that name; the two lists are not referenced in
# the visible part of this module.
all_gauges = {}
all_collectors = []
all_metrics = []
 | |
| 
 | |
| 
 | |
class RegistryProxy(object):
    """Filtered view of the default Prometheus ``REGISTRY``."""

    @staticmethod
    def collect():
        """Yield every metric from ``REGISTRY.collect()`` whose name does not
        start with a double underscore.
        """
        for family in REGISTRY.collect():
            if family.name.startswith("__"):
                # Double-underscore names are hidden from consumers.
                continue
            yield family
 | |
| 
 | |
| 
 | |
@attr.s(hash=True)
class LaterGauge(object):
    """Collector whose gauge value is produced by a callback at collection time.

    Constructing an instance registers it with ``REGISTRY`` (see
    ``_register``), replacing any LaterGauge previously registered under the
    same name.
    """

    # Metric name passed to GaugeMetricFamily; also the key in `all_gauges`.
    name = attr.ib()
    # Documentation string passed to GaugeMetricFamily.
    desc = attr.ib()
    # Label names passed to GaugeMetricFamily; excluded from the attrs hash.
    labels = attr.ib(hash=False)
    # Zero-argument callable returning either a single value or a dict whose
    # keys are passed to `add_metric` as label values.
    caller = attr.ib()

    def collect(self):
        """Yield one GaugeMetricFamily built from the callback's result.

        If the callback raises, the exception is logged and the family is
        yielded without any samples.
        """
        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

        try:
            calls = self.caller()
        except Exception:
            logger.exception(
                "Exception running callback for LaterGauge(%s)",
                self.name,
            )
            yield g
            return

        if isinstance(calls, dict):
            # One sample per dict item: key -> label values, value -> sample.
            for k, v in calls.items():
                g.add_metric(k, v)
        else:
            # A single unlabelled sample.
            g.add_metric([], calls)

        yield g

    def __attrs_post_init__(self):
        """Register the gauge as soon as attrs has finished initialising it."""
        self._register()

    def _register(self):
        """Register this gauge with ``REGISTRY`` and record it in ``all_gauges``.

        A gauge already registered under the same name is unregistered first.
        """
        if self.name in all_gauges:
            # Lazy logger arguments: the message is only formatted if emitted.
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
 | |
| 
 | |
| 
 | |
| #
 | |
| # Detailed CPU metrics
 | |
| #
 | |
| 
 | |
class CPUMetrics(object):
    """Collector exposing this process's user and system CPU time.

    Values are read from ``/proc/self/stat`` and converted from clock ticks
    to seconds using ``ticks_per_sec``.
    """

    def __init__(self):
        """Determine the clock tick rate, defaulting to 100 ticks per second."""
        ticks_per_sec = 100
        try:
            # Try and get the system config
            ticks_per_sec = os.sysconf('SC_CLK_TCK')
        except (ValueError, TypeError, AttributeError, OSError):
            # sysconf is missing, or the name is unknown/unsupported on this
            # platform: keep the default.
            pass

        self.ticks_per_sec = ticks_per_sec

    @staticmethod
    def _parse_cpu_ticks(stat_line):
        """Return ``(utime, stime)`` in clock ticks from a /proc/<pid>/stat line.

        The second field (the command name) is wrapped in parentheses and may
        itself contain ``") "``, so the line is split on the *last* ``") "``
        rather than the first. Raises IndexError if the line contains no
        ``") "`` at all, or has too few fields.
        """
        fields = stat_line.rsplit(") ", 1)[1].split(" ")
        # `fields` starts at the process state, so indices 11 and 12 are the
        # 14th and 15th fields of the full line.
        return float(fields[11]), float(fields[12])

    def collect(self):
        """Yield user and system CPU seconds as two gauge families.

        Yields nothing when ``/proc/self/stat`` was not present at import time.
        """
        if not HAVE_PROC_SELF_STAT:
            return

        # Read and close the file before yielding so the descriptor is not
        # held open while the consumer is processing the yielded families.
        with open("/proc/self/stat") as s:
            line = s.read()

        user_ticks, system_ticks = self._parse_cpu_ticks(line)

        user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
        user.add_metric([], user_ticks / self.ticks_per_sec)
        yield user

        system = GaugeMetricFamily("process_cpu_system_seconds_total", "")
        system.add_metric([], system_ticks / self.ticks_per_sec)
        yield system
 | |
| 
 | |
| 
 | |
# Expose the CPU collector through the default registry.
REGISTRY.register(CPUMetrics())

#
# Python GC metrics
#

# Both metrics carry a "gen" label and are updated by the reactor wrapper
# whenever it runs a manual collection.
gc_unreachable = Gauge(
    "python_gc_unreachable_total",
    "Unreachable GC objects",
    labelnames=["gen"],
)
gc_time = Histogram(
    "python_gc_time",
    "Time taken to GC (sec)",
    labelnames=["gen"],
    buckets=[
        0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50,
        1.00, 2.50, 5.00, 7.50, 15.00, 30.00, 45.00, 60.00,
    ],
)
 | |
| 
 | |
| 
 | |
class GCCounts(object):
    """Collector reporting the values of ``gc.get_count()``."""

    def collect(self):
        """Yield a ``python_gc_counts`` family with one sample per entry of
        ``gc.get_count()``, labelled with that entry's index as ``gen``.
        """
        family = GaugeMetricFamily(
            "python_gc_counts",
            "GC object counts",
            labels=["gen"],
        )
        for generation, count in enumerate(gc.get_count()):
            label_values = [str(generation)]
            family.add_metric(label_values, count)

        yield family


# The collector is only registered when not running on PyPy.
if not running_on_pypy:
    REGISTRY.register(GCCounts())
 | |
| 
 | |
| #
 | |
| # Twisted reactor metrics
 | |
| #
 | |
| 
 | |
# Wall-clock duration of each wrapped reactor.runUntilCurrent call.
tick_time = Histogram(
    "python_twisted_reactor_tick_time",
    "Tick time of the Twisted reactor (sec)",
    buckets=[
        0.001, 0.002, 0.005, 0.01, 0.025, 0.05,
        0.1, 0.2, 0.5, 1, 2, 5,
    ],
)

# Number of calls counted as pending just before each wrapped
# reactor.runUntilCurrent call.
pending_calls_metric = Histogram(
    "python_twisted_reactor_pending_calls",
    "Pending calls",
    buckets=[
        1, 2, 5, 10, 25,
        50, 100, 250, 500, 1000,
    ],
)
 | |
| 
 | |
| #
 | |
| # Federation Metrics
 | |
| #
 | |
| 
 | |
sent_edus_counter = Counter(
    "synapse_federation_client_sent_edus",
    "",
)

sent_transactions_counter = Counter(
    "synapse_federation_client_sent_transactions",
    "",
)

events_processed_counter = Counter(
    "synapse_federation_client_events_processed",
    "",
)

# Tracks how far various components (e.g. federation sending, appservice
# sending) have got through the event stream.
event_processing_positions = Gauge(
    "synapse_event_processing_positions",
    "",
    labelnames=["name"],
)

# Tracks the current maximum position of the events stream.
event_persisted_position = Gauge(
    "synapse_event_persisted_position",
    "",
)

# Tracks the received_ts of the last event processed by various components.
event_processing_last_ts = Gauge(
    "synapse_event_processing_last_ts",
    "",
    labelnames=["name"],
)

# Tracks event-processing lag: the difference between the last processed
# event's received_ts and the time at which processing of it finished.
event_processing_lag = Gauge(
    "synapse_event_processing_lag",
    "",
    labelnames=["name"],
)

# Wall-clock time of the most recent reactor tick. Initialised at import and
# overwritten by the wrapper returned from runUntilCurrentTimer.
last_ticked = time.time()
 | |
| 
 | |
| 
 | |
class ReactorLastSeenMetric(object):
    """Collector reporting how long ago ``last_ticked`` was last updated."""

    def collect(self):
        """Yield a single gauge holding ``time.time() - last_ticked``."""
        seconds_since_tick = time.time() - last_ticked

        family = GaugeMetricFamily(
            "python_twisted_reactor_last_seen",
            "Seconds since the Twisted reactor was last seen",
        )
        family.add_metric([], seconds_since_tick)
        yield family


# Expose the last-seen gauge through the default registry.
REGISTRY.register(ReactorLastSeenMetric())
 | |
| 
 | |
| 
 | |
def runUntilCurrentTimer(func):
    """Wrap ``func`` so that every call records reactor metrics.

    The returned wrapper, on each call:

    * counts the pending calls (due timed calls plus the thread call queue),
    * times the call to ``func`` and records it in ``tick_time``,
    * updates the module-level ``last_ticked`` timestamp,
    * unless running on PyPy, runs a manual ``gc.collect`` for each
      generation whose count exceeds its threshold, recording the time taken
      and the number of unreachable objects.

    The wrapper returns whatever ``func`` returns.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global last_ticked

        current = reactor.seconds()

        # _newTimedCalls is one long list of *all* pending calls. The loop
        # below is modelled on the implementation of reactor.runUntilCurrent.
        pending = 0
        for call in reactor._newTimedCalls:
            if call.time > current:
                break

            if not call.delayed_time > 0:
                pending += 1

        pending += len(reactor.threadCallQueue)

        tick_started = time.time()
        result = func(*args, **kwargs)
        tick_finished = time.time()

        # Record the amount of wallclock time spent running pending calls.
        # This is a proxy for the actual amount of time between reactor polls,
        # since about 25% of time is actually spent running things triggered
        # by I/O events, but that is harder to capture without rewriting half
        # the reactor.
        tick_time.observe(tick_finished - tick_started)
        pending_calls_metric.observe(pending)

        # Remember when we last ticked, for the metric that tests whether
        # Synapse's reactor has frozen.
        last_ticked = tick_finished

        if not running_on_pypy:
            # Check whether a manual GC is needed (since it's been disabled),
            # and run one if so. Thresholds and counts are sampled once,
            # before any collection.
            thresholds = gc.get_threshold()
            counts = gc.get_count()
            for generation in (2, 1, 0):
                if counts[generation] > thresholds[generation]:
                    logger.info("Collecting gc %d", generation)

                    gc_started = time.time()
                    unreachable = gc.collect(generation)
                    gc_finished = time.time()

                    gc_time.labels(generation).observe(gc_finished - gc_started)
                    gc_unreachable.labels(generation).set(unreachable)

        return result

    return wrapper
 | |
| 
 | |
| 
 | |
# Best-effort installation of the reactor instrumentation: reactors that lack
# the attributes used by runUntilCurrentTimer are left untouched.
try:
    # Ensure the reactor has all the attributes we expect
    reactor.runUntilCurrent
    reactor._newTimedCalls
    reactor.threadCallQueue

    # runUntilCurrent is called when we have pending calls. It is called once
    # per iteration after fd polling.
    reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)

    # We manually run the GC each reactor tick so that we can get some metrics
    # about time spent doing GC.
    if not running_on_pypy:
        gc.disable()
except AttributeError:
    # Previously swallowed silently; leave a trace so that missing reactor
    # metrics (and automatic GC staying enabled) can be diagnosed.
    logger.info(
        "Reactor %r lacks the expected attributes; "
        "not installing reactor tick metrics or manual GC",
        reactor,
    )
 |