97 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			97 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
| import twisted.python.failure
 | |
| from twisted.internet import defer
 | |
| from twisted.internet import reactor
 | |
| from .. import unittest
 | |
| 
 | |
| from synapse.util.async import sleep
 | |
| from synapse.util import logcontext
 | |
| from synapse.util.logcontext import LoggingContext
 | |
| 
 | |
| 
 | |
| class LoggingContextTestCase(unittest.TestCase):
 | |
| 
 | |
|     def _check_test_key(self, value):
 | |
|         self.assertEquals(
 | |
|             LoggingContext.current_context().test_key, value
 | |
|         )
 | |
| 
 | |
|     def test_with_context(self):
 | |
|         with LoggingContext() as context_one:
 | |
|             context_one.test_key = "test"
 | |
|             self._check_test_key("test")
 | |
| 
 | |
|     @defer.inlineCallbacks
 | |
|     def test_sleep(self):
 | |
|         @defer.inlineCallbacks
 | |
|         def competing_callback():
 | |
|             with LoggingContext() as competing_context:
 | |
|                 competing_context.test_key = "competing"
 | |
|                 yield sleep(0)
 | |
|                 self._check_test_key("competing")
 | |
| 
 | |
|         reactor.callLater(0, competing_callback)
 | |
| 
 | |
|         with LoggingContext() as context_one:
 | |
|             context_one.test_key = "one"
 | |
|             yield sleep(0)
 | |
|             self._check_test_key("one")
 | |
| 
 | |
|     def _test_preserve_fn(self, function):
 | |
|         sentinel_context = LoggingContext.current_context()
 | |
| 
 | |
|         callback_completed = [False]
 | |
| 
 | |
|         @defer.inlineCallbacks
 | |
|         def cb():
 | |
|             context_one.test_key = "one"
 | |
|             yield function()
 | |
|             self._check_test_key("one")
 | |
| 
 | |
|             callback_completed[0] = True
 | |
| 
 | |
|         with LoggingContext() as context_one:
 | |
|             context_one.test_key = "one"
 | |
| 
 | |
|             # fire off function, but don't wait on it.
 | |
|             logcontext.preserve_fn(cb)()
 | |
| 
 | |
|             self._check_test_key("one")
 | |
| 
 | |
|         # now wait for the function under test to have run, and check that
 | |
|         # the logcontext is left in a sane state.
 | |
|         d2 = defer.Deferred()
 | |
| 
 | |
|         def check_logcontext():
 | |
|             if not callback_completed[0]:
 | |
|                 reactor.callLater(0.01, check_logcontext)
 | |
|                 return
 | |
| 
 | |
|             # make sure that the context was reset before it got thrown back
 | |
|             # into the reactor
 | |
|             try:
 | |
|                 self.assertIs(LoggingContext.current_context(),
 | |
|                               sentinel_context)
 | |
|                 d2.callback(None)
 | |
|             except BaseException:
 | |
|                 d2.errback(twisted.python.failure.Failure())
 | |
| 
 | |
|         reactor.callLater(0.01, check_logcontext)
 | |
| 
 | |
|         # test is done once d2 finishes
 | |
|         return d2
 | |
| 
 | |
|     def test_preserve_fn_with_blocking_fn(self):
 | |
|         @defer.inlineCallbacks
 | |
|         def blocking_function():
 | |
|             yield sleep(0)
 | |
| 
 | |
|         return self._test_preserve_fn(blocking_function)
 | |
| 
 | |
|     def test_preserve_fn_with_non_blocking_fn(self):
 | |
|         @defer.inlineCallbacks
 | |
|         def nonblocking_function():
 | |
|             with logcontext.PreserveLoggingContext():
 | |
|                 yield defer.succeed(None)
 | |
| 
 | |
|         return self._test_preserve_fn(nonblocking_function)
 |