diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 5434f999c9..13a2af291d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -241,7 +241,7 @@ def start(hs, listeners=None):
         refresh_certificate(hs)
 
         # Start the tracer
-        synapse.config.tracer.init_tracing(hs.config)
+        synapse.util.tracerutils.TracerUtil.init_tracer(hs.config)
 
         # It is now safe to start your Synapse.
         hs.start_listening(listeners)
diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py
index cc22e68dea..8cfbc8d85c 100644
--- a/synapse/config/tracer.py
+++ b/synapse/config/tracer.py
@@ -13,11 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from jaeger_client import Config as JaegerConfig
-
-from synapse.util.scopecontextmanager import LogContextScopeManager
-from synapse.util.tracerutils import TracerUtil
-
 from ._base import Config, ConfigError
 
 
@@ -55,32 +50,3 @@ class TracerConfig(Config):
         # homeserver_whitelist:
         #   - ".*"
         """
-
-
-def init_tracing(config):
-    """Set the whitelists and initialise the JaegerClient tracer
-
-    Args:
-        config (Config)
-        The config used by the homserver. Here it's used to set the service
-        name to the homeserver's.
-    """
-
-    name = config.worker_name if config.worker_name else "master"
-
-    if config.tracer_config.get("tracer_enabled", False):
-        TracerUtil.set_homeserver_whitelist(
-            config.tracer_config["homeserver_whitelist"]
-        )
-        jaeger_config = JaegerConfig(
-            config={"sampler": {"type": "const", "param": 1}, "logging": True},
-            service_name="{} {}".format(config.server_name, name),
-            scope_manager=LogContextScopeManager(config),
-        )
-    else:  # The tracer is not configured so we instantiate a noop tracer
-        jaeger_config = JaegerConfig(
-            config={"sampler": {"type": "const", "param": 0}},
-            service_name=config.server_name,
-        )
-
-    return jaeger_config.initialize_tracer()
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 8d806c949b..7e84582a7a 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -23,10 +23,8 @@ from six import PY3, raise_from, string_types
 from six.moves import urllib
 
 import attr
-import opentracing
 import treq
 from canonicaljson import encode_canonical_json
-from opentracing import tags
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 from zope.interface import implementer
@@ -343,26 +341,24 @@ class MatrixFederationHttpClient(object):
             query_bytes = b""
 
         # Retreive current span
-        scope = opentracing.tracer.start_active_span(
+        TracerUtil.start_active_span(
             "outgoing-federation-request",
             tags={
-                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
-                tags.PEER_ADDRESS: request.destination,
-                tags.HTTP_METHOD: request.method,
-                tags.HTTP_URL: request.path,
+                TracerUtil.Tags.SPAN_KIND: TracerUtil.Tags.SPAN_KIND_RPC_CLIENT,
+                TracerUtil.Tags.PEER_ADDRESS: request.destination,
+                TracerUtil.Tags.HTTP_METHOD: request.method,
+                TracerUtil.Tags.HTTP_URL: request.path,
             },
             finish_on_close=True,
         )
 
         # Inject the span into the headers
         headers_dict = {}
-        TracerUtil.inject_span_context_byte_dict(
-            headers_dict, scope.span, request.destination
-        )
+        TracerUtil.inject_active_span_byte_dict(headers_dict, request.destination)
 
         headers_dict[b"User-Agent"] = [self.version_string_bytes]
 
-        with limiter, scope:
+        with limiter:
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
@@ -440,7 +436,7 @@ class MatrixFederationHttpClient(object):
                         response.phrase.decode("ascii", errors="replace"),
                     )
 
-                    scope.span.set_tag(opentracing.tags.HTTP_STATUS_CODE, response.code)
+                    TracerUtil.set_tag(TracerUtil.Tags.HTTP_STATUS_CODE, response.code)
 
                     if 200 <= response.code < 300:
                         pass
@@ -522,7 +518,9 @@ class MatrixFederationHttpClient(object):
                             _flatten_response_never_received(e),
                         )
                         raise
-
+            # TODO: this is only reached on success. If the request raises, the span
+            # started above stays open (`with limiter, scope:` closed it on all paths).
+            TracerUtil.close_active_span()
             defer.returnValue(response)
 
     def build_auth_headers(
diff --git a/synapse/http/site.py b/synapse/http/site.py
index e093ec1c0e..313320a88c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -15,9 +15,6 @@ import contextlib
 import logging
 import time
 
-import opentracing
-from opentracing import tags
-
 from twisted.web.server import Request, Site
 
 from synapse.http import redact_uri
@@ -239,17 +236,16 @@ class SynapseRequest(Request):
         )
 
         # Start a span
-        span_context = TracerUtil.extract_span_context(self.requestHeaders)
-        opentracing.tracer.start_active_span(
+        TracerUtil.start_active_span_from_context(
+            self.requestHeaders,
             "incoming-federation-request",
             tags={
                 "request_id": self.get_request_id(),
-                tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
-                tags.HTTP_METHOD: self.get_method(),
-                tags.HTTP_URL: self.get_redacted_uri(),
-                tags.PEER_HOST_IPV6: self.getClientIP(),
+                TracerUtil.Tags.SPAN_KIND: TracerUtil.Tags.SPAN_KIND_RPC_SERVER,
+                TracerUtil.Tags.HTTP_METHOD: self.get_method(),
+                TracerUtil.Tags.HTTP_URL: self.get_redacted_uri(),
+                TracerUtil.Tags.PEER_HOST_IPV6: self.getClientIP(),
             },
-            child_of=span_context,
         )
 
     def _finished_processing(self):
@@ -321,11 +317,9 @@ class SynapseRequest(Request):
             usage.evt_db_fetch_count,
         )
 
-        scope = opentracing.tracer.scope_manager.active
-        if scope is not None:
-            # finish the span if it's there.
-            scope.span.set_tag("peer.address", authenticated_entity)
-            scope.__exit__(None, None, None)
+        # finish the span if it's there.
+        TracerUtil.set_tag("peer.address", authenticated_entity)
+        TracerUtil.close_active_span()
 
         try:
             self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 41ffecff4c..51205c98e5 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -72,7 +72,6 @@ REQUIREMENTS = [
     # Twisted 18.7.0 requires attrs>=17.4.0
     "attrs>=17.4.0",
     "netaddr>=0.7.18",
-    "jaeger-client>=4.0.0",
     "Jinja2>=2.9",
     "bleach>=1.4.3",
 ]
@@ -96,6 +95,7 @@ CONDITIONAL_REQUIREMENTS = {
     "url_preview": ["lxml>=3.5.0"],
     "test": ["mock>=2.0", "parameterized"],
     "sentry": ["sentry-sdk>=0.7.2"],
+    "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
 }
 
 ALL_OPTIONAL_REQUIREMENTS = set()
diff --git a/synapse/util/tracerutils.py b/synapse/util/tracerutils.py
index 5a2e96e1cb..799766ea36 100644
--- a/synapse/util/tracerutils.py
+++ b/synapse/util/tracerutils.py
@@ -13,19 +13,203 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.import opentracing
 
+import logging
 import re
+from functools import wraps
 
-import opentracing
-from opentracing.propagation import Format
-
-# block everything by default
+logger = logging.getLogger(__name__)
 
 
-class TracerUtil:
+def only_if_tracing(func):
+    """Executes the function only if we're tracing. Otherwise return.
+    Assumes the function wrapped may return None"""
+
+    @wraps(func)
+    def f(cls, *args, **kwargs):
+        if cls._opentracing:
+            return func(cls, *args, **kwargs)
+        else:
+            return
+
+    return f
+
+
+class TracerUtil(object):
+    """Class-level wrappers around the optional opentracing module. Methods
+    decorated with only_if_tracing do nothing until it has been imported."""
+
+    _opentracing = None
+    _opentracing_formats = None
+
+    # Block everything by default
     _homeserver_whitelist = None
 
-    @staticmethod
-    def set_homeserver_whitelist(homeserver_whitelist):
+    @classmethod
+    def init_tracer(cls, config):
+        """Set the whitelists and initialise the JaegerClient tracer
+
+        Args:
+            config (Config)
+            The config used by the homeserver. Here it's used to set the service
+            name to the homeserver's.
+        """
+        if not config.tracer_config.get("tracer_enabled", False):
+            # We don't have a tracer
+            return
+
+        cls.import_opentracing()
+        cls.set_tags()
+        cls.setup_tracing(config)
+
+    @classmethod
+    def import_opentracing(cls):
+        """Import opentracing, keep a reference to it and load its tags."""
+        try:
+            # Try to import the tracer. If it's not there we want to throw an error
+            import opentracing
+        except ImportError as e:
+            logger.error(
+                "The server has been configure to use opentracing but "
+                "the {} module has not been installed.".format(e.name)
+            )
+            raise
+
+        cls._opentracing = opentracing
+        cls.set_tags()
+
+    @classmethod
+    def setup_tracing(cls, config):
+        """Set the homeserver whitelist and initialise the Jaeger tracer."""
+        try:
+            from jaeger_client import Config as JaegerConfig
+            from synapse.util.scopecontextmanager import LogContextScopeManager
+        except ImportError as e:
+            logger.error(
+                "The server has been configure to use opentracing but "
+                "the {} module has not been installed.".format(e.name)
+            )
+            raise
+
+        # Include the worker name
+        name = config.worker_name if config.worker_name else "master"
+
+        cls.set_homeserver_whitelist(config.tracer_config["homeserver_whitelist"])
+        jaeger_config = JaegerConfig(
+            config={"sampler": {"type": "const", "param": 1}, "logging": True},
+            service_name="{} {}".format(config.server_name, name),
+            scope_manager=LogContextScopeManager(config),
+        )
+        jaeger_config.initialize_tracer()
+
+    class Tags(object):
+        """Wrapper of opentracing's tags. We need to have them if we
+        want to reference them without opentracing around. Clearly they
+        should never actually show up in a trace. `set_tags` overwrites
+        these with the correct ones."""
+
+        COMPONENT = "invlalid-tag"
+        DATABASE_INSTANCE = "invlalid-tag"
+        DATABASE_STATEMENT = "invlalid-tag"
+        DATABASE_TYPE = "invlalid-tag"
+        DATABASE_USER = "invlalid-tag"
+        ERROR = "invlalid-tag"
+        HTTP_METHOD = "invlalid-tag"
+        HTTP_STATUS_CODE = "invlalid-tag"
+        HTTP_URL = "invlalid-tag"
+        MESSAGE_BUS_DESTINATION = "invlalid-tag"
+        PEER_ADDRESS = "invlalid-tag"
+        PEER_HOSTNAME = "invlalid-tag"
+        PEER_HOST_IPV4 = "invlalid-tag"
+        PEER_HOST_IPV6 = "invlalid-tag"
+        PEER_PORT = "invlalid-tag"
+        PEER_SERVICE = "invlalid-tag"
+        SAMPLING_PRIORITY = "invlalid-tag"
+        SERVICE = "invlalid-tag"
+        SPAN_KIND = "invlalid-tag"
+        SPAN_KIND_CONSUMER = "invlalid-tag"
+        SPAN_KIND_PRODUCER = "invlalid-tag"
+        SPAN_KIND_RPC_CLIENT = "invlalid-tag"
+        SPAN_KIND_RPC_SERVER = "invlalid-tag"
+
+    @classmethod
+    @only_if_tracing
+    def set_tags(cls):
+        """Replace the placeholder Tags with opentracing's own tags module."""
+        cls.Tags = cls._opentracing.tags
+
+    # Could use kwargs but I want these to be explicit
+    @classmethod
+    @only_if_tracing
+    def start_active_span(
+        cls,
+        operation_name,
+        child_of=None,
+        references=None,
+        tags=None,
+        start_time=None,
+        ignore_active_span=False,
+        finish_on_close=True,
+    ):
+        """Start a new active span on the global tracer and enter its scope."""
+        # We need to enter the scope here for the logcontext to become active
+        cls._opentracing.tracer.start_active_span(
+            operation_name,
+            child_of=child_of,
+            references=references,
+            tags=tags,
+            start_time=start_time,
+            ignore_active_span=ignore_active_span,
+            finish_on_close=finish_on_close,
+        ).__enter__()
+
+    @classmethod
+    @only_if_tracing
+    def close_active_span(cls):
+        """Exit the currently active scope, if there is one."""
+        scope = cls._opentracing.tracer.scope_manager.active
+        # There may be no active scope here; the code this replaces in
+        # SynapseRequest._finished_processing guarded against that as well.
+        if scope is not None:
+            scope.__exit__(None, None, None)
+
+    @classmethod
+    @only_if_tracing
+    def set_tag(cls, key, value):
+        """Set a tag on the active span, if there is one."""
+        span = cls._opentracing.tracer.active_span
+        if span is not None:
+            span.set_tag(key, value)
+
+    @classmethod
+    @only_if_tracing
+    def log_kv(cls, key_values, timestamp=None):
+        """Log key/value pairs on the active span, if there is one."""
+        span = cls._opentracing.tracer.active_span
+        if span is not None:
+            span.log_kv(key_values, timestamp)
+
+    # Note: we don't have a get baggage items because we're trying to hide all
+    # scope and span state from synapse. I think this method may also be useless
+    # as a result
+    @classmethod
+    @only_if_tracing
+    def set_baggage_item(cls, key, value):
+        """Set a baggage item on the active span, if there is one."""
+        span = cls._opentracing.tracer.active_span
+        if span is not None:
+            span.set_baggage_item(key, value)
+
+    @classmethod
+    @only_if_tracing
+    def set_operation_name(cls, operation_name):
+        """Rename the active span's operation, if there is an active span."""
+        span = cls._opentracing.tracer.active_span
+        if span is not None:
+            span.set_operation_name(operation_name)
+
+    @classmethod
+    @only_if_tracing
+    def set_homeserver_whitelist(cls, homeserver_whitelist):
         """Sets the whitelist
 
         Args:
@@ -33,18 +217,29 @@ class TracerUtil:
         """
         if homeserver_whitelist:
             # Makes a single regex which accepts all passed in regexes in the list
-            TracerUtil._homeserver_whitelist = re.compile(
+            cls._homeserver_whitelist = re.compile(
                 "({})".format(")|(".join(homeserver_whitelist))
             )
 
-    @staticmethod
-    def whitelisted_homeserver(destination):
-        if TracerUtil._homeserver_whitelist:
-            return TracerUtil._homeserver_whitelist.match(destination)
+    @classmethod
+    @only_if_tracing
+    def whitelisted_homeserver(cls, destination):
+        if cls._homeserver_whitelist:
+            return cls._homeserver_whitelist.match(destination)
         return False
 
-    @staticmethod
-    def extract_span_context(headers):
+    @classmethod
+    @only_if_tracing
+    def start_active_span_from_context(
+        cls,
+        headers,
+        operation_name,
+        references=None,
+        tags=None,
+        start_time=None,
+        ignore_active_span=False,
+        finish_on_close=True,
+    ):
         """
         Extracts a span context from Twisted Headers.
         args:
@@ -56,10 +251,23 @@ class TracerUtil:
         # So, we take the first item in the list.
         # Also, twisted uses byte arrays while opentracing expects strings.
         header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
-        return opentracing.tracer.extract(Format.HTTP_HEADERS, header_dict)
+        context = cls._opentracing.tracer.extract(
+            cls._opentracing.Format.HTTP_HEADERS, header_dict
+        )
 
-    @staticmethod
-    def inject_span_context(headers, span, destination):
+        cls._opentracing.tracer.start_active_span(
+            operation_name,
+            child_of=context,
+            references=references,
+            tags=tags,
+            start_time=start_time,
+            ignore_active_span=ignore_active_span,
+            finish_on_close=finish_on_close,
+        )
+
+    @classmethod
+    @only_if_tracing
+    def inject_active_span_twisted_headers(cls, headers, destination):
         """
         Injects a span context into twisted headers inplace
 
@@ -80,14 +288,19 @@ class TracerUtil:
 
         if not TracerUtil.whitelisted_homeserver(destination):
             return
+
+        span = cls._opentracing.tracer.active_span
         carrier = {}
-        opentracing.tracer.inject(span, Format.HTTP_HEADERS, carrier)
+        cls._opentracing.tracer.inject(
+            span, cls._opentracing.Format.HTTP_HEADERS, carrier
+        )
 
         for key, value in carrier.items():
             headers.addRawHeaders(key, value)
 
-    @staticmethod
-    def inject_span_context_byte_dict(headers, span, destination):
+    @classmethod
+    @only_if_tracing
+    def inject_active_span_byte_dict(cls, headers, destination):
         """
         Injects a span context into a dict where the headers are encoded as byte
         strings
@@ -109,8 +322,12 @@ class TracerUtil:
         if not TracerUtil.whitelisted_homeserver(destination):
             return
 
+        span = cls._opentracing.tracer.active_span
+
         carrier = {}
-        opentracing.tracer.inject(span, Format.HTTP_HEADERS, carrier)
+        cls._opentracing.tracer.inject(
+            span, cls._opentracing.Format.HTTP_HEADERS, carrier
+        )
 
         for key, value in carrier.items():
             headers[key.encode()] = [value.encode()]