Merge pull request #3090 from matrix-org/erikj/processed_event_lag
Add metrics for event processing lag
erikj/state_delta_writeup
commit
0f13f30fca
|
@ -233,12 +233,27 @@ class TransactionQueue(object):
|
||||||
consumeErrors=True
|
consumeErrors=True
|
||||||
))
|
))
|
||||||
|
|
||||||
events_processed_counter.inc_by(len(events))
|
|
||||||
|
|
||||||
yield self.store.update_federation_out_pos(
|
yield self.store.update_federation_out_pos(
|
||||||
"events", next_token
|
"events", next_token
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if events:
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
ts = yield self.store.get_received_ts(events[-1].event_id)
|
||||||
|
|
||||||
|
synapse.metrics.event_processing_lag.set(
|
||||||
|
now - ts, "federation_sender",
|
||||||
|
)
|
||||||
|
synapse.metrics.event_processing_last_ts.set(
|
||||||
|
ts, "federation_sender",
|
||||||
|
)
|
||||||
|
|
||||||
|
events_processed_counter.inc_by(len(events))
|
||||||
|
|
||||||
|
synapse.metrics.event_processing_positions.set(
|
||||||
|
next_token, "federation_sender",
|
||||||
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
|
|
|
@ -125,9 +125,23 @@ class ApplicationServicesHandler(object):
|
||||||
for evs in events_by_room.itervalues()
|
for evs in events_by_room.itervalues()
|
||||||
], consumeErrors=True))
|
], consumeErrors=True))
|
||||||
|
|
||||||
|
yield self.store.set_appservice_last_pos(upper_bound)
|
||||||
|
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
ts = yield self.store.get_received_ts(events[-1].event_id)
|
||||||
|
|
||||||
|
synapse.metrics.event_processing_positions.set(
|
||||||
|
upper_bound, "appservice_sender",
|
||||||
|
)
|
||||||
|
|
||||||
events_processed_counter.inc_by(len(events))
|
events_processed_counter.inc_by(len(events))
|
||||||
|
|
||||||
yield self.store.set_appservice_last_pos(upper_bound)
|
synapse.metrics.event_processing_lag.set(
|
||||||
|
now - ts, "appservice_sender",
|
||||||
|
)
|
||||||
|
synapse.metrics.event_processing_last_ts.set(
|
||||||
|
ts, "appservice_sender",
|
||||||
|
)
|
||||||
finally:
|
finally:
|
||||||
self.is_processing = False
|
self.is_processing = False
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ from twisted.internet import reactor
|
||||||
|
|
||||||
from .metric import (
|
from .metric import (
|
||||||
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
|
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
|
||||||
MemoryUsageMetric,
|
MemoryUsageMetric, GaugeMetric,
|
||||||
)
|
)
|
||||||
from .process_collector import register_process_collector
|
from .process_collector import register_process_collector
|
||||||
|
|
||||||
|
@ -65,6 +65,13 @@ class Metrics(object):
|
||||||
"""
|
"""
|
||||||
return self._register(CounterMetric, *args, **kwargs)
|
return self._register(CounterMetric, *args, **kwargs)
|
||||||
|
|
||||||
|
def register_gauge(self, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Returns:
|
||||||
|
GaugeMetric
|
||||||
|
"""
|
||||||
|
return self._register(GaugeMetric, *args, **kwargs)
|
||||||
|
|
||||||
def register_callback(self, *args, **kwargs):
|
def register_callback(self, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -144,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
|
||||||
tick_time = reactor_metrics.register_distribution("tick_time")
|
tick_time = reactor_metrics.register_distribution("tick_time")
|
||||||
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
|
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
|
||||||
|
|
||||||
|
synapse_metrics = get_metrics_for("synapse")
|
||||||
|
|
||||||
|
# Used to track where various components have processed in the event stream,
|
||||||
|
# e.g. federation sending, appservice sending, etc.
|
||||||
|
event_processing_positions = synapse_metrics.register_gauge(
|
||||||
|
"event_processing_positions", labels=["name"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Used to track the current max events stream position
|
||||||
|
event_persisted_position = synapse_metrics.register_gauge(
|
||||||
|
"event_persisted_position",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Used to track the received_ts of the last event processed by various
|
||||||
|
# components
|
||||||
|
event_processing_last_ts = synapse_metrics.register_gauge(
|
||||||
|
"event_processing_last_ts", labels=["name"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Used to track the lag in processing events. This is the time difference
|
||||||
|
# between the last processed event's received_ts and the time it was
|
||||||
|
# finished being processed.
|
||||||
|
event_processing_lag = synapse_metrics.register_gauge(
|
||||||
|
"event_processing_lag", labels=["name"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def runUntilCurrentTimer(func):
|
def runUntilCurrentTimer(func):
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
|
||||||
# dict[list[str]]: value for each set of label values. the keys are the
|
# dict[list[str]]: value for each set of label values. the keys are the
|
||||||
# label values, in the same order as the labels in self.labels.
|
# label values, in the same order as the labels in self.labels.
|
||||||
#
|
#
|
||||||
# (if the metric is a scalar, the (single) key is the empty list).
|
# (if the metric is a scalar, the (single) key is the empty tuple).
|
||||||
self.counts = {}
|
self.counts = {}
|
||||||
|
|
||||||
# Scalar metrics are never empty
|
# Scalar metrics are never empty
|
||||||
|
@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class GaugeMetric(BaseMetric):
|
||||||
|
"""A metric that can go up or down
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(GaugeMetric, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
# dict[tuple[str]]: value for each set of label values. the keys are the
|
||||||
|
# label values, in the same order as the labels in self.labels.
|
||||||
|
#
|
||||||
|
# (if the metric is a scalar, the (single) key is the empty tuple).
|
||||||
|
self.guages = {}
|
||||||
|
|
||||||
|
def set(self, v, *values):
|
||||||
|
if len(values) != self.dimension():
|
||||||
|
raise ValueError(
|
||||||
|
"Expected as many values to inc() as labels (%d)" % (self.dimension())
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: should assert that the tag values are all strings
|
||||||
|
|
||||||
|
self.guages[values] = v
|
||||||
|
|
||||||
|
def render(self):
|
||||||
|
return flatten(
|
||||||
|
self._render_for_labels(k, self.guages[k])
|
||||||
|
for k in sorted(self.guages.keys())
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class CallbackMetric(BaseMetric):
|
class CallbackMetric(BaseMetric):
|
||||||
"""A metric that returns the numeric value returned by a callback whenever
|
"""A metric that returns the numeric value returned by a callback whenever
|
||||||
it is rendered. Typically this is used to implement gauges that yield the
|
it is rendered. Typically this is used to implement gauges that yield the
|
||||||
|
|
|
@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore):
|
||||||
new_forward_extremeties=new_forward_extremeties,
|
new_forward_extremeties=new_forward_extremeties,
|
||||||
)
|
)
|
||||||
persist_event_counter.inc_by(len(chunk))
|
persist_event_counter.inc_by(len(chunk))
|
||||||
|
synapse.metrics.event_persisted_position.set(
|
||||||
|
chunk[-1][0].internal_metadata.stream_ordering,
|
||||||
|
)
|
||||||
for event, context in chunk:
|
for event, context in chunk:
|
||||||
if context.app_service:
|
if context.app_service:
|
||||||
origin_type = "local"
|
origin_type = "local"
|
||||||
|
|
|
@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
|
||||||
|
|
||||||
|
|
||||||
class EventsWorkerStore(SQLBaseStore):
|
class EventsWorkerStore(SQLBaseStore):
|
||||||
|
def get_received_ts(self, event_id):
|
||||||
|
"""Get received_ts (when it was persisted) for the event.
|
||||||
|
|
||||||
|
Raises an exception for unknown events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_id (str)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[int|None]: Timestamp in milliseconds, or None for events
|
||||||
|
that were persisted before received_ts was implemented.
|
||||||
|
"""
|
||||||
|
return self._simple_select_one_onecol(
|
||||||
|
table="events",
|
||||||
|
keyvalues={
|
||||||
|
"event_id": event_id,
|
||||||
|
},
|
||||||
|
retcol="received_ts",
|
||||||
|
desc="get_received_ts",
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_event(self, event_id, check_redacted=True,
|
def get_event(self, event_id, check_redacted=True,
|
||||||
|
|
|
@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
||||||
self.mock_scheduler = Mock()
|
self.mock_scheduler = Mock()
|
||||||
hs = Mock()
|
hs = Mock()
|
||||||
hs.get_datastore = Mock(return_value=self.mock_store)
|
hs.get_datastore = Mock(return_value=self.mock_store)
|
||||||
|
self.mock_store.get_received_ts.return_value = 0
|
||||||
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
|
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
|
||||||
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
|
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
|
||||||
hs.get_clock.return_value = MockClock()
|
hs.get_clock.return_value = MockClock()
|
||||||
|
|
Loading…
Reference in New Issue