diff --git a/changelog.d/12877.bugfix b/changelog.d/12877.bugfix new file mode 100644 index 0000000000..1ecf448baf --- /dev/null +++ b/changelog.d/12877.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.54 which could sometimes cause exceptions when handling federated traffic. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 9ce06dfa28..25df1905c6 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -49,11 +49,6 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -# Send join responses can be huge, so we set a separate limit here. The response -# is parsed in a streaming manner, which helps alleviate the issue of memory -# usage a bit. -MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024 - class TransportLayerClient: """Sends federation HTTP requests to other servers""" @@ -349,7 +344,6 @@ class TransportLayerClient: path=path, data=content, parser=SendJoinParser(room_version, v1_api=True), - max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN, ) async def send_join_v2( @@ -372,7 +366,6 @@ class TransportLayerClient: args=query_params, data=content, parser=SendJoinParser(room_version, v1_api=False), - max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN, ) async def send_leave_v1( @@ -1360,6 +1353,11 @@ class SendJoinParser(ByteParser[SendJoinResponse]): CONTENT_TYPE = "application/json" + # /send_join responses can be huge, so we override the size limit here. The response + # is parsed in a streaming manner, which helps alleviate the issue of memory + # usage a bit. + MAX_RESPONSE_SIZE = 500 * 1024 * 1024 + def __init__(self, room_version: RoomVersion, v1_api: bool): self._response = SendJoinResponse([], [], event_dict={}) self._room_version = room_version @@ -1427,6 +1425,9 @@ class _StateParser(ByteParser[StateRequestResponse]): CONTENT_TYPE = "application/json" + # As with /send_join, /state responses can be huge. + MAX_RESPONSE_SIZE = 500 * 1024 * 1024 + def __init__(self, room_version: RoomVersion): self._response = StateRequestResponse([], []) self._room_version = room_version diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 0b9475debd..db44721ef5 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -92,9 +92,6 @@ incoming_responses_counter = Counter( "synapse_http_matrixfederationclient_responses", "", ["method", "code"] ) -# a federation response can be rather large (eg a big state_ids is 50M or so), so we -# need a generous limit here. -MAX_RESPONSE_SIZE = 100 * 1024 * 1024 MAX_LONG_RETRIES = 10 MAX_SHORT_RETRIES = 3 @@ -116,6 +113,11 @@ class ByteParser(ByteWriteable, Generic[T], abc.ABC): the content type doesn't match we fail the request. """ + # a federation response can be rather large (eg a big state_ids is 50M or so), so we + # need a generous limit here. + MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024 + """The largest response this parser will accept.""" + @abc.abstractmethod def finish(self) -> T: """Called when response has finished streaming and the parser should @@ -203,7 +205,6 @@ async def _handle_response( response: IResponse, start_ms: int, parser: ByteParser[T], - max_response_size: Optional[int] = None, ) -> T: """ Reads the body of a response with a timeout and sends it to a parser @@ -215,15 +216,12 @@ async def _handle_response( response: response to the request start_ms: Timestamp when request was made parser: The parser for the response - max_response_size: The maximum size to read from the response, if None - uses the default. Returns: The parsed response """ - if max_response_size is None: - max_response_size = MAX_RESPONSE_SIZE + max_response_size = parser.MAX_RESPONSE_SIZE try: check_content_type_is(response.headers, parser.CONTENT_TYPE) @@ -240,7 +238,7 @@ async def _handle_response( "{%s} [%s] JSON response exceeded max size %i - %s %s", request.txn_id, request.destination, - MAX_RESPONSE_SIZE, + max_response_size, request.method, request.uri.decode("ascii"), ) @@ -772,7 +770,6 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Literal[None] = None, - max_response_size: Optional[int] = None, ) -> Union[JsonDict, list]: ... @@ -790,7 +787,6 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser[T]] = None, - max_response_size: Optional[int] = None, ) -> T: ... @@ -807,7 +803,6 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser] = None, - max_response_size: Optional[int] = None, ): """Sends the specified json data using PUT @@ -843,8 +838,6 @@ class MatrixFederationHttpClient: enabled. parser: The parser to use to decode the response. Defaults to parsing as JSON. - max_response_size: The maximum size to read from the response, if None - uses the default. Returns: Succeeds when we get a 2xx HTTP response. The @@ -895,7 +888,6 @@ class MatrixFederationHttpClient: response, start_ms, parser=parser, - max_response_size=max_response_size, ) return body @@ -984,7 +976,6 @@ class MatrixFederationHttpClient: ignore_backoff: bool = False, try_trailing_slash_on_400: bool = False, parser: Literal[None] = None, - max_response_size: Optional[int] = None, ) -> Union[JsonDict, list]: ... @@ -999,7 +990,6 @@ class MatrixFederationHttpClient: ignore_backoff: bool = ..., try_trailing_slash_on_400: bool = ..., parser: ByteParser[T] = ..., - max_response_size: Optional[int] = ..., ) -> T: ... @@ -1013,7 +1003,6 @@ class MatrixFederationHttpClient: ignore_backoff: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser] = None, - max_response_size: Optional[int] = None, ): """GETs some json from the given host homeserver and path @@ -1043,9 +1032,6 @@ class MatrixFederationHttpClient: parser: The parser to use to decode the response. Defaults to parsing as JSON. - max_response_size: The maximum size to read from the response. If None, - uses the default. - Returns: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. @@ -1090,7 +1076,6 @@ class MatrixFederationHttpClient: response, start_ms, parser=parser, - max_response_size=max_response_size, ) return body diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 638babae69..006dbab093 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -26,7 +26,7 @@ from twisted.web.http import HTTPChannel from synapse.api.errors import RequestSendFailed from synapse.http.matrixfederationclient import ( - MAX_RESPONSE_SIZE, + JsonParser, MatrixFederationHttpClient, MatrixFederationRequest, ) @@ -609,9 +609,9 @@ class FederationClientTests(HomeserverTestCase): while not test_d.called: protocol.dataReceived(b"a" * chunk_size) sent += chunk_size - self.assertLessEqual(sent, MAX_RESPONSE_SIZE) + self.assertLessEqual(sent, JsonParser.MAX_RESPONSE_SIZE) - self.assertEqual(sent, MAX_RESPONSE_SIZE) + self.assertEqual(sent, JsonParser.MAX_RESPONSE_SIZE) f = self.failureResultOf(test_d) self.assertIsInstance(f.value, RequestSendFailed)