Improved validation for received requests (#9817)
* Simplify `start_listening` callpath * Correctly check the size of uploaded filespull/9887/head
parent
84936e2264
commit
3ff2251754
|
@ -0,0 +1 @@
|
|||
Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced.
|
|
@ -17,6 +17,9 @@
|
|||
|
||||
"""Contains constants from the specification."""
|
||||
|
||||
# the max size of a (canonical-json-encoded) event
|
||||
MAX_PDU_SIZE = 65536
|
||||
|
||||
# the "depth" field on events is limited to 2**63 - 1
|
||||
MAX_DEPTH = 2 ** 63 - 1
|
||||
|
||||
|
|
|
@ -30,9 +30,10 @@ from twisted.internet import defer, error, reactor
|
|||
from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||
|
||||
import synapse
|
||||
from synapse.api.constants import MAX_PDU_SIZE
|
||||
from synapse.app import check_bind_error
|
||||
from synapse.app.phone_stats_home import start_phone_stats_home
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.crypto import context_factory
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
|
@ -288,7 +289,7 @@ def refresh_certificate(hs):
|
|||
logger.info("Context factories updated.")
|
||||
|
||||
|
||||
async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
|
||||
async def start(hs: "synapse.server.HomeServer"):
|
||||
"""
|
||||
Start a Synapse server or worker.
|
||||
|
||||
|
@ -300,7 +301,6 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
|
|||
|
||||
Args:
|
||||
hs: homeserver instance
|
||||
listeners: Listener configuration ('listeners' in homeserver.yaml)
|
||||
"""
|
||||
# Set up the SIGHUP machinery.
|
||||
if hasattr(signal, "SIGHUP"):
|
||||
|
@ -336,7 +336,7 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
|
|||
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa
|
||||
|
||||
# It is now safe to start your Synapse.
|
||||
hs.start_listening(listeners)
|
||||
hs.start_listening()
|
||||
hs.get_datastore().db_pool.start_profiling()
|
||||
hs.get_pusherpool().start()
|
||||
|
||||
|
@ -530,3 +530,25 @@ def sdnotify(state):
|
|||
# this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
|
||||
# unless systemd is expecting us to notify it.
|
||||
logger.warning("Unable to send notification to systemd: %s", e)
|
||||
|
||||
|
||||
def max_request_body_size(config: HomeServerConfig) -> int:
|
||||
"""Get a suitable maximum size for incoming HTTP requests"""
|
||||
|
||||
# Other than media uploads, the biggest request we expect to see is a fully-loaded
|
||||
# /federation/v1/send request.
|
||||
#
|
||||
# The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
|
||||
# limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
|
||||
# json encoding); there is no specced limit to EDUs (see
|
||||
# https://github.com/matrix-org/matrix-doc/issues/3121).
|
||||
#
|
||||
# in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
|
||||
#
|
||||
max_request_size = 200 * MAX_PDU_SIZE
|
||||
|
||||
# if we have a media repo enabled, we may need to allow larger uploads than that
|
||||
if config.media.can_load_media_repo:
|
||||
max_request_size = max(max_request_size, config.media.max_upload_size)
|
||||
|
||||
return max_request_size
|
||||
|
|
|
@ -70,12 +70,6 @@ class AdminCmdSlavedStore(
|
|||
class AdminCmdServer(HomeServer):
|
||||
DATASTORE_CLASS = AdminCmdSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
pass
|
||||
|
||||
def start_listening(self, listeners):
|
||||
pass
|
||||
|
||||
|
||||
async def export_data_command(hs, args):
|
||||
"""Export data for a user.
|
||||
|
@ -232,7 +226,7 @@ def start(config_options):
|
|||
|
||||
async def run():
|
||||
with LoggingContext("command"):
|
||||
_base.start(ss, [])
|
||||
_base.start(ss)
|
||||
await args.func(ss, args)
|
||||
|
||||
_base.start_worker_reactor(
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
import sys
|
||||
from typing import Dict, Iterable, Optional
|
||||
from typing import Dict, Optional
|
||||
|
||||
from twisted.internet import address
|
||||
from twisted.web.resource import IResource
|
||||
|
@ -32,7 +32,7 @@ from synapse.api.urls import (
|
|||
SERVER_KEY_V2_PREFIX,
|
||||
)
|
||||
from synapse.app import _base
|
||||
from synapse.app._base import register_start
|
||||
from synapse.app._base import max_request_body_size, register_start
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
|
@ -367,6 +367,7 @@ class GenericWorkerServer(HomeServer):
|
|||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
max_request_body_size=max_request_body_size(self.config),
|
||||
reactor=self.get_reactor(),
|
||||
),
|
||||
reactor=self.get_reactor(),
|
||||
|
@ -374,8 +375,8 @@ class GenericWorkerServer(HomeServer):
|
|||
|
||||
logger.info("Synapse worker now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners: Iterable[ListenerConfig]):
|
||||
for listener in listeners:
|
||||
def start_listening(self):
|
||||
for listener in self.config.worker_listeners:
|
||||
if listener.type == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener.type == "manhole":
|
||||
|
@ -468,7 +469,7 @@ def start(config_options):
|
|||
# streams. Will no-op if no streams can be written to by this worker.
|
||||
hs.get_replication_streamer()
|
||||
|
||||
register_start(_base.start, hs, config.worker_listeners)
|
||||
register_start(_base.start, hs)
|
||||
|
||||
_base.start_worker_reactor("synapse-generic-worker", config)
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
import logging
|
||||
import os
|
||||
import sys
|
||||
from typing import Iterable, Iterator
|
||||
from typing import Iterator
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import EncodingResourceWrapper, IResource
|
||||
|
@ -36,7 +36,13 @@ from synapse.api.urls import (
|
|||
WEB_CLIENT_PREFIX,
|
||||
)
|
||||
from synapse.app import _base
|
||||
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
|
||||
from synapse.app._base import (
|
||||
listen_ssl,
|
||||
listen_tcp,
|
||||
max_request_body_size,
|
||||
quit_with_error,
|
||||
register_start,
|
||||
)
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
|
@ -132,6 +138,7 @@ class SynapseHomeServer(HomeServer):
|
|||
listener_config,
|
||||
create_resource_tree(resources, root_resource),
|
||||
self.version_string,
|
||||
max_request_body_size=max_request_body_size(self.config),
|
||||
reactor=self.get_reactor(),
|
||||
)
|
||||
|
||||
|
@ -268,14 +275,14 @@ class SynapseHomeServer(HomeServer):
|
|||
|
||||
return resources
|
||||
|
||||
def start_listening(self, listeners: Iterable[ListenerConfig]):
|
||||
def start_listening(self):
|
||||
if self.config.redis_enabled:
|
||||
# If redis is enabled we connect via the replication command handler
|
||||
# in the same way as the workers (since we're effectively a client
|
||||
# rather than a server).
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
for listener in listeners:
|
||||
for listener in self.config.server.listeners:
|
||||
if listener.type == "http":
|
||||
self._listening_services.extend(
|
||||
self._listener_http(self.config, listener)
|
||||
|
@ -407,7 +414,7 @@ def setup(config_options):
|
|||
# Loading the provider metadata also ensures the provider config is valid.
|
||||
await oidc.load_metadata()
|
||||
|
||||
await _base.start(hs, config.listeners)
|
||||
await _base.start(hs)
|
||||
|
||||
hs.get_datastore().db_pool.updates.start_doing_background_updates()
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ from twisted.logger import (
|
|||
)
|
||||
|
||||
import synapse
|
||||
from synapse.app import _base as appbase
|
||||
from synapse.logging._structured import setup_structured_logging
|
||||
from synapse.logging.context import LoggingContextFilter
|
||||
from synapse.logging.filter import MetadataFilter
|
||||
|
@ -318,6 +317,8 @@ def setup_logging(
|
|||
# Perform one-time logging configuration.
|
||||
_setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
|
||||
# Add a SIGHUP handler to reload the logging configuration, if one is available.
|
||||
from synapse.app import _base as appbase
|
||||
|
||||
appbase.register_sighup(_reload_logging_config, log_config_path)
|
||||
|
||||
# Log immediately so we can grep backwards.
|
||||
|
|
|
@ -21,7 +21,7 @@ from signedjson.key import decode_verify_key_bytes
|
|||
from signedjson.sign import SignatureVerifyException, verify_signed_json
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership
|
||||
from synapse.api.errors import AuthError, EventSizeError, SynapseError
|
||||
from synapse.api.room_versions import (
|
||||
KNOWN_ROOM_VERSIONS,
|
||||
|
@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None:
|
|||
too_big("type")
|
||||
if len(event.event_id) > 255:
|
||||
too_big("event_id")
|
||||
if len(encode_canonical_json(event.get_pdu_json())) > 65536:
|
||||
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
|
||||
too_big("event")
|
||||
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional, Tuple, Type, Union
|
||||
from typing import Optional, Tuple, Union
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
|
@ -50,6 +50,7 @@ class SynapseRequest(Request):
|
|||
* Redaction of access_token query-params in __repr__
|
||||
* Logging at start and end
|
||||
* Metrics to record CPU, wallclock and DB time by endpoint.
|
||||
* A limit to the size of request which will be accepted
|
||||
|
||||
It also provides a method `processing`, which returns a context manager. If this
|
||||
method is called, the request won't be logged until the context manager is closed;
|
||||
|
@ -60,8 +61,9 @@ class SynapseRequest(Request):
|
|||
logcontext: the log context for this request
|
||||
"""
|
||||
|
||||
def __init__(self, channel, *args, **kw):
|
||||
def __init__(self, channel, *args, max_request_body_size=1024, **kw):
|
||||
Request.__init__(self, channel, *args, **kw)
|
||||
self._max_request_body_size = max_request_body_size
|
||||
self.site = channel.site # type: SynapseSite
|
||||
self._channel = channel # this is used by the tests
|
||||
self.start_time = 0.0
|
||||
|
@ -98,6 +100,18 @@ class SynapseRequest(Request):
|
|||
self.site.site_tag,
|
||||
)
|
||||
|
||||
def handleContentChunk(self, data):
|
||||
# we should have a `content` by now.
|
||||
assert self.content, "handleContentChunk() called before gotLength()"
|
||||
if self.content.tell() + len(data) > self._max_request_body_size:
|
||||
logger.warning(
|
||||
"Aborting connection from %s because the request exceeds maximum size",
|
||||
self.client,
|
||||
)
|
||||
self.transport.abortConnection()
|
||||
return
|
||||
super().handleContentChunk(data)
|
||||
|
||||
@property
|
||||
def requester(self) -> Optional[Union[Requester, str]]:
|
||||
return self._requester
|
||||
|
@ -505,6 +519,7 @@ class SynapseSite(Site):
|
|||
config: ListenerConfig,
|
||||
resource: IResource,
|
||||
server_version_string,
|
||||
max_request_body_size: int,
|
||||
reactor: IReactorTime,
|
||||
):
|
||||
"""
|
||||
|
@ -516,6 +531,8 @@ class SynapseSite(Site):
|
|||
resource: The base of the resource tree to be used for serving requests on
|
||||
this site
|
||||
server_version_string: A string to present for the Server header
|
||||
max_request_body_size: Maximum request body length to allow before
|
||||
dropping the connection
|
||||
reactor: reactor to be used to manage connection timeouts
|
||||
"""
|
||||
Site.__init__(self, resource, reactor=reactor)
|
||||
|
@ -524,9 +541,14 @@ class SynapseSite(Site):
|
|||
|
||||
assert config.http_options is not None
|
||||
proxied = config.http_options.x_forwarded
|
||||
self.requestFactory = (
|
||||
XForwardedForRequest if proxied else SynapseRequest
|
||||
) # type: Type[Request]
|
||||
request_class = XForwardedForRequest if proxied else SynapseRequest
|
||||
|
||||
def request_factory(channel, queued) -> Request:
|
||||
return request_class(
|
||||
channel, max_request_body_size=max_request_body_size, queued=queued
|
||||
)
|
||||
|
||||
self.requestFactory = request_factory # type: ignore
|
||||
self.access_logger = logging.getLogger(logger_name)
|
||||
self.server_version_string = server_version_string.encode("ascii")
|
||||
|
||||
|
|
|
@ -51,8 +51,6 @@ class UploadResource(DirectServeJsonResource):
|
|||
|
||||
async def _async_render_POST(self, request: SynapseRequest) -> None:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
# TODO: The checks here are a bit late. The content will have
|
||||
# already been uploaded to a tmp file at this point
|
||||
content_length = request.getHeader("Content-Length")
|
||||
if content_length is None:
|
||||
raise SynapseError(msg="Request must specify a Content-Length", code=400)
|
||||
|
|
|
@ -287,6 +287,14 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||
if self.config.run_background_tasks:
|
||||
self.setup_background_tasks()
|
||||
|
||||
def start_listening(self) -> None:
|
||||
"""Start the HTTP, manhole, metrics, etc listeners
|
||||
|
||||
Does nothing in this base class; overridden in derived classes to start the
|
||||
appropriate listeners.
|
||||
"""
|
||||
pass
|
||||
|
||||
def setup_background_tasks(self) -> None:
|
||||
"""
|
||||
Some handlers have side effects on instantiation (like registering
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet.address import IPv6Address
|
||||
from twisted.test.proto_helpers import StringTransport
|
||||
|
||||
from synapse.app.homeserver import SynapseHomeServer
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
class SynapseRequestTestCase(HomeserverTestCase):
|
||||
def make_homeserver(self, reactor, clock):
|
||||
return self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer)
|
||||
|
||||
def test_large_request(self):
|
||||
"""overlarge HTTP requests should be rejected"""
|
||||
self.hs.start_listening()
|
||||
|
||||
# find the HTTP server which is configured to listen on port 0
|
||||
(port, factory, _backlog, interface) = self.reactor.tcpServers[0]
|
||||
self.assertEqual(interface, "::")
|
||||
self.assertEqual(port, 0)
|
||||
|
||||
# as a control case, first send a regular request.
|
||||
|
||||
# complete the connection and wire it up to a fake transport
|
||||
client_address = IPv6Address("TCP", "::1", "2345")
|
||||
protocol = factory.buildProtocol(client_address)
|
||||
transport = StringTransport()
|
||||
protocol.makeConnection(transport)
|
||||
|
||||
protocol.dataReceived(
|
||||
b"POST / HTTP/1.1\r\n"
|
||||
b"Connection: close\r\n"
|
||||
b"Transfer-Encoding: chunked\r\n"
|
||||
b"\r\n"
|
||||
b"0\r\n"
|
||||
b"\r\n"
|
||||
)
|
||||
|
||||
while not transport.disconnecting:
|
||||
self.reactor.advance(1)
|
||||
|
||||
# we should get a 404
|
||||
self.assertRegex(transport.value().decode(), r"^HTTP/1\.1 404 ")
|
||||
|
||||
# now send an oversized request
|
||||
protocol = factory.buildProtocol(client_address)
|
||||
transport = StringTransport()
|
||||
protocol.makeConnection(transport)
|
||||
|
||||
protocol.dataReceived(
|
||||
b"POST / HTTP/1.1\r\n"
|
||||
b"Connection: close\r\n"
|
||||
b"Transfer-Encoding: chunked\r\n"
|
||||
b"\r\n"
|
||||
)
|
||||
|
||||
# we deliberately send all the data in one big chunk, to ensure that
|
||||
# twisted isn't buffering the data in the chunked transfer decoder.
|
||||
# we start with the chunk size, in hex. (We won't actually send this much)
|
||||
protocol.dataReceived(b"10000000\r\n")
|
||||
sent = 0
|
||||
while not transport.disconnected:
|
||||
self.assertLess(sent, 0x10000000, "connection did not drop")
|
||||
protocol.dataReceived(b"\0" * 1024)
|
||||
sent += 1024
|
||||
|
||||
# default max upload size is 50M, so it should drop on the next buffer after
|
||||
# that.
|
||||
self.assertEqual(sent, 50 * 1024 * 1024 + 1024)
|
|
@ -359,6 +359,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
config=worker_hs.config.server.listeners[0],
|
||||
resource=resource,
|
||||
server_version_string="1",
|
||||
max_request_body_size=4096,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
|
|
|
@ -202,6 +202,7 @@ class OptionsResourceTests(unittest.TestCase):
|
|||
parse_listener_def({"type": "http", "port": 0}),
|
||||
self.resource,
|
||||
"1.0",
|
||||
max_request_body_size=1234,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
|
|
|
@ -247,6 +247,7 @@ class HomeserverTestCase(TestCase):
|
|||
config=self.hs.config.server.listeners[0],
|
||||
resource=self.resource,
|
||||
server_version_string="1",
|
||||
max_request_body_size=1234,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue