242 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			242 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
| import json
 | |
| from io import BytesIO
 | |
| 
 | |
| from six import text_type
 | |
| 
 | |
| import attr
 | |
| 
 | |
| from twisted.internet import address, threads
 | |
| from twisted.internet.defer import Deferred
 | |
| from twisted.python.failure import Failure
 | |
| from twisted.test.proto_helpers import MemoryReactorClock
 | |
| 
 | |
| from synapse.http.site import SynapseRequest
 | |
| from synapse.util import Clock
 | |
| 
 | |
| from tests.utils import setup_test_homeserver as _sth
 | |
| 
 | |
| 
 | |
| @attr.s
 | |
| class FakeChannel(object):
 | |
|     """
 | |
|     A fake Twisted Web Channel (the part that interfaces with the
 | |
|     wire).
 | |
|     """
 | |
| 
 | |
|     result = attr.ib(default=attr.Factory(dict))
 | |
|     _producer = None
 | |
| 
 | |
|     @property
 | |
|     def json_body(self):
 | |
|         if not self.result:
 | |
|             raise Exception("No result yet.")
 | |
|         return json.loads(self.result["body"].decode('utf8'))
 | |
| 
 | |
|     @property
 | |
|     def code(self):
 | |
|         if not self.result:
 | |
|             raise Exception("No result yet.")
 | |
|         return int(self.result["code"])
 | |
| 
 | |
|     def writeHeaders(self, version, code, reason, headers):
 | |
|         self.result["version"] = version
 | |
|         self.result["code"] = code
 | |
|         self.result["reason"] = reason
 | |
|         self.result["headers"] = headers
 | |
| 
 | |
|     def write(self, content):
 | |
|         if "body" not in self.result:
 | |
|             self.result["body"] = b""
 | |
| 
 | |
|         self.result["body"] += content
 | |
| 
 | |
|     def registerProducer(self, producer, streaming):
 | |
|         self._producer = producer
 | |
| 
 | |
|     def unregisterProducer(self):
 | |
|         if self._producer is None:
 | |
|             return
 | |
| 
 | |
|         self._producer = None
 | |
| 
 | |
|     def requestDone(self, _self):
 | |
|         self.result["done"] = True
 | |
| 
 | |
|     def getPeer(self):
 | |
|         # We give an address so that getClientIP returns a non null entry,
 | |
|         # causing us to record the MAU
 | |
|         return address.IPv4Address(b"TCP", "127.0.0.1", 3423)
 | |
| 
 | |
|     def getHost(self):
 | |
|         return None
 | |
| 
 | |
|     @property
 | |
|     def transport(self):
 | |
|         return self
 | |
| 
 | |
| 
 | |
| class FakeSite:
 | |
|     """
 | |
|     A fake Twisted Web Site, with mocks of the extra things that
 | |
|     Synapse adds.
 | |
|     """
 | |
| 
 | |
|     server_version_string = b"1"
 | |
|     site_tag = "test"
 | |
| 
 | |
|     @property
 | |
|     def access_logger(self):
 | |
|         class FakeLogger:
 | |
|             def info(self, *args, **kwargs):
 | |
|                 pass
 | |
| 
 | |
|         return FakeLogger()
 | |
| 
 | |
| 
 | |
| def make_request(method, path, content=b"", access_token=None):
 | |
|     """
 | |
|     Make a web request using the given method and path, feed it the
 | |
|     content, and return the Request and the Channel underneath.
 | |
|     """
 | |
|     if not isinstance(method, bytes):
 | |
|         method = method.encode('ascii')
 | |
| 
 | |
|     if not isinstance(path, bytes):
 | |
|         path = path.encode('ascii')
 | |
| 
 | |
|     # Decorate it to be the full path
 | |
|     if not path.startswith(b"/_matrix"):
 | |
|         path = b"/_matrix/client/r0/" + path
 | |
|         path = path.replace(b"//", b"/")
 | |
| 
 | |
|     if isinstance(content, text_type):
 | |
|         content = content.encode('utf8')
 | |
| 
 | |
|     site = FakeSite()
 | |
|     channel = FakeChannel()
 | |
| 
 | |
|     req = SynapseRequest(site, channel)
 | |
|     req.process = lambda: b""
 | |
|     req.content = BytesIO(content)
 | |
| 
 | |
|     if access_token:
 | |
|         req.requestHeaders.addRawHeader(b"Authorization", b"Bearer " + access_token)
 | |
| 
 | |
|     req.requestHeaders.addRawHeader(b"X-Forwarded-For", b"127.0.0.1")
 | |
|     req.requestReceived(method, path, b"1.1")
 | |
| 
 | |
|     return req, channel
 | |
| 
 | |
| 
 | |
| def wait_until_result(clock, request, timeout=100):
 | |
|     """
 | |
|     Wait until the request is finished.
 | |
|     """
 | |
|     clock.run()
 | |
|     x = 0
 | |
| 
 | |
|     while not request.finished:
 | |
| 
 | |
|         # If there's a producer, tell it to resume producing so we get content
 | |
|         if request._channel._producer:
 | |
|             request._channel._producer.resumeProducing()
 | |
| 
 | |
|         x += 1
 | |
| 
 | |
|         if x > timeout:
 | |
|             raise Exception("Timed out waiting for request to finish.")
 | |
| 
 | |
|         clock.advance(0.1)
 | |
| 
 | |
| 
 | |
| def render(request, resource, clock):
 | |
|     request.render(resource)
 | |
|     wait_until_result(clock, request)
 | |
| 
 | |
| 
 | |
| class ThreadedMemoryReactorClock(MemoryReactorClock):
 | |
|     """
 | |
|     A MemoryReactorClock that supports callFromThread.
 | |
|     """
 | |
| 
 | |
|     def callFromThread(self, callback, *args, **kwargs):
 | |
|         """
 | |
|         Make the callback fire in the next reactor iteration.
 | |
|         """
 | |
|         d = Deferred()
 | |
|         d.addCallback(lambda x: callback(*args, **kwargs))
 | |
|         self.callLater(0, d.callback, True)
 | |
|         return d
 | |
| 
 | |
| 
 | |
| def setup_test_homeserver(cleanup_func, *args, **kwargs):
 | |
|     """
 | |
|     Set up a synchronous test server, driven by the reactor used by
 | |
|     the homeserver.
 | |
|     """
 | |
|     d = _sth(cleanup_func, *args, **kwargs).result
 | |
| 
 | |
|     if isinstance(d, Failure):
 | |
|         d.raiseException()
 | |
| 
 | |
|     # Make the thread pool synchronous.
 | |
|     clock = d.get_clock()
 | |
|     pool = d.get_db_pool()
 | |
| 
 | |
|     def runWithConnection(func, *args, **kwargs):
 | |
|         return threads.deferToThreadPool(
 | |
|             pool._reactor,
 | |
|             pool.threadpool,
 | |
|             pool._runWithConnection,
 | |
|             func,
 | |
|             *args,
 | |
|             **kwargs
 | |
|         )
 | |
| 
 | |
|     def runInteraction(interaction, *args, **kwargs):
 | |
|         return threads.deferToThreadPool(
 | |
|             pool._reactor,
 | |
|             pool.threadpool,
 | |
|             pool._runInteraction,
 | |
|             interaction,
 | |
|             *args,
 | |
|             **kwargs
 | |
|         )
 | |
| 
 | |
|     pool.runWithConnection = runWithConnection
 | |
|     pool.runInteraction = runInteraction
 | |
| 
 | |
|     class ThreadPool:
 | |
|         """
 | |
|         Threadless thread pool.
 | |
|         """
 | |
| 
 | |
|         def start(self):
 | |
|             pass
 | |
| 
 | |
|         def stop(self):
 | |
|             pass
 | |
| 
 | |
|         def callInThreadWithCallback(self, onResult, function, *args, **kwargs):
 | |
|             def _(res):
 | |
|                 if isinstance(res, Failure):
 | |
|                     onResult(False, res)
 | |
|                 else:
 | |
|                     onResult(True, res)
 | |
| 
 | |
|             d = Deferred()
 | |
|             d.addCallback(lambda x: function(*args, **kwargs))
 | |
|             d.addBoth(_)
 | |
|             clock._reactor.callLater(0, d.callback, True)
 | |
|             return d
 | |
| 
 | |
|     clock.threadpool = ThreadPool()
 | |
|     pool.threadpool = ThreadPool()
 | |
|     return d
 | |
| 
 | |
| 
 | |
| def get_clock():
 | |
|     clock = ThreadedMemoryReactorClock()
 | |
|     hs_clock = Clock(clock)
 | |
|     return (clock, hs_clock)
 |