From fc069fee9c3a909597a2b529670d6be121ca458a Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 21 Jun 2019 12:18:51 +0100 Subject: [PATCH] Trace federation requests on the way in and out. The span is created in _started_processing and closed in _finished_processing because we need a meaningful log context. --- synapse/http/matrixfederationclient.py | 35 +++++++++++++++++++++----- synapse/http/site.py | 29 ++++++++++++++++++++- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 663ea72a7a..5f546ba27c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -52,6 +52,9 @@ from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure +import opentracing +from opentracing.propagation import Format + logger = logging.getLogger(__name__) outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", @@ -350,11 +353,26 @@ class MatrixFederationHttpClient(object): else: query_bytes = b"" - headers_dict = { - b"User-Agent": [self.version_string_bytes], - } - - with limiter: + # Retreive current span + scope = opentracing.tracer.start_active_span( + "outgoing-federation-request", + tags={ + "span.kind": "client", # With respect to this request's role in an rpc + "peer.address": request.destination, + "http.method": request.method, + "http.url": request.path, + }, + finish_on_close=True + ) + + # Inject the span into the headers + headers_dict = {} + opentracing.tracer.inject(scope.span, Format.HTTP_HEADERS, headers_dict) + headers_dict = {k.encode(): [v.encode()] for k, v in headers_dict.items()} + + headers_dict[b"User-Agent"] = [self.version_string_bytes] + + with limiter, scope: # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -433,6 +451,9 @@ class MatrixFederationHttpClient(object): response.phrase.decode('ascii', errors='replace'), ) + logger.info("Setting response code on span {} *********".format(opentracing.tracer.active_span)) + scope.span.set_tag( + "http.status_code", response.code) if 200 <= response.code < 300: pass else: @@ -516,9 +537,11 @@ class MatrixFederationHttpClient(object): url_str, _flatten_response_never_received(e), ) + logger.info("Setting response code on span {} *********".format(opentracing.tracer.active_span)) + #scope.span.set_tag("error", True) raise - defer.returnValue(response) + defer.returnValue(response) def build_auth_headers( self, destination, method, url_bytes, content=None, destination_is=None, diff --git a/synapse/http/site.py b/synapse/http/site.py index e508c0bd4f..104f992fcf 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,12 +14,14 @@ import contextlib import logging import time +import opentracing from twisted.web.server import Request, Site from synapse.http import redact_uri from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.tracerutils import extract_span_context logger = logging.getLogger(__name__) @@ -211,6 +213,7 @@ class SynapseRequest(Request): This will log the request's arrival. Once the request completes, be sure to call finished_processing. + It will also start a span for this request. Args: servlet_name (str): the name of the servlet which will be @@ -233,6 +236,20 @@ class SynapseRequest(Request): self.get_redacted_uri() ) + # Start a span + span_context = extract_span_context(self.requestHeaders) + opentracing.tracer.start_active_span( + "incoming-federation-request", + tags={ + "request_id": self.get_request_id(), + "span.kind": "server", + "http.method": self.get_method(), + "http.url": self.get_redacted_uri(), + "peer.ipv6": self.getClientIP(), + }, + child_of=span_context + ) + def _finished_processing(self): """Log the completion of this request and update the metrics """ @@ -252,7 +269,7 @@ class SynapseRequest(Request): # the time between receiving the request and the request handler finishing processing_time = self._processing_finished_time - self.start_time - # the time between the request handler finishing and the response being sent + # the time between the reb'\x80\x03ctwisted.web.http_headers\nHeaders\nq\x00)\x81q\x01}q\x02X\x0b\x00\x00\x00_rawHeadersq\x03}q\x04(C\x04hostq\x05]q\x06C\x0elocalhost:8081q\x07aC\nuser-agentq\x08]q\tCLMozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0q\naC\x06acceptq\x0b]q\x0cC\x10application/jsonq\raC\x0faccept-languageq\x0e]q\x0fC\x0een-GB,en;q=0.5q\x10aC\x0faccept-encodingq\x11]q\x12C\rgzip, deflateq\x13aC\x06originq\x14]q\x15C\x04nullq\x16aC\nconnectionq\x17]q\x18C\nkeep-aliveq\x19ausb.'quest handler finishing and the response being sent # to the client (nb may be negative) response_send_time = self.finish_time - self._processing_finished_time @@ -302,6 +319,16 @@ class SynapseRequest(Request): usage.evt_db_fetch_count, ) + scope = opentracing.tracer.scope_manager.active + if scope is not None: + # TODO: Remove this, it's just for debug and relies on implementation + # specific details of the jaeger_tracer + tags = {t.key: t.vStr for t in scope.span.tags} + assert(tags['request_id'] == self.get_request_id()) + # finish the span if it's there. + scope.span.set_tag("peer.address", authenticated_entity) + scope.close() + try: self.request_metrics.stop(self.finish_time, self.code, self.sentLength) except Exception as e: