Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
commit
68f53b7a0e
|
@ -0,0 +1 @@
|
||||||
|
Trace replication send times.
|
|
@ -0,0 +1 @@
|
||||||
|
Fix invalid references to None while opentracing if the log context slips.
|
|
@ -46,6 +46,7 @@ from synapse.http import (
|
||||||
redact_uri,
|
redact_uri,
|
||||||
)
|
)
|
||||||
from synapse.logging.context import make_deferred_yieldable
|
from synapse.logging.context import make_deferred_yieldable
|
||||||
|
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||||
from synapse.util.async_helpers import timeout_deferred
|
from synapse.util.async_helpers import timeout_deferred
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
|
|
||||||
|
@ -269,42 +270,56 @@ class SimpleHttpClient(object):
|
||||||
# log request but strip `access_token` (AS requests for example include this)
|
# log request but strip `access_token` (AS requests for example include this)
|
||||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||||
|
|
||||||
try:
|
with start_active_span(
|
||||||
body_producer = None
|
"outgoing-client-request",
|
||||||
if data is not None:
|
tags={
|
||||||
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
|
||||||
|
tags.HTTP_METHOD: method,
|
||||||
|
tags.HTTP_URL: uri,
|
||||||
|
},
|
||||||
|
finish_on_close=True,
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
body_producer = None
|
||||||
|
if data is not None:
|
||||||
|
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
||||||
|
|
||||||
request_deferred = treq.request(
|
request_deferred = treq.request(
|
||||||
method,
|
method,
|
||||||
uri,
|
uri,
|
||||||
agent=self.agent,
|
agent=self.agent,
|
||||||
data=body_producer,
|
data=body_producer,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
**self._extra_treq_args
|
**self._extra_treq_args
|
||||||
)
|
)
|
||||||
request_deferred = timeout_deferred(
|
request_deferred = timeout_deferred(
|
||||||
request_deferred,
|
request_deferred,
|
||||||
60,
|
60,
|
||||||
self.hs.get_reactor(),
|
self.hs.get_reactor(),
|
||||||
cancelled_to_request_timed_out_error,
|
cancelled_to_request_timed_out_error,
|
||||||
)
|
)
|
||||||
response = yield make_deferred_yieldable(request_deferred)
|
response = yield make_deferred_yieldable(request_deferred)
|
||||||
|
|
||||||
incoming_responses_counter.labels(method, response.code).inc()
|
incoming_responses_counter.labels(method, response.code).inc()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Received response to %s %s: %s", method, redact_uri(uri), response.code
|
"Received response to %s %s: %s",
|
||||||
)
|
method,
|
||||||
return response
|
redact_uri(uri),
|
||||||
except Exception as e:
|
response.code,
|
||||||
incoming_responses_counter.labels(method, "ERR").inc()
|
)
|
||||||
logger.info(
|
return response
|
||||||
"Error sending request to %s %s: %s %s",
|
except Exception as e:
|
||||||
method,
|
incoming_responses_counter.labels(method, "ERR").inc()
|
||||||
redact_uri(uri),
|
logger.info(
|
||||||
type(e).__name__,
|
"Error sending request to %s %s: %s %s",
|
||||||
e.args[0],
|
method,
|
||||||
)
|
redact_uri(uri),
|
||||||
raise
|
type(e).__name__,
|
||||||
|
e.args[0],
|
||||||
|
)
|
||||||
|
set_tag(tags.ERROR, True)
|
||||||
|
set_tag("error_reason", e.args[0])
|
||||||
|
raise
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
||||||
|
|
|
@ -345,7 +345,6 @@ class MatrixFederationHttpClient(object):
|
||||||
else:
|
else:
|
||||||
query_bytes = b""
|
query_bytes = b""
|
||||||
|
|
||||||
# Retreive current span
|
|
||||||
scope = start_active_span(
|
scope = start_active_span(
|
||||||
"outgoing-federation-request",
|
"outgoing-federation-request",
|
||||||
tags={
|
tags={
|
||||||
|
|
|
@ -239,8 +239,7 @@ _homeserver_whitelist = None
|
||||||
|
|
||||||
|
|
||||||
def only_if_tracing(func):
|
def only_if_tracing(func):
|
||||||
"""Executes the function only if we're tracing. Otherwise return.
|
"""Executes the function only if we're tracing. Otherwise returns None."""
|
||||||
Assumes the function wrapped may return None"""
|
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def _only_if_tracing_inner(*args, **kwargs):
|
def _only_if_tracing_inner(*args, **kwargs):
|
||||||
|
@ -252,6 +251,41 @@ def only_if_tracing(func):
|
||||||
return _only_if_tracing_inner
|
return _only_if_tracing_inner
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_active_span(message, ret=None):
|
||||||
|
"""Executes the operation only if opentracing is enabled and there is an active span.
|
||||||
|
If there is no active span it logs message at the error level.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message (str): Message which fills in "There was no active span when trying to %s"
|
||||||
|
in the error log if there is no active span and opentracing is enabled.
|
||||||
|
ret (object): return value if opentracing is None or there is no active span.
|
||||||
|
|
||||||
|
Returns (object): The result of the func or ret if opentracing is disabled or there
|
||||||
|
was no active span.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def ensure_active_span_inner_1(func):
|
||||||
|
@wraps(func)
|
||||||
|
def ensure_active_span_inner_2(*args, **kwargs):
|
||||||
|
if not opentracing:
|
||||||
|
return ret
|
||||||
|
|
||||||
|
if not opentracing.tracer.active_span:
|
||||||
|
logger.error(
|
||||||
|
"There was no active span when trying to %s."
|
||||||
|
" Did you forget to start one or did a context slip?",
|
||||||
|
message,
|
||||||
|
)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return ensure_active_span_inner_2
|
||||||
|
|
||||||
|
return ensure_active_span_inner_1
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _noop_context_manager(*args, **kwargs):
|
def _noop_context_manager(*args, **kwargs):
|
||||||
"""Does exactly what it says on the tin"""
|
"""Does exactly what it says on the tin"""
|
||||||
|
@ -349,26 +383,24 @@ def start_active_span(
|
||||||
if opentracing is None:
|
if opentracing is None:
|
||||||
return _noop_context_manager()
|
return _noop_context_manager()
|
||||||
|
|
||||||
else:
|
return opentracing.tracer.start_active_span(
|
||||||
# We need to enter the scope here for the logcontext to become active
|
operation_name,
|
||||||
return opentracing.tracer.start_active_span(
|
child_of=child_of,
|
||||||
operation_name,
|
references=references,
|
||||||
child_of=child_of,
|
tags=tags,
|
||||||
references=references,
|
start_time=start_time,
|
||||||
tags=tags,
|
ignore_active_span=ignore_active_span,
|
||||||
start_time=start_time,
|
finish_on_close=finish_on_close,
|
||||||
ignore_active_span=ignore_active_span,
|
)
|
||||||
finish_on_close=finish_on_close,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def start_active_span_follows_from(operation_name, contexts):
|
def start_active_span_follows_from(operation_name, contexts):
|
||||||
if opentracing is None:
|
if opentracing is None:
|
||||||
return _noop_context_manager()
|
return _noop_context_manager()
|
||||||
else:
|
|
||||||
references = [opentracing.follows_from(context) for context in contexts]
|
references = [opentracing.follows_from(context) for context in contexts]
|
||||||
scope = start_active_span(operation_name, references=references)
|
scope = start_active_span(operation_name, references=references)
|
||||||
return scope
|
return scope
|
||||||
|
|
||||||
|
|
||||||
def start_active_span_from_request(
|
def start_active_span_from_request(
|
||||||
|
@ -465,19 +497,19 @@ def start_active_span_from_edu(
|
||||||
# Opentracing setters for tags, logs, etc
|
# Opentracing setters for tags, logs, etc
|
||||||
|
|
||||||
|
|
||||||
@only_if_tracing
|
@ensure_active_span("set a tag")
|
||||||
def set_tag(key, value):
|
def set_tag(key, value):
|
||||||
"""Sets a tag on the active span"""
|
"""Sets a tag on the active span"""
|
||||||
opentracing.tracer.active_span.set_tag(key, value)
|
opentracing.tracer.active_span.set_tag(key, value)
|
||||||
|
|
||||||
|
|
||||||
@only_if_tracing
|
@ensure_active_span("log")
|
||||||
def log_kv(key_values, timestamp=None):
|
def log_kv(key_values, timestamp=None):
|
||||||
"""Log to the active span"""
|
"""Log to the active span"""
|
||||||
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||||
|
|
||||||
|
|
||||||
@only_if_tracing
|
@ensure_active_span("set the traces operation name")
|
||||||
def set_operation_name(operation_name):
|
def set_operation_name(operation_name):
|
||||||
"""Sets the operation name of the active span"""
|
"""Sets the operation name of the active span"""
|
||||||
opentracing.tracer.active_span.set_operation_name(operation_name)
|
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||||
|
@ -486,7 +518,7 @@ def set_operation_name(operation_name):
|
||||||
# Injection and extraction
|
# Injection and extraction
|
||||||
|
|
||||||
|
|
||||||
@only_if_tracing
|
@ensure_active_span("inject the span into a header")
|
||||||
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
|
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
|
||||||
"""
|
"""
|
||||||
Injects a span context into twisted headers in-place
|
Injects a span context into twisted headers in-place
|
||||||
|
@ -522,7 +554,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
|
||||||
headers.addRawHeaders(key, value)
|
headers.addRawHeaders(key, value)
|
||||||
|
|
||||||
|
|
||||||
@only_if_tracing
|
@ensure_active_span("inject the span into a byte dict")
|
||||||
def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||||
"""
|
"""
|
||||||
Injects a span context into a dict where the headers are encoded as byte
|
Injects a span context into a dict where the headers are encoded as byte
|
||||||
|
@ -559,7 +591,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||||
headers[key.encode()] = [value.encode()]
|
headers[key.encode()] = [value.encode()]
|
||||||
|
|
||||||
|
|
||||||
@only_if_tracing
|
@ensure_active_span("inject the span into a text map")
|
||||||
def inject_active_span_text_map(carrier, destination, check_destination=True):
|
def inject_active_span_text_map(carrier, destination, check_destination=True):
|
||||||
"""
|
"""
|
||||||
Injects a span context into a dict
|
Injects a span context into a dict
|
||||||
|
@ -591,6 +623,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ensure_active_span("get the active span context as a dict", ret={})
|
||||||
def get_active_span_text_map(destination=None):
|
def get_active_span_text_map(destination=None):
|
||||||
"""
|
"""
|
||||||
Gets a span context as a dict. This can be used instead of manually
|
Gets a span context as a dict. This can be used instead of manually
|
||||||
|
@ -603,7 +636,7 @@ def get_active_span_text_map(destination=None):
|
||||||
dict: the active span's context if opentracing is enabled, otherwise empty.
|
dict: the active span's context if opentracing is enabled, otherwise empty.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not opentracing or (destination and not whitelisted_homeserver(destination)):
|
if destination and not whitelisted_homeserver(destination):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
carrier = {}
|
carrier = {}
|
||||||
|
@ -614,6 +647,7 @@ def get_active_span_text_map(destination=None):
|
||||||
return carrier
|
return carrier
|
||||||
|
|
||||||
|
|
||||||
|
@ensure_active_span("get the span context as a string.", ret={})
|
||||||
def active_span_context_as_string():
|
def active_span_context_as_string():
|
||||||
"""
|
"""
|
||||||
Returns:
|
Returns:
|
||||||
|
|
|
@ -28,7 +28,11 @@ from synapse.api.errors import (
|
||||||
RequestSendFailed,
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet
|
from synapse.logging.opentracing import (
|
||||||
|
inject_active_span_byte_dict,
|
||||||
|
trace,
|
||||||
|
trace_servlet,
|
||||||
|
)
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
|
||||||
|
@ -129,6 +133,7 @@ class ReplicationEndpoint(object):
|
||||||
|
|
||||||
client = hs.get_simple_http_client()
|
client = hs.get_simple_http_client()
|
||||||
|
|
||||||
|
@trace(opname="outgoing_replication_request")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_request(**kwargs):
|
def send_request(**kwargs):
|
||||||
data = yield cls._serialize_payload(**kwargs)
|
data = yield cls._serialize_payload(**kwargs)
|
||||||
|
|
Loading…
Reference in New Issue