From ab825aa328e089b28cf858332bb95f47e1490377 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:07:33 +0100 Subject: [PATCH 1/8] Add GaugeMetric --- synapse/metrics/__init__.py | 9 ++++++++- synapse/metrics/metric.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2ed82b04a5..2666f982a6 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -23,7 +23,7 @@ from twisted.internet import reactor from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, + MemoryUsageMetric, GaugeMetric, ) from .process_collector import register_process_collector @@ -65,6 +65,13 @@ class Metrics(object): """ return self._register(CounterMetric, *args, **kwargs) + def register_gauge(self, *args, **kwargs): + """ + Returns: + GaugeMetric + """ + return self._register(GaugeMetric, *args, **kwargs) + def register_callback(self, *args, **kwargs): """ Returns: diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index ff5aa8c0e1..9ebf2d6fdb 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -145,6 +145,36 @@ class CounterMetric(BaseMetric): ) +class GaugeMetric(BaseMetric): + """A metric that can go up or down + """ + + def __init__(self, *args, **kwargs): + super(GaugeMetric, self).__init__(*args, **kwargs) + + # dict[list[str]]: value for each set of label values. the keys are the + # label values, in the same order as the labels in self.labels. + # + # (if the metric is a scalar, the (single) key is the empty list). + self.guages = {} + + def set(self, v, *values): + if len(values) != self.dimension(): + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) + ) + + # TODO: should assert that the tag values are all strings + + self.guages[values] = v + + def render(self): + return flatten( + self._render_for_labels(k, self.guages[k]) + for k in sorted(self.guages.keys()) + ) + + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever it is rendered. Typically this is used to implement gauges that yield the From 92e34615c5c9ed5df2b0072a2686d3e42239e420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:07:51 +0100 Subject: [PATCH 2/8] Track where event stream processing have gotten up to --- synapse/federation/transaction_queue.py | 4 ++++ synapse/handlers/appservice.py | 4 ++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events.py | 3 +++ 4 files changed, 24 insertions(+) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5b0b798e57..d68f0d8950 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -239,6 +239,10 @@ class TransactionQueue(object): "events", next_token ) + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..2a7e31e711 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -111,6 +111,10 @@ class ApplicationServicesHandler(object): events_processed_counter.inc_by(len(events)) yield self.store.set_appservice_last_pos(upper_bound) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2666f982a6..a6c0d7e1bf 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -151,6 +151,19 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41f..da44b52fd6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" From 4dae4a97ed0e0b2cc9b5493172670ec7353ded2e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:52:19 +0100 Subject: [PATCH 3/8] Track last processed event received_ts --- synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/appservice.py | 10 ++++++++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events_worker.py | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d68f0d8950..1c466cbded 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -243,6 +243,17 @@ class TransactionQueue(object): next_token, "federation_sender", ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2a7e31e711..fcc1246bee 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -115,6 +115,16 @@ class ApplicationServicesHandler(object): synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6c0d7e1bf..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge( "event_persisted_position", ) +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..769eb51489 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timstamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, From e939f3bca6d73bc4ec6450aeb41f5e121a29a662 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 14:37:11 +0100 Subject: [PATCH 4/8] Fix tests --- tests/handlers/test_appservice.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a667fb6f0e..b753455943 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) + self.mock_store.get_received_ts.return_value = 0 hs.get_application_service_api = Mock(return_value=self.mock_as_api) hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) hs.get_clock.return_value = MockClock() From f67e906e189cb528cab18cb5190be352dcc8914b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 11:18:19 +0100 Subject: [PATCH 5/8] Set all metrics at the same time --- synapse/federation/transaction_queue.py | 12 ++++++------ synapse/handlers/appservice.py | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1c466cbded..963d938edd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -233,16 +233,10 @@ class TransactionQueue(object): consumeErrors=True )) - events_processed_counter.inc_by(len(events)) - yield self.store.update_federation_out_pos( "events", next_token ) - synapse.metrics.event_processing_positions.set( - next_token, "federation_sender", - ) - if events: now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) @@ -254,6 +248,12 @@ class TransactionQueue(object): ts, "federation_sender", ) + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fcc1246bee..ce0814bc25 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -108,16 +108,16 @@ class ApplicationServicesHandler(object): service, event ) - events_processed_counter.inc_by(len(events)) - yield self.store.set_appservice_last_pos(upper_bound) + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) - now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + events_processed_counter.inc_by(len(events)) synapse.metrics.event_processing_lag.set( now - ts, "appservice_sender", From d7bf3a68f002680ed0cd8a84c75b011371a50d6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 11:19:04 +0100 Subject: [PATCH 6/8] s/list/tuple --- synapse/metrics/metric.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 9ebf2d6fdb..89bd47c3f7 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -115,7 +115,7 @@ class CounterMetric(BaseMetric): # dict[list[str]]: value for each set of label values. the keys are the # label values, in the same order as the labels in self.labels. # - # (if the metric is a scalar, the (single) key is the empty list). + # (if the metric is a scalar, the (single) key is the empty tuple). self.counts = {} # Scalar metrics are never empty @@ -155,7 +155,7 @@ class GaugeMetric(BaseMetric): # dict[list[str]]: value for each set of label values. the keys are the # label values, in the same order as the labels in self.labels. # - # (if the metric is a scalar, the (single) key is the empty list). + # (if the metric is a scalar, the (single) key is the empty tuple). self.guages = {} def set(self, v, *values): From 23a7f9d7f4cfbf0333a1def526a59fb4b0f01067 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 11:20:51 +0100 Subject: [PATCH 7/8] Doc we raise on unknown event --- synapse/storage/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 769eb51489..88360b48b8 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -52,13 +52,14 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): def get_received_ts(self, event_id): - """Get received_ts (when it was persisted) for the event + """Get received_ts (when it was persisted) for the event. Raises an + exception for unknown events. Args: event_id (str) Returns: - Deferred[int|None]: Timstamp in milliseconds, or None for events + Deferred[int|None]: Timestamp in milliseconds, or None for events that were persisted before received_ts was implemented. """ return self._simple_select_one_onecol( From 415aeefd89ca48d3a3e9a7a981999ae071f6060b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 12:07:09 +0100 Subject: [PATCH 8/8] Format docstring --- synapse/storage/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 88360b48b8..a937b9bceb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -52,8 +52,9 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): def get_received_ts(self, event_id): - """Get received_ts (when it was persisted) for the event. Raises an - exception for unknown events. + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. Args: event_id (str)