189 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			189 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2015, 2016 OpenMarket Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import logging
 | 
						|
import functools
 | 
						|
import time
 | 
						|
import gc
 | 
						|
 | 
						|
from twisted.internet import reactor
 | 
						|
 | 
						|
from .metric import (
 | 
						|
    CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
 | 
						|
    MemoryUsageMetric,
 | 
						|
)
 | 
						|
from .process_collector import register_process_collector
 | 
						|
 | 
						|
 | 
						|
# Process-wide registries. render_all() calls every entry of all_collectors
# and then renders every entry of all_metrics, in the order they were appended.
all_collectors = []
all_metrics = []

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class Metrics(object):
    """Registers metrics under a shared name prefix.

    Every metric created through an instance is named "<prefix>_<name>" and
    is appended to the module-level all_metrics list, so callers get nicely
    namespaced metrics without building the full name themselves.
    """

    def __init__(self, name):
        self.name_prefix = name

    def _qualify(self, name):
        """Return `name` joined onto this instance's prefix with "_"."""
        return "%s_%s" % (self.name_prefix, name)

    def make_subspace(self, name):
        """Return a new Metrics whose prefix is "<prefix>_<name>"."""
        return Metrics(self._qualify(name))

    def register_collector(self, func):
        """Append `func` to the module-level all_collectors list."""
        all_collectors.append(func)

    def _register(self, metric_class, name, *args, **kwargs):
        """Instantiate `metric_class` under the prefixed name and record it.

        Extra positional and keyword arguments are forwarded to the metric's
        constructor. The new metric is appended to all_metrics and returned.
        """
        created = metric_class(self._qualify(name), *args, **kwargs)
        all_metrics.append(created)
        return created

    def register_counter(self, *args, **kwargs):
        """Create, record and return a CounterMetric."""
        return self._register(CounterMetric, *args, **kwargs)

    def register_callback(self, *args, **kwargs):
        """Create, record and return a CallbackMetric."""
        return self._register(CallbackMetric, *args, **kwargs)

    def register_distribution(self, *args, **kwargs):
        """Create, record and return a DistributionMetric."""
        return self._register(DistributionMetric, *args, **kwargs)

    def register_cache(self, *args, **kwargs):
        """Create, record and return a CacheMetric."""
        return self._register(CacheMetric, *args, **kwargs)
 | 
						|
 | 
						|
 | 
						|
def register_memory_metrics(hs):
    """Register a MemoryUsageMetric for this process if psutil is usable.

    psutil is imported lazily and probed by reading
    `psutil.Process().memory_info().rss`. If the import fails, or the probe
    raises AttributeError, a warning is logged and no metric is registered.

    Args:
        hs: passed through unchanged to MemoryUsageMetric.

    Returns:
        None. On success the new metric is appended to all_metrics.
    """
    try:
        import psutil
        process = psutil.Process()
        # Probe only: the value is discarded. The access exists to raise
        # AttributeError when the installed psutil lacks this API.
        process.memory_info().rss
    except (ImportError, AttributeError):
        # Logger.warn is a deprecated alias; warning() is the supported name.
        logger.warning(
            "psutil is not installed or incorrect version."
            " Disabling memory metrics."
        )
        return
    metric = MemoryUsageMetric(hs, psutil)
    all_metrics.append(metric)
 | 
						|
 | 
						|
 | 
						|
def get_metrics_for(pkg_name):
    """Build a Metrics instance whose name prefix is derived from `pkg_name`.

    Every "." in `pkg_name` is replaced with "_" before it is used as the
    prefix, so "package.name" yields the prefix "package_name".
    """
    # Prometheus doesn't let us use "." in metric names, so swap it out.
    prefix = pkg_name.replace(".", "_")
    return Metrics(prefix)
 | 
						|
 | 
						|
 | 
						|
def render_all():
    """Run every collector, render every metric and return the joined text.

    Each entry of all_collectors is called first, with no arguments. Then
    each entry of all_metrics is rendered; a metric whose render() raises
    contributes the single line "# FAILED to render" and the exception is
    logged, so one broken metric does not hide the others.

    Returns:
        All rendered lines joined with newline characters. An empty final
        element is appended before joining, so non-empty output ends with a
        newline.
    """
    for run_collector in all_collectors:
        run_collector()

    lines = []
    for metric in all_metrics:
        try:
            lines.extend(metric.render())
        except Exception:
            lines.append("# FAILED to render")
            logger.exception("Failed to render metric")

    # The empty last element makes the join emit a final newline.
    lines.append("")

    return "\n".join(lines)
 | 
						|
 | 
						|
 | 
						|
# Metrics about the reactor loop. tick_time and pending_calls_metric are fed
# by the runUntilCurrentTimer wrapper on every wrapped call.
reactor_metrics = get_metrics_for("reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")

# Manual garbage-collection metrics, labelled by GC generation.
gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])

# Current gc.get_count() values, keyed by a one-element (generation,) tuple.
reactor_metrics.register_callback(
    "gc_counts",
    lambda: dict(((gen,), count) for gen, count in enumerate(gc.get_count())),
    labels=["gen"],
)

register_process_collector(get_metrics_for("process"))
 | 
						|
 | 
						|
 | 
						|
def runUntilCurrentTimer(func):
    """Wrap `func` so that every call records reactor and GC metrics.

    The returned wrapper, on each call:
      1. counts the calls that are due now (timed calls plus the thread
         call queue) and records the total in pending_calls_metric;
      2. runs `func` and records its wall-clock duration, in milliseconds,
         in tick_time;
      3. runs a manual gc.collect() for every generation whose count exceeds
         its threshold, recording the time taken and the number of
         unreachable objects found.

    The wrapper returns whatever `func` returned.
    """

    @functools.wraps(func)
    def timed_tick(*args, **kwargs):
        current = reactor.seconds()

        # _newTimedCalls is one long list of *all* pending calls. This loop
        # is based on the implementation of reactor.runUntilCurrent: stop at
        # the first call that is not yet due, and skip calls that have been
        # delayed.
        due_calls = 0
        for call in reactor._newTimedCalls:
            if call.time > current:
                break
            if not call.delayed_time > 0:
                due_calls += 1

        queued = due_calls + len(reactor.threadCallQueue)

        tick_started = time.time() * 1000
        result = func(*args, **kwargs)
        tick_finished = time.time() * 1000
        tick_time.inc_by(tick_finished - tick_started)
        pending_calls_metric.inc_by(queued)

        # Automatic GC is disabled when this wrapper is installed, so check
        # here whether a manual collection is needed. Counts are sampled once,
        # before any collection, and generations are visited oldest first.
        thresholds = gc.get_threshold()
        live_counts = gc.get_count()
        for generation in reversed(range(3)):
            if live_counts[generation] > thresholds[generation]:
                logger.info("Collecting gc %d", generation)

                gc_started = time.time() * 1000
                freed = gc.collect(generation)
                gc_finished = time.time() * 1000

                gc_time.inc_by(gc_finished - gc_started, generation)
                gc_unreachable.inc_by(freed, generation)

        return result

    return timed_tick
 | 
						|
 | 
						|
 | 
						|
try:
    # Ensure the reactor has all the attributes we expect; a missing one
    # raises AttributeError and the reactor is left unpatched.
    for _attr_name in ("runUntilCurrent", "_newTimedCalls", "threadCallQueue"):
        getattr(reactor, _attr_name)

    # runUntilCurrent is called when we have pending calls. It is called once
    # per iteration after fd polling.
    reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)

    # We manually run the GC each reactor tick so that we can get some metrics
    # about time spent doing GC.
    gc.disable()
except AttributeError:
    # Best effort: on a reactor without these attributes, skip the
    # instrumentation and leave automatic GC enabled.
    pass
 |