diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3e0cd294a1..547c6aec80 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -99,7 +99,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - def on_incoming_transaction(self, transaction_data): + def on_incoming_transaction(self, origin, transaction_data): # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() @@ -108,34 +108,33 @@ class FederationServer(FederationBase): if not transaction.transaction_id: raise Exception("Transaction missing transaction_id") - if not transaction.origin: - raise Exception("Transaction missing origin") logger.debug("[%s] Got transaction", transaction.transaction_id) # use a linearizer to ensure that we don't process the same transaction # multiple times in parallel. with (yield self._transaction_linearizer.queue( - (transaction.origin, transaction.transaction_id), + (origin, transaction.transaction_id), )): result = yield self._handle_incoming_transaction( - transaction, request_time, + origin, transaction, request_time, ) defer.returnValue(result) @defer.inlineCallbacks - def _handle_incoming_transaction(self, transaction, request_time): + def _handle_incoming_transaction(self, origin, transaction, request_time): """ Process an incoming transaction and return the HTTP response Args: + origin (unicode): the server making the request transaction (Transaction): incoming transaction request_time (int): timestamp that the HTTP request arrived at Returns: Deferred[(int, object)]: http response code and body """ - response = yield self.transaction_actions.have_responded(transaction) + response = yield self.transaction_actions.have_responded(origin, transaction) if response: logger.debug( @@ -149,7 +148,7 @@ class FederationServer(FederationBase): received_pdus_counter.inc(len(transaction.pdus)) - origin_host, _ = parse_server_name(transaction.origin) + origin_host, _ = parse_server_name(origin) pdus_by_room = {} @@ -190,7 +189,7 @@ class FederationServer(FederationBase): event_id = pdu.event_id try: yield self._handle_received_pdu( - transaction.origin, pdu + origin, pdu ) pdu_results[event_id] = {} except FederationError as e: @@ -212,7 +211,7 @@ class FederationServer(FederationBase): if hasattr(transaction, "edus"): for edu in (Edu(**x) for x in transaction.edus): yield self.received_edu( - transaction.origin, + origin, edu.edu_type, edu.content ) @@ -224,6 +223,7 @@ class FederationServer(FederationBase): logger.debug("Returning: %s", str(response)) yield self.transaction_actions.set_response( + origin, transaction, 200, response ) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 9146215c21..74ffd13b4f 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -36,7 +36,7 @@ class TransactionActions(object): self.store = datastore @log_function - def have_responded(self, transaction): + def have_responded(self, origin, transaction): """ Have we already responded to a transaction with the same id and origin? @@ -50,11 +50,11 @@ class TransactionActions(object): "transaction_id") return self.store.get_received_txn_response( - transaction.transaction_id, transaction.origin + transaction.transaction_id, origin ) @log_function - def set_response(self, transaction, code, response): + def set_response(self, origin, transaction, code, response): """ Persist how we responded to a transaction. Returns: @@ -66,7 +66,7 @@ class TransactionActions(object): return self.store.set_received_txn_response( transaction.transaction_id, - transaction.origin, + origin, code, response, ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 77969a4f38..8cde9716ac 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -353,7 +353,7 @@ class FederationSendServlet(BaseFederationServlet): try: code, response = yield self.handler.on_incoming_transaction( - transaction_data + origin, transaction_data, ) except Exception: logger.exception("on_incoming_transaction failed") diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index ad58073a14..c2d951b45f 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -33,7 +33,7 @@ from ..utils import ( ) -def _expect_edu(destination, edu_type, content, origin="test"): +def _expect_edu_transaction(edu_type, content, origin="test"): return { "origin": origin, "origin_server_ts": 1000000, @@ -42,8 +42,8 @@ def _expect_edu(destination, edu_type, content, origin="test"): } -def _make_edu_json(origin, edu_type, content): - return json.dumps(_expect_edu("test", edu_type, content, origin=origin)).encode( +def _make_edu_transaction_json(edu_type, content): + return json.dumps(_expect_edu_transaction(edu_type, content)).encode( 'utf8' ) @@ -190,8 +190,7 @@ class TypingNotificationsTestCase(unittest.TestCase): call( "farm", path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu( - "farm", + data=_expect_edu_transaction( "m.typing", content={ "room_id": self.room_id, @@ -221,11 +220,10 @@ class TypingNotificationsTestCase(unittest.TestCase): self.assertEquals(self.event_source.get_current_key(), 0) - yield self.mock_federation_resource.trigger( + (code, response) = yield self.mock_federation_resource.trigger( "PUT", "/_matrix/federation/v1/send/1000000/", - _make_edu_json( - "farm", + _make_edu_transaction_json( "m.typing", content={ "room_id": self.room_id, @@ -233,7 +231,7 @@ class TypingNotificationsTestCase(unittest.TestCase): "typing": True, }, ), - federation_auth=True, + federation_auth_origin=b'farm', ) self.on_new_event.assert_has_calls( @@ -264,8 +262,7 @@ class TypingNotificationsTestCase(unittest.TestCase): call( "farm", path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu( - "farm", + data=_expect_edu_transaction( "m.typing", content={ "room_id": self.room_id, diff --git a/tests/utils.py b/tests/utils.py index bb0fc74054..8de2898b2f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -306,7 +306,10 @@ class MockHttpResource(HttpServer): @patch('twisted.web.http.Request') @defer.inlineCallbacks - def trigger(self, http_method, path, content, mock_request, federation_auth=False): + def trigger( + self, http_method, path, content, mock_request, + federation_auth_origin=None, + ): """ Fire an HTTP event. Args: @@ -315,6 +318,7 @@ class MockHttpResource(HttpServer): content : The HTTP body mock_request : Mocked request to pass to the event so it can get content. + federation_auth_origin (bytes|None): domain to authenticate as, for federation Returns: A tuple of (code, response) Raises: @@ -335,8 +339,10 @@ class MockHttpResource(HttpServer): mock_request.getClientIP.return_value = "-" headers = {} - if federation_auth: - headers[b"Authorization"] = [b"X-Matrix origin=test,key=,sig="] + if federation_auth_origin is not None: + headers[b"Authorization"] = [ + b"X-Matrix origin=%s,key=,sig=" % (federation_auth_origin, ) + ] mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers) # return the right path if the event requires it