Trace federation requests on the way in and out.
The span is created in _started_processing and closed in _finished_processing because we need a meaningful log context.pull/5544/head
parent
45d412e67d
commit
fc069fee9c
|
@ -52,6 +52,9 @@ from synapse.util.async_helpers import timeout_deferred
|
|||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
import opentracing
|
||||
from opentracing.propagation import Format
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
|
||||
|
@ -350,11 +353,26 @@ class MatrixFederationHttpClient(object):
|
|||
else:
|
||||
query_bytes = b""
|
||||
|
||||
headers_dict = {
|
||||
b"User-Agent": [self.version_string_bytes],
|
||||
}
|
||||
|
||||
with limiter:
|
||||
# Retreive current span
|
||||
scope = opentracing.tracer.start_active_span(
|
||||
"outgoing-federation-request",
|
||||
tags={
|
||||
"span.kind": "client", # With respect to this request's role in an rpc
|
||||
"peer.address": request.destination,
|
||||
"http.method": request.method,
|
||||
"http.url": request.path,
|
||||
},
|
||||
finish_on_close=True
|
||||
)
|
||||
|
||||
# Inject the span into the headers
|
||||
headers_dict = {}
|
||||
opentracing.tracer.inject(scope.span, Format.HTTP_HEADERS, headers_dict)
|
||||
headers_dict = {k.encode(): [v.encode()] for k, v in headers_dict.items()}
|
||||
|
||||
headers_dict[b"User-Agent"] = [self.version_string_bytes]
|
||||
|
||||
with limiter, scope:
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
# (once we have reliable transactions in place)
|
||||
if long_retries:
|
||||
|
@ -433,6 +451,9 @@ class MatrixFederationHttpClient(object):
|
|||
response.phrase.decode('ascii', errors='replace'),
|
||||
)
|
||||
|
||||
logger.info("Setting response code on span {} *********".format(opentracing.tracer.active_span))
|
||||
scope.span.set_tag(
|
||||
"http.status_code", response.code)
|
||||
if 200 <= response.code < 300:
|
||||
pass
|
||||
else:
|
||||
|
@ -516,9 +537,11 @@ class MatrixFederationHttpClient(object):
|
|||
url_str,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
logger.info("Setting response code on span {} *********".format(opentracing.tracer.active_span))
|
||||
#scope.span.set_tag("error", True)
|
||||
raise
|
||||
|
||||
defer.returnValue(response)
|
||||
defer.returnValue(response)
|
||||
|
||||
def build_auth_headers(
|
||||
self, destination, method, url_bytes, content=None, destination_is=None,
|
||||
|
|
|
@ -14,12 +14,14 @@
|
|||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
import opentracing
|
||||
|
||||
from twisted.web.server import Request, Site
|
||||
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.request_metrics import RequestMetrics, requests_counter
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.util.tracerutils import extract_span_context
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -211,6 +213,7 @@ class SynapseRequest(Request):
|
|||
|
||||
This will log the request's arrival. Once the request completes,
|
||||
be sure to call finished_processing.
|
||||
It will also start a span for this request.
|
||||
|
||||
Args:
|
||||
servlet_name (str): the name of the servlet which will be
|
||||
|
@ -233,6 +236,20 @@ class SynapseRequest(Request):
|
|||
self.get_redacted_uri()
|
||||
)
|
||||
|
||||
# Start a span
|
||||
span_context = extract_span_context(self.requestHeaders)
|
||||
opentracing.tracer.start_active_span(
|
||||
"incoming-federation-request",
|
||||
tags={
|
||||
"request_id": self.get_request_id(),
|
||||
"span.kind": "server",
|
||||
"http.method": self.get_method(),
|
||||
"http.url": self.get_redacted_uri(),
|
||||
"peer.ipv6": self.getClientIP(),
|
||||
},
|
||||
child_of=span_context
|
||||
)
|
||||
|
||||
def _finished_processing(self):
|
||||
"""Log the completion of this request and update the metrics
|
||||
"""
|
||||
|
@ -252,7 +269,7 @@ class SynapseRequest(Request):
|
|||
# the time between receiving the request and the request handler finishing
|
||||
processing_time = self._processing_finished_time - self.start_time
|
||||
|
||||
# the time between the request handler finishing and the response being sent
|
||||
# the time between the reb'\x80\x03ctwisted.web.http_headers\nHeaders\nq\x00)\x81q\x01}q\x02X\x0b\x00\x00\x00_rawHeadersq\x03}q\x04(C\x04hostq\x05]q\x06C\x0elocalhost:8081q\x07aC\nuser-agentq\x08]q\tCLMozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0q\naC\x06acceptq\x0b]q\x0cC\x10application/jsonq\raC\x0faccept-languageq\x0e]q\x0fC\x0een-GB,en;q=0.5q\x10aC\x0faccept-encodingq\x11]q\x12C\rgzip, deflateq\x13aC\x06originq\x14]q\x15C\x04nullq\x16aC\nconnectionq\x17]q\x18C\nkeep-aliveq\x19ausb.'quest handler finishing and the response being sent
|
||||
# to the client (nb may be negative)
|
||||
response_send_time = self.finish_time - self._processing_finished_time
|
||||
|
||||
|
@ -302,6 +319,16 @@ class SynapseRequest(Request):
|
|||
usage.evt_db_fetch_count,
|
||||
)
|
||||
|
||||
scope = opentracing.tracer.scope_manager.active
|
||||
if scope is not None:
|
||||
# TODO: Remove this, it's just for debug and relies on implementation
|
||||
# specific details of the jaeger_tracer
|
||||
tags = {t.key: t.vStr for t in scope.span.tags}
|
||||
assert(tags['request_id'] == self.get_request_id())
|
||||
# finish the span if it's there.
|
||||
scope.span.set_tag("peer.address", authenticated_entity)
|
||||
scope.close()
|
||||
|
||||
try:
|
||||
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
|
||||
except Exception as e:
|
||||
|
|
Loading…
Reference in New Issue