239 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			239 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
| # Copyright 2018 New Vector Ltd
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import functools
 | |
| import sys
 | |
| from typing import Any, Callable, Generator, List, TypeVar, cast
 | |
| 
 | |
| from typing_extensions import ParamSpec
 | |
| 
 | |
| from twisted.internet import defer
 | |
| from twisted.internet.defer import Deferred
 | |
| from twisted.python.failure import Failure
 | |
| 
 | |
| # Tracks if we've already patched inlineCallbacks
 | |
| _already_patched = False
 | |
| 
 | |
| 
 | |
| T = TypeVar("T")
 | |
| P = ParamSpec("P")
 | |
| 
 | |
| 
 | |
| def do_patch() -> None:
 | |
|     """
 | |
|     Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit
 | |
|     """
 | |
| 
 | |
|     from synapse.logging.context import current_context
 | |
| 
 | |
|     global _already_patched
 | |
| 
 | |
|     orig_inline_callbacks = defer.inlineCallbacks
 | |
|     if _already_patched:
 | |
|         return
 | |
| 
 | |
|     def new_inline_callbacks(
 | |
|         f: Callable[P, Generator["Deferred[object]", object, T]]
 | |
|     ) -> Callable[P, "Deferred[T]"]:
 | |
|         @functools.wraps(f)
 | |
|         def wrapped(*args: P.args, **kwargs: P.kwargs) -> "Deferred[T]":
 | |
|             start_context = current_context()
 | |
|             changes: List[str] = []
 | |
|             orig: Callable[P, "Deferred[T]"] = orig_inline_callbacks(
 | |
|                 _check_yield_points(f, changes)
 | |
|             )
 | |
| 
 | |
|             try:
 | |
|                 res: "Deferred[T]" = orig(*args, **kwargs)
 | |
|             except Exception:
 | |
|                 if current_context() != start_context:
 | |
|                     for err in changes:
 | |
|                         print(err, file=sys.stderr)
 | |
| 
 | |
|                     err = "%s changed context from %s to %s on exception" % (
 | |
|                         f,
 | |
|                         start_context,
 | |
|                         current_context(),
 | |
|                     )
 | |
|                     print(err, file=sys.stderr)
 | |
|                     raise Exception(err)
 | |
|                 raise
 | |
| 
 | |
|             if not isinstance(res, Deferred) or res.called:
 | |
|                 if current_context() != start_context:
 | |
|                     for err in changes:
 | |
|                         print(err, file=sys.stderr)
 | |
| 
 | |
|                     err = "Completed %s changed context from %s to %s" % (
 | |
|                         f,
 | |
|                         start_context,
 | |
|                         current_context(),
 | |
|                     )
 | |
|                     # print the error to stderr because otherwise all we
 | |
|                     # see in travis-ci is the 500 error
 | |
|                     print(err, file=sys.stderr)
 | |
|                     raise Exception(err)
 | |
|                 return res
 | |
| 
 | |
|             if current_context():
 | |
|                 err = (
 | |
|                     "%s returned incomplete deferred in non-sentinel context "
 | |
|                     "%s (start was %s)"
 | |
|                 ) % (f, current_context(), start_context)
 | |
|                 print(err, file=sys.stderr)
 | |
|                 raise Exception(err)
 | |
| 
 | |
|             def check_ctx(r: T) -> T:
 | |
|                 if current_context() != start_context:
 | |
|                     for err in changes:
 | |
|                         print(err, file=sys.stderr)
 | |
|                     err = "%s completion of %s changed context from %s to %s" % (
 | |
|                         "Failure" if isinstance(r, Failure) else "Success",
 | |
|                         f,
 | |
|                         start_context,
 | |
|                         current_context(),
 | |
|                     )
 | |
|                     print(err, file=sys.stderr)
 | |
|                     raise Exception(err)
 | |
|                 return r
 | |
| 
 | |
|             res.addBoth(check_ctx)
 | |
|             return res
 | |
| 
 | |
|         return wrapped
 | |
| 
 | |
|     defer.inlineCallbacks = new_inline_callbacks
 | |
|     _already_patched = True
 | |
| 
 | |
| 
 | |
| def _check_yield_points(
 | |
|     f: Callable[P, Generator["Deferred[object]", object, T]],
 | |
|     changes: List[str],
 | |
| ) -> Callable:
 | |
|     """Wraps a generator that is about to be passed to defer.inlineCallbacks
 | |
|     checking that after every yield the log contexts are correct.
 | |
| 
 | |
|     It's perfectly valid for log contexts to change within a function, e.g. due
 | |
|     to new Measure blocks, so such changes are added to the given `changes`
 | |
|     list instead of triggering an exception.
 | |
| 
 | |
|     Args:
 | |
|         f: generator function to wrap
 | |
|         changes: A list of strings detailing how the contexts
 | |
|             changed within a function.
 | |
| 
 | |
|     Returns:
 | |
|         function
 | |
|     """
 | |
| 
 | |
|     from synapse.logging.context import current_context
 | |
| 
 | |
|     @functools.wraps(f)
 | |
|     def check_yield_points_inner(
 | |
|         *args: P.args, **kwargs: P.kwargs
 | |
|     ) -> Generator["Deferred[object]", object, T]:
 | |
|         gen = f(*args, **kwargs)
 | |
| 
 | |
|         last_yield_line_no = gen.gi_frame.f_lineno
 | |
|         result: Any = None
 | |
|         while True:
 | |
|             expected_context = current_context()
 | |
| 
 | |
|             try:
 | |
|                 isFailure = isinstance(result, Failure)
 | |
|                 if isFailure:
 | |
|                     d = result.throwExceptionIntoGenerator(gen)
 | |
|                 else:
 | |
|                     d = gen.send(result)
 | |
|             except (StopIteration, defer._DefGen_Return) as e:
 | |
|                 if current_context() != expected_context:
 | |
|                     # This happens when the context is lost sometime *after* the
 | |
|                     # final yield and returning. E.g. we forgot to yield on a
 | |
|                     # function that returns a deferred.
 | |
|                     #
 | |
|                     # We don't raise here as it's perfectly valid for contexts to
 | |
|                     # change in a function, as long as it sets the correct context
 | |
|                     # on resolving (which is checked separately).
 | |
|                     err = (
 | |
|                         "Function %r returned and changed context from %s to %s,"
 | |
|                         " in %s between %d and end of func"
 | |
|                         % (
 | |
|                             f.__qualname__,
 | |
|                             expected_context,
 | |
|                             current_context(),
 | |
|                             f.__code__.co_filename,
 | |
|                             last_yield_line_no,
 | |
|                         )
 | |
|                     )
 | |
|                     changes.append(err)
 | |
|                 # The `StopIteration` or `_DefGen_Return` contains the return value from the
 | |
|                 # generator.
 | |
|                 return cast(T, e.value)
 | |
| 
 | |
|             frame = gen.gi_frame
 | |
| 
 | |
|             if isinstance(d, defer.Deferred) and not d.called:
 | |
|                 # This happens if we yield on a deferred that doesn't follow
 | |
|                 # the log context rules without wrapping in a `make_deferred_yieldable`.
 | |
|                 # We raise here as this should never happen.
 | |
|                 if current_context():
 | |
|                     err = (
 | |
|                         "%s yielded with context %s rather than sentinel,"
 | |
|                         " yielded on line %d in %s"
 | |
|                         % (
 | |
|                             frame.f_code.co_name,
 | |
|                             current_context(),
 | |
|                             frame.f_lineno,
 | |
|                             frame.f_code.co_filename,
 | |
|                         )
 | |
|                     )
 | |
|                     raise Exception(err)
 | |
| 
 | |
|             # the wrapped function yielded a Deferred: yield it back up to the parent
 | |
|             # inlineCallbacks().
 | |
|             try:
 | |
|                 result = yield d
 | |
|             except Exception:
 | |
|                 # this will fish an earlier Failure out of the stack where possible, and
 | |
|                 # thus is preferable to passing in an exception to the Failure
 | |
|                 # constructor, since it results in less stack-mangling.
 | |
|                 result = Failure()
 | |
| 
 | |
|             if current_context() != expected_context:
 | |
| 
 | |
|                 # This happens because the context is lost sometime *after* the
 | |
|                 # previous yield and *after* the current yield. E.g. the
 | |
|                 # deferred we waited on didn't follow the rules, or we forgot to
 | |
|                 # yield on a function between the two yield points.
 | |
|                 #
 | |
|                 # We don't raise here as its perfectly valid for contexts to
 | |
|                 # change in a function, as long as it sets the correct context
 | |
|                 # on resolving (which is checked separately).
 | |
|                 err = (
 | |
|                     "%s changed context from %s to %s, happened between lines %d and %d in %s"
 | |
|                     % (
 | |
|                         frame.f_code.co_name,
 | |
|                         expected_context,
 | |
|                         current_context(),
 | |
|                         last_yield_line_no,
 | |
|                         frame.f_lineno,
 | |
|                         frame.f_code.co_filename,
 | |
|                     )
 | |
|                 )
 | |
|                 changes.append(err)
 | |
| 
 | |
|             last_yield_line_no = frame.f_lineno
 | |
| 
 | |
|     return check_yield_points_inner
 |