Trace how long it takes for the send trasaction to complete, including retrys (#5986)
parent
bc604e7f94
commit
f7c873a643
|
@ -0,0 +1 @@
|
||||||
|
Trace replication send times.
|
|
@ -46,6 +46,7 @@ from synapse.http import (
|
||||||
redact_uri,
|
redact_uri,
|
||||||
)
|
)
|
||||||
from synapse.logging.context import make_deferred_yieldable
|
from synapse.logging.context import make_deferred_yieldable
|
||||||
|
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||||
from synapse.util.async_helpers import timeout_deferred
|
from synapse.util.async_helpers import timeout_deferred
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
|
|
||||||
|
@ -269,42 +270,56 @@ class SimpleHttpClient(object):
|
||||||
# log request but strip `access_token` (AS requests for example include this)
|
# log request but strip `access_token` (AS requests for example include this)
|
||||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||||
|
|
||||||
try:
|
with start_active_span(
|
||||||
body_producer = None
|
"outgoing-client-request",
|
||||||
if data is not None:
|
tags={
|
||||||
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
|
||||||
|
tags.HTTP_METHOD: method,
|
||||||
|
tags.HTTP_URL: uri,
|
||||||
|
},
|
||||||
|
finish_on_close=True,
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
body_producer = None
|
||||||
|
if data is not None:
|
||||||
|
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
||||||
|
|
||||||
request_deferred = treq.request(
|
request_deferred = treq.request(
|
||||||
method,
|
method,
|
||||||
uri,
|
uri,
|
||||||
agent=self.agent,
|
agent=self.agent,
|
||||||
data=body_producer,
|
data=body_producer,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
**self._extra_treq_args
|
**self._extra_treq_args
|
||||||
)
|
)
|
||||||
request_deferred = timeout_deferred(
|
request_deferred = timeout_deferred(
|
||||||
request_deferred,
|
request_deferred,
|
||||||
60,
|
60,
|
||||||
self.hs.get_reactor(),
|
self.hs.get_reactor(),
|
||||||
cancelled_to_request_timed_out_error,
|
cancelled_to_request_timed_out_error,
|
||||||
)
|
)
|
||||||
response = yield make_deferred_yieldable(request_deferred)
|
response = yield make_deferred_yieldable(request_deferred)
|
||||||
|
|
||||||
incoming_responses_counter.labels(method, response.code).inc()
|
incoming_responses_counter.labels(method, response.code).inc()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Received response to %s %s: %s", method, redact_uri(uri), response.code
|
"Received response to %s %s: %s",
|
||||||
)
|
method,
|
||||||
return response
|
redact_uri(uri),
|
||||||
except Exception as e:
|
response.code,
|
||||||
incoming_responses_counter.labels(method, "ERR").inc()
|
)
|
||||||
logger.info(
|
return response
|
||||||
"Error sending request to %s %s: %s %s",
|
except Exception as e:
|
||||||
method,
|
incoming_responses_counter.labels(method, "ERR").inc()
|
||||||
redact_uri(uri),
|
logger.info(
|
||||||
type(e).__name__,
|
"Error sending request to %s %s: %s %s",
|
||||||
e.args[0],
|
method,
|
||||||
)
|
redact_uri(uri),
|
||||||
raise
|
type(e).__name__,
|
||||||
|
e.args[0],
|
||||||
|
)
|
||||||
|
set_tag(tags.ERROR, True)
|
||||||
|
set_tag("error_reason", e.args[0])
|
||||||
|
raise
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
||||||
|
|
|
@ -345,7 +345,6 @@ class MatrixFederationHttpClient(object):
|
||||||
else:
|
else:
|
||||||
query_bytes = b""
|
query_bytes = b""
|
||||||
|
|
||||||
# Retreive current span
|
|
||||||
scope = start_active_span(
|
scope = start_active_span(
|
||||||
"outgoing-federation-request",
|
"outgoing-federation-request",
|
||||||
tags={
|
tags={
|
||||||
|
|
|
@ -28,7 +28,11 @@ from synapse.api.errors import (
|
||||||
RequestSendFailed,
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet
|
from synapse.logging.opentracing import (
|
||||||
|
inject_active_span_byte_dict,
|
||||||
|
trace,
|
||||||
|
trace_servlet,
|
||||||
|
)
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
|
||||||
|
@ -129,6 +133,7 @@ class ReplicationEndpoint(object):
|
||||||
|
|
||||||
client = hs.get_simple_http_client()
|
client = hs.get_simple_http_client()
|
||||||
|
|
||||||
|
@trace(opname="outgoing_replication_request")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_request(**kwargs):
|
def send_request(**kwargs):
|
||||||
data = yield cls._serialize_payload(**kwargs)
|
data = yield cls._serialize_payload(**kwargs)
|
||||||
|
|
Loading…
Reference in New Issue