Change CacheMetrics to be quicker

We change it so that each cache has an individual CacheMetric, instead
of one global CacheMetric shared by every cache. This means that when a
cache increments a counter it no longer has to go through as many layers
of indirection.
branch erikj/timings
Erik Johnston 2016-06-02 11:29:44 +01:00
parent 9e54d865e6
commit 266eb3bf26
8 changed files with 74 additions and 66 deletions
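
The gain is in the counter hot path. Below is a minimal, self-contained
sketch of the before/after shapes (simplified stand-ins for illustration,
not the actual synapse classes; "my_cache" is a made-up name):

    # Old shape: one global metric; every increment is routed through a
    # shared object and keyed by a cache-name label.
    class GlobalCacheMetric(object):
        def __init__(self):
            self.counts = {}

        def inc_hits(self, name):
            # Pays for a method call on a shared object plus a dict
            # lookup on the label value, on every single increment.
            self.counts[(name,)] = self.counts.get((name,), 0) + 1

    # New shape: each cache owns its metric; an increment is one add.
    class PerCacheMetric(object):
        def __init__(self):
            self.hits = 0

        def inc_hits(self):
            self.hits += 1

    cache_counter = GlobalCacheMetric()
    cache_counter.inc_hits("my_cache")   # before: indirect, keyed by name

    metric = PerCacheMetric()
    metric.inc_hits()                    # after: direct integer increment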

synapse/metrics/__init__.py

@@ -33,11 +33,7 @@ from .metric import (

 logger = logging.getLogger(__name__)

-# We'll keep all the available metrics in a single toplevel dict, one shared
-# for the entire process. We don't currently support per-HomeServer instances
-# of metrics, because in practice any one python VM will host only one
-# HomeServer anyway. This makes a lot of implementation neater
-all_metrics = {}
+all_metrics = []


 class Metrics(object):
@@ -53,7 +49,7 @@ class Metrics(object):

         metric = metric_class(full_name, *args, **kwargs)

-        all_metrics[full_name] = metric
+        all_metrics.append(metric)
         return metric

     def register_counter(self, *args, **kwargs):
@@ -84,12 +80,12 @@ def render_all():
     # TODO(paul): Internal hack
     update_resource_metrics()

-    for name in sorted(all_metrics.keys()):
+    for metric in all_metrics:
         try:
-            strs += all_metrics[name].render()
+            strs += metric.render()
         except Exception:
-            strs += ["# FAILED to render %s" % name]
-            logger.exception("Failed to render %s metric", name)
+            strs += ["# FAILED to render"]
+            logger.exception("Failed to render metric")

     strs.append("")  # to generate a final CRLF

synapse/metrics/metric.py

@@ -48,9 +48,6 @@ class BaseMetric(object):
                       for k, v in zip(self.labels, values)])
         )

-    def render(self):
-        return map_concat(self.render_item, sorted(self.counts.keys()))
-

 class CounterMetric(BaseMetric):
     """The simplest kind of metric; one that stores a monotonically-increasing
@@ -83,6 +80,9 @@ class CounterMetric(BaseMetric):
     def render_item(self, k):
         return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]

+    def render(self):
+        return map_concat(self.render_item, sorted(self.counts.keys()))
+

 class CallbackMetric(BaseMetric):
     """A metric that returns the numeric value returned by a callback whenever
@@ -132,26 +132,30 @@ class CacheMetric(object):
     This metric generates standard metric name pairs, so that monitoring rules
     can easily be applied to measure hit ratio."""

-    __slots__ = ("name", "hits", "total", "size")
+    __slots__ = ("name", "cache_name", "hits", "misses", "size_callback")

-    def __init__(self, name, size_callback, labels=[]):
+    def __init__(self, name, size_callback, cache_name):
         self.name = name
+        self.cache_name = cache_name

-        self.hits = CounterMetric(name + ":hits", labels=labels)
-        self.total = CounterMetric(name + ":total", labels=labels)
+        self.hits = 0
+        self.misses = 0

-        self.size = CallbackMetric(
-            name + ":size",
-            callback=size_callback,
-            labels=labels,
-        )
+        self.size_callback = size_callback

-    def inc_hits(self, *values):
-        self.hits.inc(*values)
-        self.total.inc(*values)
+    def inc_hits(self):
+        self.hits += 1

-    def inc_misses(self, *values):
-        self.total.inc(*values)
+    def inc_misses(self):
+        self.misses += 1

     def render(self):
-        return self.hits.render() + self.total.render() + self.size.render()
+        size = self.size_callback()
+        hits = self.hits
+        total = self.misses + self.hits
+        return [
+            """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
+            """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
+            """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
+        ]
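
For illustration, a hypothetical instance of the patched CacheMetric renders
one Prometheus-style line per value (this mirrors the updated test case at
the bottom of the commit; "my_cache" is a made-up cache name):

    from synapse.metrics.metric import CacheMetric

    d = {}
    metric = CacheMetric("cache", lambda: len(d), "my_cache")

    metric.inc_misses()      # a lookup that missed
    d["key"] = "value"       # the cache now holds one entry
    metric.inc_hits()        # a lookup that hit

    print(metric.render())
    # ['cache:hits{name="my_cache"} 1',
    #  'cache:total{name="my_cache"} 2',
    #  'cache:size{name="my_cache"} 1']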

synapse/util/caches/__init__.py

@@ -24,11 +24,21 @@ DEBUG_CACHES = False
 metrics = synapse.metrics.get_metrics_for("synapse.util.caches")

 caches_by_name = {}
-cache_counter = metrics.register_cache(
-    "cache",
-    lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
-    labels=["name"],
-)
+# cache_counter = metrics.register_cache(
+#     "cache",
+#     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+#     labels=["name"],
+# )
+
+
+def register_cache(name, cache):
+    caches_by_name[name] = cache
+    return metrics.register_cache(
+        "cache",
+        lambda: len(cache),
+        name,
+    )
+

 _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
 caches_by_name["string_cache"] = _string_cache

synapse/util/caches/descriptors.py

@@ -22,7 +22,7 @@ from synapse.util.logcontext import (
     PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
 )

-from . import caches_by_name, DEBUG_CACHES, cache_counter
+from . import DEBUG_CACHES, register_cache

 from twisted.internet import defer
@@ -51,6 +51,7 @@ class Cache(object):
         "keylen",
         "sequence",
         "thread",
+        "metrics",
     )

     def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
@@ -68,7 +69,7 @@ class Cache(object):
         self.keylen = keylen
         self.sequence = 0
         self.thread = None
-        caches_by_name[name] = self.cache
+        self.metrics = register_cache(name, self.cache)

     def check_thread(self):
         expected_thread = self.thread
@@ -83,10 +84,10 @@ class Cache(object):
     def get(self, key, default=_CacheSentinel):
         val = self.cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
             return val

-        cache_counter.inc_misses(self.name)
+        self.metrics.inc_misses()

         if default is _CacheSentinel:
             raise KeyError()

synapse/util/caches/dictionary_cache.py

@@ -15,7 +15,7 @@

 from synapse.util.caches.lrucache import LruCache
 from collections import namedtuple
-from . import caches_by_name, cache_counter
+from . import register_cache
 import threading
 import logging
@@ -43,7 +43,7 @@ class DictionaryCache(object):
             __slots__ = []

         self.sentinel = Sentinel()
-        caches_by_name[name] = self.cache
+        self.metrics = register_cache(name, self.cache)

     def check_thread(self):
         expected_thread = self.thread
@@ -58,7 +58,7 @@ class DictionaryCache(object):
     def get(self, key, dict_keys=None):
         entry = self.cache.get(key, self.sentinel)
         if entry is not self.sentinel:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()

             if dict_keys is None:
                 return DictionaryEntry(entry.full, dict(entry.value))
@@ -69,7 +69,7 @@ class DictionaryCache(object):
                     if k in entry.value
                 })

-        cache_counter.inc_misses(self.name)
+        self.metrics.inc_misses()
         return DictionaryEntry(False, {})

     def invalidate(self, key):

synapse/util/caches/expiringcache.py

@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache

 import logging
@@ -49,7 +49,7 @@ class ExpiringCache(object):

         self._cache = {}

-        caches_by_name[cache_name] = self._cache
+        self.metrics = register_cache(cache_name, self._cache)

     def start(self):
         if not self._expiry_ms:
@@ -78,9 +78,9 @@ class ExpiringCache(object):
     def __getitem__(self, key):
         try:
             entry = self._cache[key]
-            cache_counter.inc_hits(self._cache_name)
+            self.metrics.inc_hits()
         except KeyError:
-            cache_counter.inc_misses(self._cache_name)
+            self.metrics.inc_misses()
             raise

         if self._reset_expiry_on_get:

synapse/util/caches/stream_change_cache.py

@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
 from blist import sorteddict
@@ -42,7 +42,7 @@ class StreamChangeCache(object):
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        caches_by_name[self.name] = self._cache
+        self.metrics = register_cache(self.name, self._cache)

         for entity, stream_pos in prefilled_cache.items():
             self.entity_has_changed(entity, stream_pos)
@@ -53,19 +53,19 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int

         if stream_pos < self._earliest_known_stream_pos:
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
             return True

         latest_entity_change_pos = self._entity_to_key.get(entity, None)
         if latest_entity_change_pos is None:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
             return False

         if stream_pos < latest_entity_change_pos:
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
             return True

-        cache_counter.inc_hits(self.name)
+        self.metrics.inc_hits()
         return False

     def get_entities_changed(self, entities, stream_pos):
@@ -82,10 +82,10 @@ class StreamChangeCache(object):
                 self._cache[k] for k in keys[i:]
             ).intersection(entities)

-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
         else:
             result = entities
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()

         return result

tests/metrics/test_metric.py

@@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase):
             'vector{method="PUT"} 1',
         ])

-        # Check that passing too few values errors
-        self.assertRaises(ValueError, counter.inc)
-

 class CallbackMetricTestCase(unittest.TestCase):
@@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase):
     def test_cache(self):
         d = dict()

-        metric = CacheMetric("cache", lambda: len(d))
+        metric = CacheMetric("cache", lambda: len(d), "cache_name")

         self.assertEquals(metric.render(), [
-            'cache:hits 0',
-            'cache:total 0',
-            'cache:size 0',
+            'cache:hits{name="cache_name"} 0',
+            'cache:total{name="cache_name"} 0',
+            'cache:size{name="cache_name"} 0',
         ])

         metric.inc_misses()
         d["key"] = "value"

         self.assertEquals(metric.render(), [
-            'cache:hits 0',
-            'cache:total 1',
-            'cache:size 1',
+            'cache:hits{name="cache_name"} 0',
+            'cache:total{name="cache_name"} 1',
+            'cache:size{name="cache_name"} 1',
         ])

         metric.inc_hits()

         self.assertEquals(metric.render(), [
-            'cache:hits 1',
-            'cache:total 2',
-            'cache:size 1',
+            'cache:hits{name="cache_name"} 1',
+            'cache:total{name="cache_name"} 2',
+            'cache:size{name="cache_name"} 1',
        ])