Add some metrics for inbound and outbound federation processing times (#7755)

pull/7771/head
Erik Johnston 2020-06-30 16:58:06 +01:00 committed by GitHub
parent 2f6afdd8b4
commit a99658074d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 43 additions and 17 deletions

1
changelog.d/7755.misc Normal file
View File

@@ -0,0 +1 @@
Add some metrics for inbound and outbound federation latencies: `synapse_federation_server_pdu_process_time` and `synapse_event_processing_lag_by_event`.

View File

@@ -18,7 +18,7 @@ import logging
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
from canonicaljson import json from canonicaljson import json
from prometheus_client import Counter from prometheus_client import Counter, Histogram
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.abstract import isIPAddress from twisted.internet.abstract import isIPAddress
@@ -70,6 +70,10 @@ received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"] "synapse_federation_server_received_queries", "", ["type"]
) )
pdu_process_time = Histogram(
"synapse_federation_server_pdu_process_time", "Time taken to process an event",
)
class FederationServer(FederationBase): class FederationServer(FederationBase):
def __init__(self, hs): def __init__(self, hs):
@@ -271,21 +275,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]: for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id event_id = pdu.event_id
with nested_logging_context(event_id): with pdu_process_time.time():
try: with nested_logging_context(event_id):
await self._handle_received_pdu(origin, pdu) try:
pdu_results[event_id] = {} await self._handle_received_pdu(origin, pdu)
except FederationError as e: pdu_results[event_id] = {}
logger.warning("Error handling PDU %s: %s", event_id, e) except FederationError as e:
pdu_results[event_id] = {"error": str(e)} logger.warning("Error handling PDU %s: %s", event_id, e)
except Exception as e: pdu_results[event_id] = {"error": str(e)}
f = failure.Failure() except Exception as e:
pdu_results[event_id] = {"error": str(e)} f = failure.Failure()
logger.error( pdu_results[event_id] = {"error": str(e)}
"Failed to handle PDU %s", logger.error(
event_id, "Failed to handle PDU %s",
exc_info=(f.type, f.value, f.getTracebackObject()), event_id,
) exc_info=(f.type, f.value, f.getTracebackObject()),
)
await concurrently_execute( await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT

View File

@@ -201,7 +201,15 @@ class FederationSender(object):
logger.debug("Sending %s to %r", event, destinations) logger.debug("Sending %s to %r", event, destinations)
self._send_pdu(event, destinations) if destinations:
self._send_pdu(event, destinations)
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
).observe(now - ts)
async def handle_room_events(events: Iterable[EventBase]) -> None: async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"): with Measure(self.clock, "handle_room_events"):

View File

@@ -114,6 +114,12 @@ class ApplicationServicesHandler(object):
for service in services: for service in services:
self.scheduler.submit_event_for_as(service, event) self.scheduler.submit_event_for_as(service, event)
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(event.event_id)
synapse.metrics.event_processing_lag_by_event.labels(
"appservice_sender"
).observe(now - ts)
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_room_events(events): def handle_room_events(events):
for event in events: for event in events:

View File

@@ -463,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]
# finished being processed. # finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
event_processing_lag_by_event = Histogram(
"synapse_event_processing_lag_by_event",
"Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
["name"],
)
# Build info of the running server. # Build info of the running server.
build_info = Gauge( build_info = Gauge(
"synapse_build_info", "Build information", ["pythonversion", "version", "osversion"] "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]