Fixes for opentracing scopes (#11869)
`start_active_span` was inconsistent as to whether it would activate the span immediately, or wait for `scope.__enter__` to happen (it depended on whether the current logcontext already had an associated scope). The inconsistency was rather confusing if you were hoping to set up a couple of separate spans before activating either. Looking at the other implementations of opentracing `ScopeManager`s, the intention is that it *should* be activated immediately, as the name implies. Indeed, the idea is that you don't have to use the scope as a contextmanager at all - you can just call `.close` on the result. Hence, our cleanup has to happen in `.close` rather than `.__exit__`. So, the main change here is to ensure that `start_active_span` does activate the span, and that `scope.close()` does close the scope. We also add some tests, which requires a `tracer` param so that we don't have to rely on the global variable in unit tests.anoa/as_edu_fixes
parent
a8da046907
commit
31b554c297
|
@ -0,0 +1 @@
|
|||
Ensure that `opentracing` scopes are activated and closed at the right time.
|
|
@ -443,10 +443,14 @@ def start_active_span(
|
|||
start_time=None,
|
||||
ignore_active_span=False,
|
||||
finish_on_close=True,
|
||||
*,
|
||||
tracer=None,
|
||||
):
|
||||
"""Starts an active opentracing span. Note, the scope doesn't become active
|
||||
until it has been entered, however, the span starts from the time this
|
||||
message is called.
|
||||
"""Starts an active opentracing span.
|
||||
|
||||
Records the start time for the span, and sets it as the "active span" in the
|
||||
scope manager.
|
||||
|
||||
Args:
|
||||
See opentracing.tracer
|
||||
Returns:
|
||||
|
@ -456,7 +460,11 @@ def start_active_span(
|
|||
if opentracing is None:
|
||||
return noop_context_manager() # type: ignore[unreachable]
|
||||
|
||||
return opentracing.tracer.start_active_span(
|
||||
if tracer is None:
|
||||
# use the global tracer by default
|
||||
tracer = opentracing.tracer
|
||||
|
||||
return tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=child_of,
|
||||
references=references,
|
||||
|
@ -468,7 +476,11 @@ def start_active_span(
|
|||
|
||||
|
||||
def start_active_span_follows_from(
|
||||
operation_name: str, contexts: Collection, inherit_force_tracing=False
|
||||
operation_name: str,
|
||||
contexts: Collection,
|
||||
*,
|
||||
inherit_force_tracing=False,
|
||||
tracer=None,
|
||||
):
|
||||
"""Starts an active opentracing span, with additional references to previous spans
|
||||
|
||||
|
@ -477,12 +489,17 @@ def start_active_span_follows_from(
|
|||
contexts: the previous spans to inherit from
|
||||
inherit_force_tracing: if set, and any of the previous contexts have had tracing
|
||||
forced, the new span will also have tracing forced.
|
||||
tracer: override the opentracing tracer. By default the global tracer is used.
|
||||
"""
|
||||
if opentracing is None:
|
||||
return noop_context_manager() # type: ignore[unreachable]
|
||||
|
||||
references = [opentracing.follows_from(context) for context in contexts]
|
||||
scope = start_active_span(operation_name, references=references)
|
||||
scope = start_active_span(
|
||||
operation_name,
|
||||
references=references,
|
||||
tracer=tracer,
|
||||
)
|
||||
|
||||
if inherit_force_tracing and any(
|
||||
is_context_forced_tracing(ctx) for ctx in contexts
|
||||
|
|
|
@ -28,8 +28,9 @@ class LogContextScopeManager(ScopeManager):
|
|||
The LogContextScopeManager tracks the active scope in opentracing
|
||||
by using the log contexts which are native to synapse. This is so
|
||||
that the basic opentracing api can be used across twisted defereds.
|
||||
(I would love to break logcontexts and this into an OS package. but
|
||||
let's wait for twisted's contexts to be released.)
|
||||
|
||||
It would be nice just to use opentracing's ContextVarsScopeManager,
|
||||
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
|
@ -65,29 +66,45 @@ class LogContextScopeManager(ScopeManager):
|
|||
Scope.close() on the returned instance.
|
||||
"""
|
||||
|
||||
enter_logcontext = False
|
||||
ctx = current_context()
|
||||
|
||||
if not ctx:
|
||||
# We don't want this scope to affect.
|
||||
logger.error("Tried to activate scope outside of loggingcontext")
|
||||
return Scope(None, span) # type: ignore[arg-type]
|
||||
elif ctx.scope is not None:
|
||||
# We want the logging scope to look exactly the same so we give it
|
||||
# a blank suffix
|
||||
|
||||
if ctx.scope is not None:
|
||||
# start a new logging context as a child of the existing one.
|
||||
# Doing so -- rather than updating the existing logcontext -- means that
|
||||
# creating several concurrent spans under the same logcontext works
|
||||
# correctly.
|
||||
ctx = nested_logging_context("")
|
||||
enter_logcontext = True
|
||||
else:
|
||||
# if there is no span currently associated with the current logcontext, we
|
||||
# just store the scope in it.
|
||||
#
|
||||
# This feels a bit dubious, but it does hack around a problem where a
|
||||
# span outlasts its parent logcontext (which would otherwise lead to
|
||||
# "Re-starting finished log context" errors).
|
||||
enter_logcontext = False
|
||||
|
||||
scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
|
||||
ctx.scope = scope
|
||||
if enter_logcontext:
|
||||
ctx.__enter__()
|
||||
|
||||
return scope
|
||||
|
||||
|
||||
class _LogContextScope(Scope):
|
||||
"""
|
||||
A custom opentracing scope. The only significant difference is that it will
|
||||
close the log context it's related to if the logcontext was created specifically
|
||||
for this scope.
|
||||
A custom opentracing scope, associated with a LogContext
|
||||
|
||||
* filters out _DefGen_Return exceptions which arise from calling
|
||||
`defer.returnValue` in Twisted code
|
||||
|
||||
* When the scope is closed, the logcontext's active scope is reset to None.
|
||||
and - if enter_logcontext was set - the logcontext is finished too.
|
||||
"""
|
||||
|
||||
def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close):
|
||||
|
@ -101,8 +118,7 @@ class _LogContextScope(Scope):
|
|||
logcontext (LogContext):
|
||||
the logcontext to which this scope is attached.
|
||||
enter_logcontext (Boolean):
|
||||
if True the logcontext will be entered and exited when the scope
|
||||
is entered and exited respectively
|
||||
if True the logcontext will be exited when the scope is finished
|
||||
finish_on_close (Boolean):
|
||||
if True finish the span when the scope is closed
|
||||
"""
|
||||
|
@ -111,26 +127,28 @@ class _LogContextScope(Scope):
|
|||
self._finish_on_close = finish_on_close
|
||||
self._enter_logcontext = enter_logcontext
|
||||
|
||||
def __enter__(self):
|
||||
if self._enter_logcontext:
|
||||
self.logcontext.__enter__()
|
||||
def __exit__(self, exc_type, value, traceback):
|
||||
if exc_type == twisted.internet.defer._DefGen_Return:
|
||||
# filter out defer.returnValue() calls
|
||||
exc_type = value = traceback = None
|
||||
super().__exit__(exc_type, value, traceback)
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if type == twisted.internet.defer._DefGen_Return:
|
||||
super().__exit__(None, None, None)
|
||||
else:
|
||||
super().__exit__(type, value, traceback)
|
||||
if self._enter_logcontext:
|
||||
self.logcontext.__exit__(type, value, traceback)
|
||||
else: # the logcontext existed before the creation of the scope
|
||||
self.logcontext.scope = None
|
||||
def __str__(self):
|
||||
return f"Scope<{self.span}>"
|
||||
|
||||
def close(self):
|
||||
if self.manager.active is not self:
|
||||
logger.error("Tried to close a non-active scope!")
|
||||
return
|
||||
active_scope = self.manager.active
|
||||
if active_scope is not self:
|
||||
logger.error(
|
||||
"Closing scope %s which is not the currently-active one %s",
|
||||
self,
|
||||
active_scope,
|
||||
)
|
||||
|
||||
if self._finish_on_close:
|
||||
self.span.finish()
|
||||
|
||||
self.logcontext.scope = None
|
||||
|
||||
if self._enter_logcontext:
|
||||
self.logcontext.__exit__(None, None, None)
|
||||
|
|
|
@ -0,0 +1,184 @@
|
|||
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.test.proto_helpers import MemoryReactorClock
|
||||
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.logging.opentracing import (
|
||||
start_active_span,
|
||||
start_active_span_follows_from,
|
||||
)
|
||||
from synapse.util import Clock
|
||||
|
||||
try:
|
||||
from synapse.logging.scopecontextmanager import LogContextScopeManager
|
||||
except ImportError:
|
||||
LogContextScopeManager = None # type: ignore
|
||||
|
||||
try:
|
||||
import jaeger_client
|
||||
except ImportError:
|
||||
jaeger_client = None # type: ignore
|
||||
|
||||
from tests.unittest import TestCase
|
||||
|
||||
|
||||
class LogContextScopeManagerTestCase(TestCase):
|
||||
if LogContextScopeManager is None:
|
||||
skip = "Requires opentracing" # type: ignore[unreachable]
|
||||
if jaeger_client is None:
|
||||
skip = "Requires jaeger_client" # type: ignore[unreachable]
|
||||
|
||||
def setUp(self) -> None:
|
||||
# since this is a unit test, we don't really want to mess around with the
|
||||
# global variables that power opentracing. We create our own tracer instance
|
||||
# and test with it.
|
||||
|
||||
scope_manager = LogContextScopeManager({})
|
||||
config = jaeger_client.config.Config(
|
||||
config={}, service_name="test", scope_manager=scope_manager
|
||||
)
|
||||
|
||||
self._reporter = jaeger_client.reporter.InMemoryReporter()
|
||||
|
||||
self._tracer = config.create_tracer(
|
||||
sampler=jaeger_client.ConstSampler(True),
|
||||
reporter=self._reporter,
|
||||
)
|
||||
|
||||
def test_start_active_span(self) -> None:
|
||||
# the scope manager assumes a logging context of some sort.
|
||||
with LoggingContext("root context"):
|
||||
self.assertIsNone(self._tracer.active_span)
|
||||
|
||||
# start_active_span should start and activate a span.
|
||||
scope = start_active_span("span", tracer=self._tracer)
|
||||
span = scope.span
|
||||
self.assertEqual(self._tracer.active_span, span)
|
||||
self.assertIsNotNone(span.start_time)
|
||||
|
||||
# entering the context doesn't actually do a whole lot.
|
||||
with scope as ctx:
|
||||
self.assertIs(ctx, scope)
|
||||
self.assertEqual(self._tracer.active_span, span)
|
||||
|
||||
# ... but leaving it unsets the active span, and finishes the span.
|
||||
self.assertIsNone(self._tracer.active_span)
|
||||
self.assertIsNotNone(span.end_time)
|
||||
|
||||
# the span should have been reported
|
||||
self.assertEqual(self._reporter.get_spans(), [span])
|
||||
|
||||
def test_nested_spans(self) -> None:
|
||||
"""Starting two spans off inside each other should work"""
|
||||
|
||||
with LoggingContext("root context"):
|
||||
with start_active_span("root span", tracer=self._tracer) as root_scope:
|
||||
self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
|
||||
scope1 = start_active_span(
|
||||
"child1",
|
||||
tracer=self._tracer,
|
||||
)
|
||||
self.assertEqual(
|
||||
self._tracer.active_span, scope1.span, "child1 was not activated"
|
||||
)
|
||||
self.assertEqual(
|
||||
scope1.span.context.parent_id, root_scope.span.context.span_id
|
||||
)
|
||||
|
||||
scope2 = start_active_span_follows_from(
|
||||
"child2",
|
||||
contexts=(scope1,),
|
||||
tracer=self._tracer,
|
||||
)
|
||||
self.assertEqual(self._tracer.active_span, scope2.span)
|
||||
self.assertEqual(
|
||||
scope2.span.context.parent_id, scope1.span.context.span_id
|
||||
)
|
||||
|
||||
with scope1, scope2:
|
||||
pass
|
||||
|
||||
# the root scope should be restored
|
||||
self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
self.assertIsNotNone(scope2.span.end_time)
|
||||
self.assertIsNotNone(scope1.span.end_time)
|
||||
|
||||
self.assertIsNone(self._tracer.active_span)
|
||||
|
||||
# the spans should be reported in order of their finishing.
|
||||
self.assertEqual(
|
||||
self._reporter.get_spans(), [scope2.span, scope1.span, root_scope.span]
|
||||
)
|
||||
|
||||
def test_overlapping_spans(self) -> None:
|
||||
"""Overlapping spans which are not neatly nested should work"""
|
||||
reactor = MemoryReactorClock()
|
||||
clock = Clock(reactor)
|
||||
|
||||
scopes = []
|
||||
|
||||
async def task(i: int):
|
||||
scope = start_active_span(
|
||||
f"task{i}",
|
||||
tracer=self._tracer,
|
||||
)
|
||||
scopes.append(scope)
|
||||
|
||||
self.assertEqual(self._tracer.active_span, scope.span)
|
||||
await clock.sleep(4)
|
||||
self.assertEqual(self._tracer.active_span, scope.span)
|
||||
scope.close()
|
||||
|
||||
async def root():
|
||||
with start_active_span("root span", tracer=self._tracer) as root_scope:
|
||||
self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
scopes.append(root_scope)
|
||||
|
||||
d1 = run_in_background(task, 1)
|
||||
await clock.sleep(2)
|
||||
d2 = run_in_background(task, 2)
|
||||
|
||||
# because we did run_in_background, the active span should still be the
|
||||
# root.
|
||||
self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
|
||||
await make_deferred_yieldable(
|
||||
defer.gatherResults([d1, d2], consumeErrors=True)
|
||||
)
|
||||
|
||||
self.assertEqual(self._tracer.active_span, root_scope.span)
|
||||
|
||||
with LoggingContext("root context"):
|
||||
# start the test off
|
||||
d1 = defer.ensureDeferred(root())
|
||||
|
||||
# let the tasks complete
|
||||
reactor.pump((2,) * 8)
|
||||
|
||||
self.successResultOf(d1)
|
||||
self.assertIsNone(self._tracer.active_span)
|
||||
|
||||
# the spans should be reported in order of their finishing: task 1, task 2,
|
||||
# root.
|
||||
self.assertEqual(
|
||||
self._reporter.get_spans(),
|
||||
[scopes[1].span, scopes[2].span, scopes[0].span],
|
||||
)
|
Loading…
Reference in New Issue