Make opentracing an optional dependency
parent
37a650c270
commit
4bf9e1188f
|
@ -241,7 +241,7 @@ def start(hs, listeners=None):
|
|||
refresh_certificate(hs)
|
||||
|
||||
# Start the tracer
|
||||
synapse.config.tracer.init_tracing(hs.config)
|
||||
synapse.util.tracerutils.TracerUtil.init_tracer(hs.config)
|
||||
|
||||
# It is now safe to start your Synapse.
|
||||
hs.start_listening(listeners)
|
||||
|
|
|
@ -13,11 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from jaeger_client import Config as JaegerConfig
|
||||
|
||||
from synapse.util.scopecontextmanager import LogContextScopeManager
|
||||
from synapse.util.tracerutils import TracerUtil
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
|
||||
|
@ -55,32 +50,3 @@ class TracerConfig(Config):
|
|||
# homeserver_whitelist:
|
||||
# - ".*"
|
||||
"""
|
||||
|
||||
|
||||
def init_tracing(config):
|
||||
"""Set the whitelists and initialise the JaegerClient tracer
|
||||
|
||||
Args:
|
||||
config (Config)
|
||||
The config used by the homserver. Here it's used to set the service
|
||||
name to the homeserver's.
|
||||
"""
|
||||
|
||||
name = config.worker_name if config.worker_name else "master"
|
||||
|
||||
if config.tracer_config.get("tracer_enabled", False):
|
||||
TracerUtil.set_homeserver_whitelist(
|
||||
config.tracer_config["homeserver_whitelist"]
|
||||
)
|
||||
jaeger_config = JaegerConfig(
|
||||
config={"sampler": {"type": "const", "param": 1}, "logging": True},
|
||||
service_name="{} {}".format(config.server_name, name),
|
||||
scope_manager=LogContextScopeManager(config),
|
||||
)
|
||||
else: # The tracer is not configured so we instantiate a noop tracer
|
||||
jaeger_config = JaegerConfig(
|
||||
config={"sampler": {"type": "const", "param": 0}},
|
||||
service_name=config.server_name,
|
||||
)
|
||||
|
||||
return jaeger_config.initialize_tracer()
|
||||
|
|
|
@ -23,10 +23,8 @@ from six import PY3, raise_from, string_types
|
|||
from six.moves import urllib
|
||||
|
||||
import attr
|
||||
import opentracing
|
||||
import treq
|
||||
from canonicaljson import encode_canonical_json
|
||||
from opentracing import tags
|
||||
from prometheus_client import Counter
|
||||
from signedjson.sign import sign_json
|
||||
from zope.interface import implementer
|
||||
|
@ -343,26 +341,24 @@ class MatrixFederationHttpClient(object):
|
|||
query_bytes = b""
|
||||
|
||||
# Retreive current span
|
||||
scope = opentracing.tracer.start_active_span(
|
||||
TracerUtil.start_active_span(
|
||||
"outgoing-federation-request",
|
||||
tags={
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
|
||||
tags.PEER_ADDRESS: request.destination,
|
||||
tags.HTTP_METHOD: request.method,
|
||||
tags.HTTP_URL: request.path,
|
||||
TracerUtil.Tags.SPAN_KIND: TracerUtil.Tags.SPAN_KIND_RPC_CLIENT,
|
||||
TracerUtil.Tags.PEER_ADDRESS: request.destination,
|
||||
TracerUtil.Tags.HTTP_METHOD: request.method,
|
||||
TracerUtil.Tags.HTTP_URL: request.path,
|
||||
},
|
||||
finish_on_close=True,
|
||||
)
|
||||
|
||||
# Inject the span into the headers
|
||||
headers_dict = {}
|
||||
TracerUtil.inject_span_context_byte_dict(
|
||||
headers_dict, scope.span, request.destination
|
||||
)
|
||||
TracerUtil.inject_active_span_byte_dict(headers_dict, request.destination)
|
||||
|
||||
headers_dict[b"User-Agent"] = [self.version_string_bytes]
|
||||
|
||||
with limiter, scope:
|
||||
with limiter:
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
# (once we have reliable transactions in place)
|
||||
if long_retries:
|
||||
|
@ -440,7 +436,7 @@ class MatrixFederationHttpClient(object):
|
|||
response.phrase.decode("ascii", errors="replace"),
|
||||
)
|
||||
|
||||
scope.span.set_tag(opentracing.tags.HTTP_STATUS_CODE, response.code)
|
||||
TracerUtil.set_tag(TracerUtil.Tags.HTTP_STATUS_CODE, response.code)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
pass
|
||||
|
@ -522,7 +518,7 @@ class MatrixFederationHttpClient(object):
|
|||
_flatten_response_never_received(e),
|
||||
)
|
||||
raise
|
||||
|
||||
TracerUtil.close_active_span()
|
||||
defer.returnValue(response)
|
||||
|
||||
def build_auth_headers(
|
||||
|
|
|
@ -15,9 +15,6 @@ import contextlib
|
|||
import logging
|
||||
import time
|
||||
|
||||
import opentracing
|
||||
from opentracing import tags
|
||||
|
||||
from twisted.web.server import Request, Site
|
||||
|
||||
from synapse.http import redact_uri
|
||||
|
@ -239,17 +236,16 @@ class SynapseRequest(Request):
|
|||
)
|
||||
|
||||
# Start a span
|
||||
span_context = TracerUtil.extract_span_context(self.requestHeaders)
|
||||
opentracing.tracer.start_active_span(
|
||||
TracerUtil.start_active_span_from_context(
|
||||
self.requestHeaders,
|
||||
"incoming-federation-request",
|
||||
tags={
|
||||
"request_id": self.get_request_id(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: self.get_method(),
|
||||
tags.HTTP_URL: self.get_redacted_uri(),
|
||||
tags.PEER_HOST_IPV6: self.getClientIP(),
|
||||
TracerUtil.Tags.SPAN_KIND: TracerUtil.Tags.SPAN_KIND_RPC_SERVER,
|
||||
TracerUtil.Tags.HTTP_METHOD: self.get_method(),
|
||||
TracerUtil.Tags.HTTP_URL: self.get_redacted_uri(),
|
||||
TracerUtil.Tags.PEER_HOST_IPV6: self.getClientIP(),
|
||||
},
|
||||
child_of=span_context,
|
||||
)
|
||||
|
||||
def _finished_processing(self):
|
||||
|
@ -321,11 +317,9 @@ class SynapseRequest(Request):
|
|||
usage.evt_db_fetch_count,
|
||||
)
|
||||
|
||||
scope = opentracing.tracer.scope_manager.active
|
||||
if scope is not None:
|
||||
# finish the span if it's there.
|
||||
scope.span.set_tag("peer.address", authenticated_entity)
|
||||
scope.__exit__(None, None, None)
|
||||
# finish the span if it's there.
|
||||
TracerUtil.set_tag("peer.address", authenticated_entity)
|
||||
TracerUtil.close_active_span()
|
||||
|
||||
try:
|
||||
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
|
||||
|
|
|
@ -72,7 +72,6 @@ REQUIREMENTS = [
|
|||
# Twisted 18.7.0 requires attrs>=17.4.0
|
||||
"attrs>=17.4.0",
|
||||
"netaddr>=0.7.18",
|
||||
"jaeger-client>=4.0.0",
|
||||
"Jinja2>=2.9",
|
||||
"bleach>=1.4.3",
|
||||
]
|
||||
|
@ -96,6 +95,7 @@ CONDITIONAL_REQUIREMENTS = {
|
|||
"url_preview": ["lxml>=3.5.0"],
|
||||
"test": ["mock>=2.0", "parameterized"],
|
||||
"sentry": ["sentry-sdk>=0.7.2"],
|
||||
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
|
||||
}
|
||||
|
||||
ALL_OPTIONAL_REQUIREMENTS = set()
|
||||
|
|
|
@ -13,19 +13,179 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.import opentracing
|
||||
|
||||
import logging
|
||||
import re
|
||||
from functools import wraps
|
||||
|
||||
import opentracing
|
||||
from opentracing.propagation import Format
|
||||
|
||||
# block everything by default
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TracerUtil:
|
||||
def only_if_tracing(func):
|
||||
"""Executes the function only if we're tracing. Otherwise return.
|
||||
Assumes the function wrapped may return None"""
|
||||
|
||||
@wraps(func)
|
||||
def f(cls, *args, **kwargs):
|
||||
if cls._opentracing:
|
||||
return func(cls, *args, **kwargs)
|
||||
else:
|
||||
return
|
||||
|
||||
return f
|
||||
|
||||
|
||||
class TracerUtil(object):
|
||||
_opentracing = None
|
||||
_opentracing_formats = None
|
||||
|
||||
# Block everything by default
|
||||
_homeserver_whitelist = None
|
||||
|
||||
@staticmethod
|
||||
def set_homeserver_whitelist(homeserver_whitelist):
|
||||
@classmethod
|
||||
def init_tracer(cls, config):
|
||||
"""Set the whitelists and initialise the JaegerClient tracer
|
||||
|
||||
Args:
|
||||
config (Config)
|
||||
The config used by the homserver. Here it's used to set the service
|
||||
name to the homeserver's.
|
||||
"""
|
||||
if not config.tracer_config.get("tracer_enabled", False):
|
||||
# We don't have a tracer
|
||||
return
|
||||
|
||||
cls.import_opentracing()
|
||||
cls.set_tags()
|
||||
cls.setup_tracing(config)
|
||||
|
||||
@classmethod
|
||||
def import_opentracing(cls):
|
||||
try:
|
||||
# Try to import the tracer. If it's not there we want to throw an eror
|
||||
import opentracing
|
||||
except ImportError as e:
|
||||
logger.error(
|
||||
"The server has been configure to use opentracing but "
|
||||
"the {} module has not been installed.".format(e.name)
|
||||
)
|
||||
raise
|
||||
|
||||
cls._opentracing = opentracing
|
||||
cls.set_tags()
|
||||
|
||||
@classmethod
|
||||
def setup_tracing(cls, config):
|
||||
try:
|
||||
from jaeger_client import Config as JaegerConfig
|
||||
from synapse.util.scopecontextmanager import LogContextScopeManager
|
||||
except ImportError as e:
|
||||
logger.error(
|
||||
"The server has been configure to use opentracing but "
|
||||
"the {} module has not been installed.".format(e.name)
|
||||
)
|
||||
raise
|
||||
|
||||
# Include the worker name
|
||||
name = config.worker_name if config.worker_name else "master"
|
||||
|
||||
cls.set_homeserver_whitelist(config.tracer_config["homeserver_whitelist"])
|
||||
jaeger_config = JaegerConfig(
|
||||
config={"sampler": {"type": "const", "param": 1}, "logging": True},
|
||||
service_name="{} {}".format(config.server_name, name),
|
||||
scope_manager=LogContextScopeManager(config),
|
||||
)
|
||||
jaeger_config.initialize_tracer()
|
||||
|
||||
class Tags(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
|
||||
COMPONENT = "invlalid-tag"
|
||||
DATABASE_INSTANCE = "invlalid-tag"
|
||||
DATABASE_STATEMENT = "invlalid-tag"
|
||||
DATABASE_TYPE = "invlalid-tag"
|
||||
DATABASE_USER = "invlalid-tag"
|
||||
ERROR = "invlalid-tag"
|
||||
HTTP_METHOD = "invlalid-tag"
|
||||
HTTP_STATUS_CODE = "invlalid-tag"
|
||||
HTTP_URL = "invlalid-tag"
|
||||
MESSAGE_BUS_DESTINATION = "invlalid-tag"
|
||||
PEER_ADDRESS = "invlalid-tag"
|
||||
PEER_HOSTNAME = "invlalid-tag"
|
||||
PEER_HOST_IPV4 = "invlalid-tag"
|
||||
PEER_HOST_IPV6 = "invlalid-tag"
|
||||
PEER_PORT = "invlalid-tag"
|
||||
PEER_SERVICE = "invlalid-tag"
|
||||
SAMPLING_PRIORITY = "invlalid-tag"
|
||||
SERVICE = "invlalid-tag"
|
||||
SPAN_KIND = "invlalid-tag"
|
||||
SPAN_KIND_CONSUMER = "invlalid-tag"
|
||||
SPAN_KIND_PRODUCER = "invlalid-tag"
|
||||
SPAN_KIND_RPC_CLIENT = "invlalid-tag"
|
||||
SPAN_KIND_RPC_SERVER = "invlalid-tag"
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def set_tags(cls):
|
||||
cls.Tags = cls._opentracing.tags
|
||||
|
||||
# Could use kwargs but I want these to be explicit
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def start_active_span(
|
||||
cls,
|
||||
operation_name,
|
||||
child_of=None,
|
||||
references=None,
|
||||
tags=None,
|
||||
start_time=None,
|
||||
ignore_active_span=False,
|
||||
finish_on_close=True,
|
||||
):
|
||||
# We need to enter the scope here for the logcontext to become active
|
||||
cls._opentracing.tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=child_of,
|
||||
references=references,
|
||||
tags=tags,
|
||||
start_time=start_time,
|
||||
ignore_active_span=ignore_active_span,
|
||||
finish_on_close=finish_on_close,
|
||||
).__enter__()
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def close_active_span(cls):
|
||||
cls._opentracing.tracer.scope_manager.active.__exit__(None, None, None)
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def set_tag(cls, key, value):
|
||||
cls._opentracing.tracer.active_span.set_tag(key, value)
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def log_kv(cls, key_values, timestamp=None):
|
||||
cls._opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||
|
||||
# Note: we don't have a get baggage items because we're trying to hide all
|
||||
# scope and span state from synapse. I think this method may also be useless
|
||||
# as a result
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def set_baggage_item(cls, key, value):
|
||||
cls._opentracing.tracer.active_span.set_baggage_item(key, value)
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def set_operation_name(cls, operation_name):
|
||||
cls._opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def set_homeserver_whitelist(cls, homeserver_whitelist):
|
||||
"""Sets the whitelist
|
||||
|
||||
Args:
|
||||
|
@ -33,18 +193,29 @@ class TracerUtil:
|
|||
"""
|
||||
if homeserver_whitelist:
|
||||
# Makes a single regex which accepts all passed in regexes in the list
|
||||
TracerUtil._homeserver_whitelist = re.compile(
|
||||
cls._homeserver_whitelist = re.compile(
|
||||
"({})".format(")|(".join(homeserver_whitelist))
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def whitelisted_homeserver(destination):
|
||||
if TracerUtil._homeserver_whitelist:
|
||||
return TracerUtil._homeserver_whitelist.match(destination)
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def whitelisted_homeserver(cls, destination):
|
||||
if cls._homeserver_whitelist:
|
||||
return cls._homeserver_whitelist.match(destination)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def extract_span_context(headers):
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def start_active_span_from_context(
|
||||
cls,
|
||||
headers,
|
||||
operation_name,
|
||||
references=None,
|
||||
tags=None,
|
||||
start_time=None,
|
||||
ignore_active_span=False,
|
||||
finish_on_close=True,
|
||||
):
|
||||
"""
|
||||
Extracts a span context from Twisted Headers.
|
||||
args:
|
||||
|
@ -56,10 +227,23 @@ class TracerUtil:
|
|||
# So, we take the first item in the list.
|
||||
# Also, twisted uses byte arrays while opentracing expects strings.
|
||||
header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
|
||||
return opentracing.tracer.extract(Format.HTTP_HEADERS, header_dict)
|
||||
context = cls._opentracing.tracer.extract(
|
||||
cls._opentracing.Format.HTTP_HEADERS, header_dict
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def inject_span_context(headers, span, destination):
|
||||
cls._opentracing.tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=context,
|
||||
references=references,
|
||||
tags=tags,
|
||||
start_time=start_time,
|
||||
ignore_active_span=ignore_active_span,
|
||||
finish_on_close=finish_on_close,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def inject_active_span_twisted_headers(cls, headers, destination):
|
||||
"""
|
||||
Injects a span context into twisted headers inplace
|
||||
|
||||
|
@ -80,14 +264,19 @@ class TracerUtil:
|
|||
|
||||
if not TracerUtil.whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
span = cls._opentracing.tracer.active_span
|
||||
carrier = {}
|
||||
opentracing.tracer.inject(span, Format.HTTP_HEADERS, carrier)
|
||||
cls._opentracing.tracer.inject(
|
||||
span, cls._opentracing.Format.HTTP_HEADERS, carrier
|
||||
)
|
||||
|
||||
for key, value in carrier.items():
|
||||
headers.addRawHeaders(key, value)
|
||||
|
||||
@staticmethod
|
||||
def inject_span_context_byte_dict(headers, span, destination):
|
||||
@classmethod
|
||||
@only_if_tracing
|
||||
def inject_active_span_byte_dict(cls, headers, destination):
|
||||
"""
|
||||
Injects a span context into a dict where the headers are encoded as byte
|
||||
strings
|
||||
|
@ -109,8 +298,12 @@ class TracerUtil:
|
|||
if not TracerUtil.whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
span = cls._opentracing.tracer.active_span
|
||||
|
||||
carrier = {}
|
||||
opentracing.tracer.inject(span, Format.HTTP_HEADERS, carrier)
|
||||
cls._opentracing.tracer.inject(
|
||||
span, cls._opentracing.Format.HTTP_HEADERS, carrier
|
||||
)
|
||||
|
||||
for key, value in carrier.items():
|
||||
headers[key.encode()] = [value.encode()]
|
||||
|
|
Loading…
Reference in New Issue