From 192e98111d2d2fc18e151308120a411ff7357d5d Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 10 Sep 2020 13:08:08 +0100 Subject: [PATCH 1/8] Remove shared rooms info from upgrade/workers doc as it's still experimental (#8290) --- UPGRADE.rst | 13 ------------- docs/workers.md | 1 - 2 files changed, 14 deletions(-) diff --git a/UPGRADE.rst b/UPGRADE.rst index 77be1b2952..6492fa011f 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -1,16 +1,3 @@ -Upgrading to v1.20.0 -==================== - -Shared rooms endpoint (MSC2666) -------------------------------- - -This release contains a new unstable endpoint `/_matrix/client/unstable/uk.half-shot.msc2666/user/shared_rooms/.*` -for fetching rooms one user has in common with another. This feature requires the -`update_user_directory` config flag to be `True`. If you are you are using a `synapse.app.user_dir` -worker, requests to this endpoint must be handled by that worker. -See `docs/workers.md `_ for more details. - - Upgrading Synapse ================= diff --git a/docs/workers.md b/docs/workers.md index 7a8f5c89fc..bfec745897 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -380,7 +380,6 @@ Handles searches in the user directory. It can handle REST endpoints matching the following regular expressions: ^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$ - ^/_matrix/client/unstable/uk.half-shot.msc2666/user/shared_rooms/.*$ When using this worker you must also set `update_user_directory: False` in the shared configuration file to stop the main synapse running background From b86764662b6058dc53dd7cef923b806dc28f3a53 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 10 Sep 2020 14:55:25 -0400 Subject: [PATCH 2/8] Fix the exception that is raised when invalid JSON is encountered. (#8291) --- changelog.d/8291.bugfix | 1 + synapse/http/matrixfederationclient.py | 5 +- synapse/util/__init__.py | 2 +- tests/federation/test_federation_server.py | 33 +++++++++ tests/http/test_fedclient.py | 48 +++++++++++++ tests/http/test_servlet.py | 80 ++++++++++++++++++++++ 6 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 changelog.d/8291.bugfix create mode 100644 tests/http/test_servlet.py diff --git a/changelog.d/8291.bugfix b/changelog.d/8291.bugfix new file mode 100644 index 0000000000..bc01d26f53 --- /dev/null +++ b/changelog.d/8291.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.20.0rc1 that the wrong exception was raised when invalid JSON data is encountered. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 775fad3be4..5eaf3151ce 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -54,6 +54,7 @@ from synapse.logging.opentracing import ( start_active_span, tags, ) +from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -164,7 +165,9 @@ async def _handle_json_response( try: check_content_type_is_json(response.headers) - d = treq.json_content(response) + # Use the custom JSON decoder (partially re-implements treq.json_content). + d = treq.text_content(response, encoding="utf-8") + d.addCallback(json_decoder.decode) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) body = await make_deferred_yieldable(d) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 3ad4b28fc7..a13f11f8d8 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) def _reject_invalid_json(val): """Do not allow Infinity, -Infinity, or NaN values in JSON.""" - raise json.JSONDecodeError("Invalid JSON value: '%s'" % val) + raise ValueError("Invalid JSON value: '%s'" % val) # Create a custom encoder to reduce the whitespace produced by JSON encoding and diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index 296dc887be..da933ecd75 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -15,6 +15,8 @@ # limitations under the License. import logging +from parameterized import parameterized + from synapse.events import make_event_from_dict from synapse.federation.federation_server import server_matches_acl_event from synapse.rest import admin @@ -23,6 +25,37 @@ from synapse.rest.client.v1 import login, room from tests import unittest +class FederationServerTests(unittest.FederatingHomeserverTestCase): + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + @parameterized.expand([(b"",), (b"foo",), (b'{"limit": Infinity}',)]) + def test_bad_request(self, query_content): + """ + Querying with bad data returns a reasonable error code. + """ + u1 = self.register_user("u1", "pass") + u1_token = self.login("u1", "pass") + + room_1 = self.helper.create_room_as(u1, tok=u1_token) + self.inject_room_member(room_1, "@user:other.example.com", "join") + + "/get_missing_events/(?P[^/]*)/?" + + request, channel = self.make_request( + "POST", + "/_matrix/federation/v1/get_missing_events/%s" % (room_1,), + query_content, + ) + self.render(request) + self.assertEquals(400, channel.code, channel.result) + self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON") + + class ServerACLsTestCase(unittest.TestCase): def test_blacklisted_server(self): e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]}) diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 7f1dfb2128..5604af3795 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -16,6 +16,7 @@ from mock import Mock from netaddr import IPSet +from parameterized import parameterized from twisted.internet import defer from twisted.internet.defer import TimeoutError @@ -511,3 +512,50 @@ class FederationClientTests(HomeserverTestCase): self.reactor.advance(120) self.assertTrue(conn.disconnecting) + + @parameterized.expand([(b"",), (b"foo",), (b'{"a": Infinity}',)]) + def test_json_error(self, return_value): + """ + Test what happens if invalid JSON is returned from the remote endpoint. + """ + + test_d = defer.ensureDeferred(self.cl.get_json("testserv:8008", "foo/bar")) + + self.pump() + + # Nothing happened yet + self.assertNoResult(test_d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8008) + + # complete the connection and wire it up to a fake transport + protocol = factory.buildProtocol(None) + transport = StringTransport() + protocol.makeConnection(transport) + + # that should have made it send the request to the transport + self.assertRegex(transport.value(), b"^GET /foo/bar") + self.assertRegex(transport.value(), b"Host: testserv:8008") + + # Deferred is still without a result + self.assertNoResult(test_d) + + # Send it the HTTP response + protocol.dataReceived( + b"HTTP/1.1 200 OK\r\n" + b"Server: Fake\r\n" + b"Content-Type: application/json\r\n" + b"Content-Length: %i\r\n" + b"\r\n" + b"%s" % (len(return_value), return_value) + ) + + self.pump() + + f = self.failureResultOf(test_d) + self.assertIsInstance(f.value, ValueError) diff --git a/tests/http/test_servlet.py b/tests/http/test_servlet.py new file mode 100644 index 0000000000..45089158ce --- /dev/null +++ b/tests/http/test_servlet.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from io import BytesIO + +from mock import Mock + +from synapse.api.errors import SynapseError +from synapse.http.servlet import ( + parse_json_object_from_request, + parse_json_value_from_request, +) + +from tests import unittest + + +def make_request(content): + """Make an object that acts enough like a request.""" + request = Mock(spec=["content"]) + + if isinstance(content, dict): + content = json.dumps(content).encode("utf8") + + request.content = BytesIO(content) + return request + + +class TestServletUtils(unittest.TestCase): + def test_parse_json_value(self): + """Basic tests for parse_json_value_from_request.""" + # Test round-tripping. + obj = {"foo": 1} + result = parse_json_value_from_request(make_request(obj)) + self.assertEqual(result, obj) + + # Results don't have to be objects. + result = parse_json_value_from_request(make_request(b'["foo"]')) + self.assertEqual(result, ["foo"]) + + # Test empty. + with self.assertRaises(SynapseError): + parse_json_value_from_request(make_request(b"")) + + result = parse_json_value_from_request(make_request(b""), allow_empty_body=True) + self.assertIsNone(result) + + # Invalid UTF-8. + with self.assertRaises(SynapseError): + parse_json_value_from_request(make_request(b"\xFF\x00")) + + # Invalid JSON. + with self.assertRaises(SynapseError): + parse_json_value_from_request(make_request(b"foo")) + + with self.assertRaises(SynapseError): + parse_json_value_from_request(make_request(b'{"foo": Infinity}')) + + def test_parse_json_object(self): + """Basic tests for parse_json_object_from_request.""" + # Test empty. + result = parse_json_object_from_request( + make_request(b""), allow_empty_body=True + ) + self.assertEqual(result, {}) + + # Test not an object + with self.assertRaises(SynapseError): + parse_json_object_from_request(make_request(b'["foo"]')) From fe8ed1b46f781faa45d1bba8f9308cf47c42010f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Sep 2020 12:22:55 +0100 Subject: [PATCH 3/8] Make `StreamToken.room_key` be a `RoomStreamToken` instance. (#8281) --- changelog.d/8281.misc | 1 + mypy.ini | 2 + synapse/handlers/admin.py | 6 +- synapse/handlers/device.py | 12 ++-- synapse/handlers/initial_sync.py | 4 +- synapse/handlers/message.py | 1 + synapse/handlers/pagination.py | 4 +- synapse/handlers/room.py | 15 +++-- synapse/handlers/sync.py | 11 ++-- synapse/notifier.py | 16 +++-- synapse/storage/__init__.py | 5 +- synapse/storage/databases/main/events.py | 21 +++++-- synapse/storage/databases/main/stream.py | 75 ++++++++++-------------- synapse/storage/persist_events.py | 16 +++-- synapse/types.py | 19 +++--- tests/test_utils/event_injection.py | 5 +- 16 files changed, 114 insertions(+), 99 deletions(-) create mode 100644 changelog.d/8281.misc diff --git a/changelog.d/8281.misc b/changelog.d/8281.misc new file mode 100644 index 0000000000..74357120a7 --- /dev/null +++ b/changelog.d/8281.misc @@ -0,0 +1 @@ +Change `StreamToken.room_key` to be a `RoomStreamToken` instance. diff --git a/mypy.ini b/mypy.ini index 460392377e..7986781432 100644 --- a/mypy.ini +++ b/mypy.ini @@ -46,10 +46,12 @@ files = synapse/server_notices, synapse/spam_checker_api, synapse/state, + synapse/storage/databases/main/events.py, synapse/storage/databases/main/stream.py, synapse/storage/databases/main/ui_auth.py, synapse/storage/database.py, synapse/storage/engines, + synapse/storage/persist_events.py, synapse/storage/state.py, synapse/storage/util, synapse/streams, diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 918d0e037c..5e5a64037d 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -125,8 +125,8 @@ class AdminHandler(BaseHandler): else: stream_ordering = room.stream_ordering - from_key = str(RoomStreamToken(0, 0)) - to_key = str(RoomStreamToken(None, stream_ordering)) + from_key = RoomStreamToken(0, 0) + to_key = RoomStreamToken(None, stream_ordering) written_events = set() # Events that we've processed in this room @@ -153,7 +153,7 @@ class AdminHandler(BaseHandler): if not events: break - from_key = events[-1].internal_metadata.after + from_key = RoomStreamToken.parse(events[-1].internal_metadata.after) events = await filter_events_for_client(self.storage, user_id, events) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 643d71a710..4b0a4f96cc 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,6 +29,7 @@ from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ( RoomStreamToken, + StreamToken, get_domain_from_id, get_verify_key_from_cross_signing_key, ) @@ -104,18 +105,15 @@ class DeviceWorkerHandler(BaseHandler): @trace @measure_func("device.get_user_ids_changed") - async def get_user_ids_changed(self, user_id, from_token): + async def get_user_ids_changed(self, user_id: str, from_token: StreamToken): """Get list of users that have had the devices updated, or have newly joined a room, that `user_id` may be interested in. - - Args: - user_id (str) - from_token (StreamToken) """ set_tag("user_id", user_id) set_tag("from_token", from_token) - now_room_key = await self.store.get_room_events_max_id() + now_room_id = self.store.get_room_max_stream_ordering() + now_room_key = RoomStreamToken(None, now_room_id) room_ids = await self.store.get_rooms_for_user(user_id) @@ -142,7 +140,7 @@ class DeviceWorkerHandler(BaseHandler): ) rooms_changed.update(event.room_id for event in member_events) - stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream + stream_ordering = from_token.room_key.stream possibly_changed = set(changed) possibly_left = set() diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index ddb8f0712b..ba4828c713 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.storage.roommember import RoomsForUser from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, StreamToken, UserID +from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID from synapse.util import unwrapFirstError from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -167,7 +167,7 @@ class InitialSyncHandler(BaseHandler): self.state_handler.get_current_state, event.room_id ) elif event.membership == Membership.LEAVE: - room_end_token = "s%d" % (event.stream_ordering,) + room_end_token = RoomStreamToken(None, event.stream_ordering,) deferred_room_state = run_in_background( self.state_store.get_state_for_events, [event.event_id] ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 276de8f8d0..e54e2b322b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -973,6 +973,7 @@ class EventCreationHandler: This should only be run on the instance in charge of persisting events. """ assert self._is_event_writer + assert self.storage.persistence is not None if ratelimit: # We check if this is a room admin redacting an event so that we diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index ec17d3d888..d929a68f7d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -344,7 +344,7 @@ class PaginationHandler: # gets called. raise Exception("limit not set") - room_token = RoomStreamToken.parse(from_token.room_key) + room_token = from_token.room_key with await self.pagination_lock.read(room_id): ( @@ -381,7 +381,7 @@ class PaginationHandler: if leave_token.topological < max_topo: from_token = from_token.copy_and_replace( - "room_key", leave_token_str + "room_key", leave_token ) await self.hs.get_handlers().federation_handler.maybe_backfill( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a29305f655..53d85ab97d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1091,20 +1091,19 @@ class RoomEventSource: async def get_new_events( self, user: UserID, - from_key: str, + from_key: RoomStreamToken, limit: int, room_ids: List[str], is_guest: bool, explicit_room_id: Optional[str] = None, - ) -> Tuple[List[EventBase], str]: + ) -> Tuple[List[EventBase], RoomStreamToken]: # We just ignore the key for now. to_key = self.get_current_key() - from_token = RoomStreamToken.parse(from_key) - if from_token.topological: + if from_key.topological: logger.warning("Stream has topological part!!!! %r", from_key) - from_key = "s%s" % (from_token.stream,) + from_key = RoomStreamToken(None, from_key.stream) app_service = self.store.get_app_service_by_user_id(user.to_string()) if app_service: @@ -1133,14 +1132,14 @@ class RoomEventSource: events[:] = events[:limit] if events: - end_key = events[-1].internal_metadata.after + end_key = RoomStreamToken.parse(events[-1].internal_metadata.after) else: end_key = to_key return (events, end_key) - def get_current_key(self) -> str: - return "s%d" % (self.store.get_room_max_stream_ordering(),) + def get_current_key(self) -> RoomStreamToken: + return RoomStreamToken(None, self.store.get_room_max_stream_ordering()) def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: return self.store.get_room_events_max_id(room_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cc47e8b62c..a615c7c2f0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -378,7 +378,7 @@ class SyncHandler: sync_config = sync_result_builder.sync_config with Measure(self.clock, "ephemeral_by_room"): - typing_key = since_token.typing_key if since_token else "0" + typing_key = since_token.typing_key if since_token else 0 room_ids = sync_result_builder.joined_room_ids @@ -402,7 +402,7 @@ class SyncHandler: event_copy = {k: v for (k, v) in event.items() if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) - receipt_key = since_token.receipt_key if since_token else "0" + receipt_key = since_token.receipt_key if since_token else 0 receipt_source = self.event_sources.sources["receipt"] receipts, receipt_key = await receipt_source.get_new_events( @@ -533,7 +533,7 @@ class SyncHandler: if len(recents) > timeline_limit: limited = True recents = recents[-timeline_limit:] - room_key = recents[0].internal_metadata.before + room_key = RoomStreamToken.parse(recents[0].internal_metadata.before) prev_batch_token = now_token.copy_and_replace("room_key", room_key) @@ -1322,6 +1322,7 @@ class SyncHandler: is_guest=sync_config.is_guest, include_offline=include_offline, ) + assert presence_key sync_result_builder.now_token = now_token.copy_and_replace( "presence_key", presence_key ) @@ -1484,7 +1485,7 @@ class SyncHandler: if rooms_changed: return True - stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream + stream_id = since_token.room_key.stream for room_id in sync_result_builder.joined_room_ids: if self.store.has_room_changed_since(room_id, stream_id): return True @@ -1750,7 +1751,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) + "room_key", RoomStreamToken(None, event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/notifier.py b/synapse/notifier.py index 16f19c938e..12cd84b27b 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,6 +25,7 @@ from typing import ( Set, Tuple, TypeVar, + Union, ) from prometheus_client import Counter @@ -41,7 +42,7 @@ from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.streams.config import PaginationConfig -from synapse.types import Collection, StreamToken, UserID +from synapse.types import Collection, RoomStreamToken, StreamToken, UserID from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -111,7 +112,9 @@ class _NotifierUserStream: with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) - def notify(self, stream_key: str, stream_id: int, time_now_ms: int): + def notify( + self, stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int, + ): """Notify any listeners for this user of a new event from an event source. Args: @@ -294,7 +297,12 @@ class Notifier: rooms.add(event.room_id) if users or rooms: - self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms) + self.on_new_event( + "room_key", + RoomStreamToken(None, max_room_stream_id), + users=users, + rooms=rooms, + ) self._on_updated_room_token(max_room_stream_id) def _on_updated_room_token(self, max_room_stream_id: int): @@ -329,7 +337,7 @@ class Notifier: def on_new_event( self, stream_key: str, - new_token: int, + new_token: Union[int, RoomStreamToken], users: Collection[UserID] = [], rooms: Collection[str] = [], ): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8e5d78f6f7..bbff3c8d5b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -47,6 +47,9 @@ class Storage: # interfaces. self.main = stores.main - self.persistence = EventsPersistenceStorage(hs, stores) self.purge_events = PurgeEventsStorage(hs, stores) self.state = StateGroupStorage(hs, stores) + + self.persistence = None + if stores.persist_events: + self.persistence = EventsPersistenceStorage(hs, stores) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b3d27a2ee7..9cd1403b38 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -213,7 +213,7 @@ class PersistEventsStore: Returns: Filtered event ids """ - results = [] + results = [] # type: List[str] def _get_events_which_are_prevs_txn(txn, batch): sql = """ @@ -631,7 +631,9 @@ class PersistEventsStore: ) @classmethod - def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): + def _filter_events_and_contexts_for_duplicates( + cls, events_and_contexts: List[Tuple[EventBase, EventContext]] + ) -> List[Tuple[EventBase, EventContext]]: """Ensure that we don't have the same event twice. Pick the earliest non-outlier if there is one, else the earliest one. @@ -641,7 +643,9 @@ class PersistEventsStore: Returns: list[(EventBase, EventContext)]: filtered list """ - new_events_and_contexts = OrderedDict() + new_events_and_contexts = ( + OrderedDict() + ) # type: OrderedDict[str, Tuple[EventBase, EventContext]] for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) if prev_event_context: @@ -655,7 +659,12 @@ class PersistEventsStore: new_events_and_contexts[event.event_id] = (event, context) return list(new_events_and_contexts.values()) - def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): + def _update_room_depths_txn( + self, + txn, + events_and_contexts: List[Tuple[EventBase, EventContext]], + backfilled: bool, + ): """Update min_depth for each room Args: @@ -664,7 +673,7 @@ class PersistEventsStore: we are persisting backfilled (bool): True if the events were backfilled """ - depth_updates = {} + depth_updates = {} # type: Dict[str, int] for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) @@ -1436,7 +1445,7 @@ class PersistEventsStore: Forward extremities are handled when we first start persisting the events. """ - events_by_room = {} + events_by_room = {} # type: Dict[str, List[EventBase]] for ev in events: events_by_room.setdefault(ev.room_id, []).append(ev) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 08a13a8b47..2e95518752 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -310,11 +310,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): async def get_room_events_stream_for_rooms( self, room_ids: Collection[str], - from_key: str, - to_key: str, + from_key: RoomStreamToken, + to_key: RoomStreamToken, limit: int = 0, order: str = "DESC", - ) -> Dict[str, Tuple[List[EventBase], str]]: + ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]: """Get new room events in stream ordering since `from_key`. Args: @@ -333,9 +333,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): - list of recent events in the room - stream ordering key for the start of the chunk of events returned. """ - from_id = RoomStreamToken.parse_stream_token(from_key).stream - - room_ids = self._events_stream_cache.get_entities_changed(room_ids, from_id) + room_ids = self._events_stream_cache.get_entities_changed( + room_ids, from_key.stream + ) if not room_ids: return {} @@ -364,16 +364,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return results def get_rooms_that_changed( - self, room_ids: Collection[str], from_key: str + self, room_ids: Collection[str], from_key: RoomStreamToken ) -> Set[str]: """Given a list of rooms and a token, return rooms where there may have been changes. - - Args: - room_ids - from_key: The room_key portion of a StreamToken """ - from_id = RoomStreamToken.parse_stream_token(from_key).stream + from_id = from_key.stream return { room_id for room_id in room_ids @@ -383,11 +379,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): async def get_room_events_stream_for_room( self, room_id: str, - from_key: str, - to_key: str, + from_key: RoomStreamToken, + to_key: RoomStreamToken, limit: int = 0, order: str = "DESC", - ) -> Tuple[List[EventBase], str]: + ) -> Tuple[List[EventBase], RoomStreamToken]: """Get new room events in stream ordering since `from_key`. Args: @@ -408,8 +404,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if from_key == to_key: return [], from_key - from_id = RoomStreamToken.parse_stream_token(from_key).stream - to_id = RoomStreamToken.parse_stream_token(to_key).stream + from_id = from_key.stream + to_id = to_key.stream has_changed = self._events_stream_cache.has_entity_changed(room_id, from_id) @@ -441,7 +437,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = "s%d" % min(r.stream_ordering for r in rows) + key = RoomStreamToken(None, min(r.stream_ordering for r in rows)) else: # Assume we didn't get anything because there was nothing to # get. @@ -450,10 +446,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret, key async def get_membership_changes_for_user( - self, user_id: str, from_key: str, to_key: str + self, user_id: str, from_key: RoomStreamToken, to_key: RoomStreamToken ) -> List[EventBase]: - from_id = RoomStreamToken.parse_stream_token(from_key).stream - to_id = RoomStreamToken.parse_stream_token(to_key).stream + from_id = from_key.stream + to_id = to_key.stream if from_key == to_key: return [] @@ -491,8 +487,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret async def get_recent_events_for_room( - self, room_id: str, limit: int, end_token: str - ) -> Tuple[List[EventBase], str]: + self, room_id: str, limit: int, end_token: RoomStreamToken + ) -> Tuple[List[EventBase], RoomStreamToken]: """Get the most recent events in the room in topological ordering. Args: @@ -518,8 +514,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return (events, token) async def get_recent_event_ids_for_room( - self, room_id: str, limit: int, end_token: str - ) -> Tuple[List[_EventDictReturn], str]: + self, room_id: str, limit: int, end_token: RoomStreamToken + ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: """Get the most recent events in the room in topological ordering. Args: @@ -535,13 +531,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if limit == 0: return [], end_token - parsed_end_token = RoomStreamToken.parse(end_token) - rows, token = await self.db_pool.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, room_id, - from_token=parsed_end_token, + from_token=end_token, limit=limit, ) @@ -619,17 +613,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): allow_none=allow_none, ) - async def get_stream_token_for_event(self, event_id: str) -> str: + async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken: """The stream token for an event Args: event_id: The id of the event to look up a stream token for. Raises: StoreError if the event wasn't in the database. Returns: - A "s%d" stream token. + A stream token. """ stream_id = await self.get_stream_id_for_event(event_id) - return "s%d" % (stream_id,) + return RoomStreamToken(None, stream_id) async def get_topological_token_for_event(self, event_id: str) -> str: """The stream token for an event @@ -954,7 +948,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): direction: str = "b", limit: int = -1, event_filter: Optional[Filter] = None, - ) -> Tuple[List[_EventDictReturn], str]: + ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: """Returns list of events before or after a given token. Args: @@ -1054,17 +1048,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token - return rows, str(next_token) + return rows, next_token async def paginate_room_events( self, room_id: str, - from_key: str, - to_key: Optional[str] = None, + from_key: RoomStreamToken, + to_key: Optional[RoomStreamToken] = None, direction: str = "b", limit: int = -1, event_filter: Optional[Filter] = None, - ) -> Tuple[List[EventBase], str]: + ) -> Tuple[List[EventBase], RoomStreamToken]: """Returns list of events before or after a given token. Args: @@ -1083,17 +1077,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): and `to_key`). """ - parsed_from_key = RoomStreamToken.parse(from_key) - parsed_to_key = None - if to_key: - parsed_to_key = RoomStreamToken.parse(to_key) - rows, token = await self.db_pool.runInteraction( "paginate_room_events", self._paginate_room_events_txn, room_id, - parsed_from_key, - parsed_to_key, + from_key, + to_key, direction, limit, event_filter, diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index dbaeef91dd..d89f6ed128 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -18,7 +18,7 @@ import itertools import logging from collections import deque, namedtuple -from typing import Iterable, List, Optional, Set, Tuple +from typing import Dict, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter, Histogram @@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases import Databases from synapse.storage.databases.main.events import DeltaState -from synapse.types import StateMap +from synapse.types import Collection, StateMap from synapse.util.async_helpers import ObservableDeferred from synapse.util.metrics import Measure @@ -185,6 +185,8 @@ class EventsPersistenceStorage: # store for now. self.main_store = stores.main self.state_store = stores.state + + assert stores.persist_events self.persist_events_store = stores.persist_events self._clock = hs.get_clock() @@ -208,7 +210,7 @@ class EventsPersistenceStorage: Returns: the stream ordering of the latest persisted event """ - partitioned = {} + partitioned = {} # type: Dict[str, List[Tuple[EventBase, EventContext]]] for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) @@ -305,7 +307,9 @@ class EventsPersistenceStorage: # Work out the new "current state" for each room. # We do this by working out what the new extremities are and then # calculating the state from that. - events_by_room = {} + events_by_room = ( + {} + ) # type: Dict[str, List[Tuple[EventBase, EventContext]]] for event, context in chunk: events_by_room.setdefault(event.room_id, []).append( (event, context) @@ -436,7 +440,7 @@ class EventsPersistenceStorage: self, room_id: str, event_contexts: List[Tuple[EventBase, EventContext]], - latest_event_ids: List[str], + latest_event_ids: Collection[str], ): """Calculates the new forward extremities for a room given events to persist. @@ -470,7 +474,7 @@ class EventsPersistenceStorage: # Remove any events which are prev_events of any existing events. existing_prevs = await self.persist_events_store._get_events_which_are_prevs( result - ) + ) # type: Collection[str] result.difference_update(existing_prevs) # Finally handle the case where the new events have soft-failed prev diff --git a/synapse/types.py b/synapse/types.py index ba45335038..dc09448bdc 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -425,7 +425,9 @@ class RoomStreamToken: @attr.s(slots=True, frozen=True) class StreamToken: - room_key = attr.ib(type=str) + room_key = attr.ib( + type=RoomStreamToken, validator=attr.validators.instance_of(RoomStreamToken) + ) presence_key = attr.ib(type=int) typing_key = attr.ib(type=int) receipt_key = attr.ib(type=int) @@ -445,21 +447,16 @@ class StreamToken: while len(keys) < len(attr.fields(cls)): # i.e. old token from before receipt_key keys.append("0") - return cls(keys[0], *(int(k) for k in keys[1:])) + return cls(RoomStreamToken.parse(keys[0]), *(int(k) for k in keys[1:])) except Exception: raise SynapseError(400, "Invalid Token") def to_string(self): - return self._SEPARATOR.join([str(k) for k in attr.astuple(self)]) + return self._SEPARATOR.join([str(k) for k in attr.astuple(self, recurse=False)]) @property def room_stream_id(self): - # TODO(markjh): Awful hack to work around hacks in the presence tests - # which assume that the keys are integers. - if type(self.room_key) is int: - return self.room_key - else: - return int(self.room_key[1:].split("-")[-1]) + return self.room_key.stream def is_after(self, other): """Does this token contain events that the other doesn't?""" @@ -475,7 +472,7 @@ class StreamToken: or (int(other.groups_key) < int(self.groups_key)) ) - def copy_and_advance(self, key, new_value): + def copy_and_advance(self, key, new_value) -> "StreamToken": """Advance the given key in the token to a new value if and only if the new value is after the old value. """ @@ -491,7 +488,7 @@ class StreamToken: else: return self - def copy_and_replace(self, key, new_value): + def copy_and_replace(self, key, new_value) -> "StreamToken": return attr.evolve(self, **{key: new_value}) diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py index fb1ca90336..e93aa84405 100644 --- a/tests/test_utils/event_injection.py +++ b/tests/test_utils/event_injection.py @@ -71,7 +71,10 @@ async def inject_event( """ event, context = await create_event(hs, room_version, prev_event_ids, **kwargs) - await hs.get_storage().persistence.persist_event(event, context) + persistence = hs.get_storage().persistence + assert persistence is not None + + await persistence.persist_event(event, context) return event From 2832ef5bb797df5938ba9a63af586a2d562d4570 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 11 Sep 2020 08:14:15 -0400 Subject: [PATCH 4/8] 1.20.0rc3 --- CHANGES.md | 9 +++++++++ changelog.d/8291.bugfix | 1 - synapse/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) delete mode 100644 changelog.d/8291.bugfix diff --git a/CHANGES.md b/CHANGES.md index dbef6ab6c7..de8ce2625b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.20.0rc3 (2020-09-11) +============================== + +Bugfixes +-------- + +- Fix a bug introduced in v1.20.0rc1 that the wrong exception was raised when invalid JSON data is encountered. ([\#8291](https://github.com/matrix-org/synapse/issues/8291)) + + Synapse 1.20.0rc2 (2020-09-09) ============================== diff --git a/changelog.d/8291.bugfix b/changelog.d/8291.bugfix deleted file mode 100644 index bc01d26f53..0000000000 --- a/changelog.d/8291.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix a bug introduced in v1.20.0rc1 that the wrong exception was raised when invalid JSON data is encountered. diff --git a/synapse/__init__.py b/synapse/__init__.py index 7e8731f86a..bf0bf192a5 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -48,7 +48,7 @@ try: except ImportError: pass -__version__ = "1.20.0rc2" +__version__ = "1.20.0rc3" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when From 08837bb58c287d9b16ec714df55d0d4aec6422cb Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 11 Sep 2020 08:21:57 -0400 Subject: [PATCH 5/8] Clarify changelog. --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index de8ce2625b..dd4d110981 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,7 @@ Synapse 1.20.0rc3 (2020-09-11) Bugfixes -------- -- Fix a bug introduced in v1.20.0rc1 that the wrong exception was raised when invalid JSON data is encountered. ([\#8291](https://github.com/matrix-org/synapse/issues/8291)) +- Fix a bug introduced in v1.20.0rc1 where the wrong exception was raised when invalid JSON data is encountered. ([\#8291](https://github.com/matrix-org/synapse/issues/8291)) Synapse 1.20.0rc2 (2020-09-09) From 04cc249b43e8716513f788b2a4eeb8ede24d19df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 14 Sep 2020 10:16:41 +0100 Subject: [PATCH 6/8] Add experimental support for sharding event persister. Again. (#8294) This is *not* ready for production yet. Caveats: 1. We should write some tests... 2. The stream token that we use for events can get stalled at the minimum position of all writers. This means that new events may not be processed and e.g. sent down sync streams if a writer isn't writing or is slow. --- changelog.d/8294.feature | 1 + synapse/config/_base.py | 21 +++++- synapse/config/_base.pyi | 1 + synapse/config/workers.py | 37 ++++++++--- synapse/handlers/federation.py | 44 +++++++++---- synapse/handlers/message.py | 14 ++-- synapse/handlers/room.py | 14 ++-- synapse/handlers/room_member.py | 7 -- synapse/replication/http/federation.py | 12 +++- synapse/replication/tcp/handler.py | 2 +- synapse/replication/tcp/streams/events.py | 4 +- synapse/storage/databases/__init__.py | 2 +- .../databases/main/event_federation.py | 2 +- synapse/storage/databases/main/events.py | 12 ++-- .../storage/databases/main/events_worker.py | 66 +++++++++++++------ .../delta/58/14events_instance_name.sql | 16 +++++ .../58/14events_instance_name.sql.postgres | 26 ++++++++ synapse/storage/util/id_generators.py | 10 +-- 18 files changed, 211 insertions(+), 80 deletions(-) create mode 100644 changelog.d/8294.feature create mode 100644 synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql create mode 100644 synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres diff --git a/changelog.d/8294.feature b/changelog.d/8294.feature new file mode 100644 index 0000000000..b363e929ea --- /dev/null +++ b/changelog.d/8294.feature @@ -0,0 +1 @@ +Add experimental support for sharding event persister. diff --git a/synapse/config/_base.py b/synapse/config/_base.py index ad5ab6ad62..bb9bf8598d 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig: def should_handle(self, instance_name: str, key: str) -> bool: """Whether this instance is responsible for handling the given key. """ - - # If multiple instances are not defined we always return true. + # If multiple instances are not defined we always return true if not self.instances or len(self.instances) == 1: return True + return self.get_instance(key) == instance_name + + def get_instance(self, key: str) -> str: + """Get the instance responsible for handling the given key. + + Note: For things like federation sending the config for which instance + is sending is known only to the sender instance if there is only one. + Therefore `should_handle` should be used where possible. + """ + + if not self.instances: + return "master" + + if len(self.instances) == 1: + return self.instances[0] + # We shard by taking the hash, modulo it by the number of instances and # then checking whether this instance matches the instance at that # index. @@ -846,7 +861,7 @@ class ShardedWorkerHandlingConfig: dest_hash = sha256(key.encode("utf8")).digest() dest_int = int.from_bytes(dest_hash, byteorder="little") remainder = dest_int % (len(self.instances)) - return self.instances[remainder] == instance_name + return self.instances[remainder] __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"] diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index eb911e8f9f..b8faafa9bd 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig: instances: List[str] def __init__(self, instances: List[str]) -> None: ... def should_handle(self, instance_name: str, key: str) -> bool: ... + def get_instance(self, key: str) -> str: ... diff --git a/synapse/config/workers.py b/synapse/config/workers.py index c784a71508..f23e42cdf9 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,12 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Union + import attr from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from .server import ListenerConfig, parse_listener_def +def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: + """Helper for allowing parsing a string or list of strings to a config + option expecting a list of strings. + """ + + if isinstance(obj, str): + return [obj] + return obj + + @attr.s class InstanceLocationConfig: """The host and port to talk to an instance via HTTP replication. @@ -33,11 +45,13 @@ class WriterLocations: """Specifies the instances that write various streams. Attributes: - events: The instance that writes to the event and backfill streams. - events: The instance that writes to the typing stream. + events: The instances that write to the event and backfill streams. + typing: The instance that writes to the typing stream. """ - events = attr.ib(default="master", type=str) + events = attr.ib( + default=["master"], type=List[str], converter=_instance_to_list_converter + ) typing = attr.ib(default="master", type=str) @@ -105,15 +119,18 @@ class WorkerConfig(Config): writers = config.get("stream_writers") or {} self.writers = WriterLocations(**writers) - # Check that the configured writer for events and typing also appears in + # Check that the configured writers for events and typing also appears in # `instance_map`. for stream in ("events", "typing"): - instance = getattr(self.writers, stream) - if instance != "master" and instance not in self.instance_map: - raise ConfigError( - "Instance %r is configured to write %s but does not appear in `instance_map` config." - % (instance, stream) - ) + instances = _instance_to_list_converter(getattr(self.writers, stream)) + for instance in instances: + if instance != "master" and instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured to write %s but does not appear in `instance_map` config." + % (instance, stream) + ) + + self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c195eba830..a5734bebab 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -896,7 +896,8 @@ class FederationHandler(BaseHandler): ) ) - await self._handle_new_events(dest, ev_infos, backfilled=True) + if ev_infos: + await self._handle_new_events(dest, room_id, ev_infos, backfilled=True) # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) @@ -1189,7 +1190,7 @@ class FederationHandler(BaseHandler): event_infos.append(_NewEventInfo(event, None, auth)) await self._handle_new_events( - destination, event_infos, + destination, room_id, event_infos, ) def _sanity_check_event(self, ev): @@ -1336,15 +1337,15 @@ class FederationHandler(BaseHandler): ) max_stream_id = await self._persist_auth_tree( - origin, auth_chain, state, event, room_version_obj + origin, room_id, auth_chain, state, event, room_version_obj ) # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. - # - # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - self.config.worker.writers.events, "events", max_stream_id + self.config.worker.events_shard_config.get_instance(room_id), + "events", + max_stream_id, ) # Check whether this room is the result of an upgrade of a room we already know @@ -1593,7 +1594,7 @@ class FederationHandler(BaseHandler): ) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify([(event, context)]) + await self.persist_events_and_notify(event.room_id, [(event, context)]) return event @@ -1620,7 +1621,9 @@ class FederationHandler(BaseHandler): await self.federation_client.send_leave(host_list, event) context = await self.state_handler.compute_event_context(event) - stream_id = await self.persist_events_and_notify([(event, context)]) + stream_id = await self.persist_events_and_notify( + event.room_id, [(event, context)] + ) return event, stream_id @@ -1868,7 +1871,7 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( - [(event, context)], backfilled=backfilled + event.room_id, [(event, context)], backfilled=backfilled ) except Exception: run_in_background( @@ -1881,6 +1884,7 @@ class FederationHandler(BaseHandler): async def _handle_new_events( self, origin: str, + room_id: str, event_infos: Iterable[_NewEventInfo], backfilled: bool = False, ) -> None: @@ -1912,6 +1916,7 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( + room_id, [ (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) @@ -1922,6 +1927,7 @@ class FederationHandler(BaseHandler): async def _persist_auth_tree( self, origin: str, + room_id: str, auth_events: List[EventBase], state: List[EventBase], event: EventBase, @@ -1936,6 +1942,7 @@ class FederationHandler(BaseHandler): Args: origin: Where the events came from + room_id, auth_events state event @@ -2010,17 +2017,20 @@ class FederationHandler(BaseHandler): events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR await self.persist_events_and_notify( + room_id, [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) - ] + ], ) new_event_context = await self.state_handler.compute_event_context( event, old_state=state ) - return await self.persist_events_and_notify([(event, new_event_context)]) + return await self.persist_events_and_notify( + room_id, [(event, new_event_context)] + ) async def _prep_event( self, @@ -2871,6 +2881,7 @@ class FederationHandler(BaseHandler): async def persist_events_and_notify( self, + room_id: str, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], backfilled: bool = False, ) -> int: @@ -2878,14 +2889,19 @@ class FederationHandler(BaseHandler): necessary. Args: - event_and_contexts: + room_id: The room ID of events being persisted. + event_and_contexts: Sequence of events with their associated + context that should be persisted. All events must belong to + the same room. backfilled: Whether these events are a result of backfilling or not """ - if self.config.worker.writers.events != self._instance_name: + instance = self.config.worker.events_shard_config.get_instance(room_id) + if instance != self._instance_name: result = await self._send_events( - instance_name=self.config.worker.writers.events, + instance_name=instance, store=self.store, + room_id=room_id, event_and_contexts=event_and_contexts, backfilled=backfilled, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e54e2b322b..a8fe5cf4e2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -376,9 +376,8 @@ class EventCreationHandler: self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases - self._is_event_writer = ( - self.config.worker.writers.events == hs.get_instance_name() - ) + self._events_shard_config = self.config.worker.events_shard_config + self._instance_name = hs.get_instance_name() self.room_invite_state_types = self.hs.config.room_invite_state_types @@ -902,9 +901,10 @@ class EventCreationHandler: try: # If we're a worker we need to hit out to the master. - if not self._is_event_writer: + writer_instance = self._events_shard_config.get_instance(event.room_id) + if writer_instance != self._instance_name: result = await self.send_event( - instance_name=self.config.worker.writers.events, + instance_name=writer_instance, event_id=event.event_id, store=self.store, requester=requester, @@ -972,8 +972,10 @@ class EventCreationHandler: This should only be run on the instance in charge of persisting events. """ - assert self._is_event_writer assert self.storage.persistence is not None + assert self._events_shard_config.should_handle( + self._instance_name, event.room_id + ) if ratelimit: # We check if this is a room admin redacting an event so that we diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 53d85ab97d..eeade6ad3f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler): # Always wait for room creation to progate before returning await self._replication.wait_for_stream_position( - self.hs.config.worker.writers.events, "events", last_stream_id + self.hs.config.worker.events_shard_config.get_instance(room_id), + "events", + last_stream_id, ) return result, last_stream_id @@ -1259,10 +1261,10 @@ class RoomShutdownHandler: # We now wait for the create room to come back in via replication so # that we can assume that all the joins/invites have propogated before # we try and auto join below. - # - # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - self.hs.config.worker.writers.events, "events", stream_id + self.hs.config.worker.events_shard_config.get_instance(new_room_id), + "events", + stream_id, ) else: new_room_id = None @@ -1292,7 +1294,9 @@ class RoomShutdownHandler: # Wait for leave to come in over replication before trying to forget. await self._replication.wait_for_stream_position( - self.hs.config.worker.writers.events, "events", stream_id + self.hs.config.worker.events_shard_config.get_instance(room_id), + "events", + stream_id, ) await self.room_member_handler.forget(target_requester.user, room_id) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 100f335b80..01a6e88262 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -82,13 +82,6 @@ class RoomMemberHandler: self._enable_lookup = hs.config.enable_3pid_lookup self.allow_per_room_profiles = self.config.allow_per_room_profiles - self._event_stream_writer_instance = hs.config.worker.writers.events - self._is_on_event_persistence_instance = ( - self._event_stream_writer_instance == hs.get_instance_name() - ) - if self._is_on_event_persistence_instance: - self.persist_event_storage = hs.get_storage().persistence - self._join_rate_limiter_local = Ratelimiter( clock=self.clock, rate_hz=hs.config.ratelimiting.rc_joins_local.per_second, diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 6b56315148..5c8be747e1 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -65,10 +65,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.federation_handler = hs.get_handlers().federation_handler @staticmethod - async def _serialize_payload(store, event_and_contexts, backfilled): + async def _serialize_payload(store, room_id, event_and_contexts, backfilled): """ Args: store + room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) backfilled (bool): Whether or not the events are the result of backfilling @@ -88,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): } ) - payload = {"events": event_payloads, "backfilled": backfilled} + payload = { + "events": event_payloads, + "backfilled": backfilled, + "room_id": room_id, + } return payload @@ -96,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): with Measure(self.clock, "repl_fed_send_events_parse"): content = parse_json_object_from_request(request) + room_id = content["room_id"] backfilled = content["backfilled"] event_payloads = content["events"] @@ -120,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_handler.persist_events_and_notify( - event_and_contexts, backfilled + room_id, event_and_contexts, backfilled ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 1c303f3a46..b323841f73 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -109,7 +109,7 @@ class ReplicationCommandHandler: if isinstance(stream, (EventsStream, BackfillStream)): # Only add EventStream and BackfillStream as a source on the # instance in charge of event persistence. - if hs.config.worker.writers.events == hs.get_instance_name(): + if hs.get_instance_name() in hs.config.worker.writers.events: self._streams_to_replicate.append(stream) continue diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index f929fc3954..ccc7ca30d8 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -19,7 +19,7 @@ from typing import List, Tuple, Type import attr -from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance +from ._base import Stream, StreamUpdateResult, Token """Handling of the 'events' replication stream @@ -117,7 +117,7 @@ class EventsStream(Stream): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), - current_token_without_instance(self._store.get_current_events_token), + self._store._stream_id_gen.get_current_token_for_writer, self._update_function, ) diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 985b12df91..aa5d490624 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -75,7 +75,7 @@ class Databases: # If we're on a process that can persist events also # instantiate a `PersistEventsStore` - if hs.config.worker.writers.events == hs.get_instance_name(): + if hs.get_instance_name() in hs.config.worker.writers.events: persist_events = PersistEventsStore(hs, database, main) if "state" in database_config.databases: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 0b69aa6a94..4c3c162acf 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ if stream_ordering <= self.stream_ordering_month_ago: - raise StoreError(400, "stream_ordering too old") + raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) sql = """ SELECT event_id FROM stream_ordering_to_exterm diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 9cd1403b38..9a80f419e3 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -32,7 +32,7 @@ from synapse.logging.utils import log_function from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchEntry -from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import StateMap, get_domain_from_id from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.iterutils import batch_iter @@ -97,18 +97,21 @@ class PersistEventsStore: self.store = main_data_store self.database_engine = db.engine self._clock = hs.get_clock() + self._instance_name = hs.get_instance_name() self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id # Ideally we'd move these ID gens here, unfortunately some other ID # generators are chained off them so doing so is a bit of a PITA. - self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator - self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator + self._backfill_id_gen = ( + self.store._backfill_id_gen + ) # type: MultiWriterIdGenerator + self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator # This should only exist on instances that are configured to write assert ( - hs.config.worker.writers.events == hs.get_instance_name() + hs.get_instance_name() in hs.config.worker.writers.events ), "Can only instantiate EventsStore on master" async def _persist_events_and_state_updates( @@ -809,6 +812,7 @@ class PersistEventsStore: table="events", values=[ { + "instance_name": self._instance_name, "stream_ordering": event.internal_metadata.stream_ordering, "topological_ordering": event.depth, "depth": event.depth, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a7a73cc3d8..17f5997b89 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool -from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.storage.engines import PostgresEngine +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.types import Collection, get_domain_from_id from synapse.util.caches.descriptors import Cache, cached from synapse.util.iterutils import batch_iter @@ -78,27 +79,54 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if hs.config.worker.writers.events == hs.get_instance_name(): - # We are the process in charge of generating stream ids for events, - # so instantiate ID generators based on the database - self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", + if isinstance(database.engine, PostgresEngine): + # If we're using Postgres than we can use `MultiWriterIdGenerator` + # regardless of whether this process writes to the streams or not. + self._stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + instance_name=hs.get_instance_name(), + table="events", + instance_column="instance_name", + id_column="stream_ordering", + sequence_name="events_stream_seq", ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], + self._backfill_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + instance_name=hs.get_instance_name(), + table="events", + instance_column="instance_name", + id_column="stream_ordering", + sequence_name="events_backfill_stream_seq", + positive=False, ) else: - # Another process is in charge of persisting events and generating - # stream IDs: rely on the replication streams to let us know which - # IDs we can process. - self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 - ) + # We shouldn't be running in worker mode with SQLite, but its useful + # to support it for unit tests. + # + # If this process is the writer than we need to use + # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets + # updated over replication. (Multiple writers are not supported for + # SQLite). + if hs.get_instance_name() in hs.config.worker.writers.events: + self._stream_id_gen = StreamIdGenerator( + db_conn, "events", "stream_ordering", + ) + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], + ) + else: + self._stream_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering" + ) + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) self._get_event_cache = Cache( "*getEvent*", diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql new file mode 100644 index 0000000000..98ff76d709 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql @@ -0,0 +1,16 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres new file mode 100644 index 0000000000..97c1e6a0c5 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres @@ -0,0 +1,26 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS events_stream_seq; + +SELECT setval('events_stream_seq', ( + SELECT COALESCE(MAX(stream_ordering), 1) FROM events +)); + +CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq; + +SELECT setval('events_backfill_stream_seq', ( + SELECT COALESCE(-MIN(stream_ordering), 1) FROM events +)); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 2a66b3ad4e..1de2b91587 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -240,8 +240,12 @@ class MultiWriterIdGenerator: # gaps should be relatively rare it's still worth doing the book keeping # that allows us to skip forwards when there are gapless runs of # positions. + # + # We start at 1 here as a) the first generated stream ID will be 2, and + # b) other parts of the code assume that stream IDs are strictly greater + # than 0. self._persisted_upto_position = ( - min(self._current_positions.values()) if self._current_positions else 0 + min(self._current_positions.values()) if self._current_positions else 1 ) self._known_persisted_positions = [] # type: List[int] @@ -398,9 +402,7 @@ class MultiWriterIdGenerator: equal to it have been successfully persisted. """ - # Currently we don't support this operation, as it's not obvious how to - # condense the stream positions of multiple writers into a single int. - raise NotImplementedError() + return self.get_persisted_upto_position() def get_current_token_for_writer(self, instance_name: str) -> int: """Returns the position of the given writer. From 6605470bfb8944d369b8fc73195a380b95b6de9d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 14 Sep 2020 09:05:36 -0400 Subject: [PATCH 7/8] Improve SAML error messages (#8248) --- UPGRADE.rst | 14 ++ changelog.d/8248.feature | 1 + docs/sample_config.yaml | 30 +---- synapse/config/saml2_config.py | 34 +---- synapse/handlers/oidc_handler.py | 4 +- synapse/handlers/saml_handler.py | 169 +++++++++++++++--------- synapse/res/templates/saml_error.html | 52 -------- synapse/res/templates/sso_error.html | 43 +++++- synapse/rest/saml2/response_resource.py | 16 +-- 9 files changed, 178 insertions(+), 185 deletions(-) create mode 100644 changelog.d/8248.feature delete mode 100644 synapse/res/templates/saml_error.html diff --git a/UPGRADE.rst b/UPGRADE.rst index fc8982ddfe..49e86e628f 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -114,6 +114,20 @@ request to with the query parameters from the original link, presented as a URL-encoded form. See the file itself for more details. +Updated Single Sign-on HTML Templates +------------------------------------- + +The ``saml_error.html`` template was removed from Synapse and replaced with the +``sso_error.html`` template. If your Synapse is configured to use SAML and a +custom ``sso_redirect_confirm_template_dir`` configuration then any customisations +of the ``saml_error.html`` template will need to be merged into the ``sso_error.html`` +template. These templates are similar, but the parameters are slightly different: + +* The ``msg`` parameter should be renamed to ``error_description``. +* There is no longer a ``code`` parameter for the response code. +* A string ``error`` parameter is available that includes a short hint of why a + user is seeing the error page. + Upgrading to v1.18.0 ==================== diff --git a/changelog.d/8248.feature b/changelog.d/8248.feature new file mode 100644 index 0000000000..f3c4a74bc7 --- /dev/null +++ b/changelog.d/8248.feature @@ -0,0 +1 @@ +Consolidate the SSO error template across all configuration. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 2a5b2e0935..fb04ff283d 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1485,11 +1485,14 @@ trusted_key_servers: # At least one of `sp_config` or `config_path` must be set in this section to # enable SAML login. # -# (You will probably also want to set the following options to `false` to +# You will probably also want to set the following options to `false` to # disable the regular login/registration flows: # * enable_registration # * password_config.enabled # +# You will also want to investigate the settings under the "sso" configuration +# section below. +# # Once SAML support is enabled, a metadata file will be exposed at # https://:/_matrix/saml2/metadata.xml, which you may be able to # use to configure your SAML IdP with. Alternatively, you can manually configure @@ -1612,31 +1615,6 @@ saml2_config: # - attribute: department # value: "sales" - # Directory in which Synapse will try to find the template files below. - # If not set, default templates from within the Synapse package will be used. - # - # DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates. - # If you *do* uncomment it, you will need to make sure that all the templates - # below are in the directory. - # - # Synapse will look for the following templates in this directory: - # - # * HTML page to display to users if something goes wrong during the - # authentication process: 'saml_error.html'. - # - # When rendering, this template is given the following variables: - # * code: an HTML error code corresponding to the error that is being - # returned (typically 400 or 500) - # - # * msg: a textual message describing the error. - # - # The variables will automatically be HTML-escaped. - # - # You can see the default templates at: - # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates - # - #template_dir: "res/templates" - # OpenID Connect integration. The following settings can be used to make Synapse # use an OpenID Connect Provider for authentication, instead of its internal diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py index cc7401888b..99aa8b3bf1 100644 --- a/synapse/config/saml2_config.py +++ b/synapse/config/saml2_config.py @@ -169,10 +169,6 @@ class SAML2Config(Config): saml2_config.get("saml_session_lifetime", "15m") ) - self.saml2_error_html_template = self.read_templates( - ["saml_error.html"], saml2_config.get("template_dir") - )[0] - def _default_saml_config_dict( self, required_attributes: set, optional_attributes: set ): @@ -225,11 +221,14 @@ class SAML2Config(Config): # At least one of `sp_config` or `config_path` must be set in this section to # enable SAML login. # - # (You will probably also want to set the following options to `false` to + # You will probably also want to set the following options to `false` to # disable the regular login/registration flows: # * enable_registration # * password_config.enabled # + # You will also want to investigate the settings under the "sso" configuration + # section below. + # # Once SAML support is enabled, a metadata file will be exposed at # https://:/_matrix/saml2/metadata.xml, which you may be able to # use to configure your SAML IdP with. Alternatively, you can manually configure @@ -351,31 +350,6 @@ class SAML2Config(Config): # value: "staff" # - attribute: department # value: "sales" - - # Directory in which Synapse will try to find the template files below. - # If not set, default templates from within the Synapse package will be used. - # - # DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates. - # If you *do* uncomment it, you will need to make sure that all the templates - # below are in the directory. - # - # Synapse will look for the following templates in this directory: - # - # * HTML page to display to users if something goes wrong during the - # authentication process: 'saml_error.html'. - # - # When rendering, this template is given the following variables: - # * code: an HTML error code corresponding to the error that is being - # returned (typically 400 or 500) - # - # * msg: a textual message describing the error. - # - # The variables will automatically be HTML-escaped. - # - # You can see the default templates at: - # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates - # - #template_dir: "res/templates" """ % { "config_dir_path": config_dir_path } diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py index 1b06f3173f..4230dbaf99 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py @@ -131,10 +131,10 @@ class OidcHandler: def _render_error( self, request, error: str, error_description: Optional[str] = None ) -> None: - """Renders the error template and respond with it. + """Render the error template and respond to the request with it. This is used to show errors to the user. The template of this page can - be found under ``synapse/res/templates/sso_error.html``. + be found under `synapse/res/templates/sso_error.html`. Args: request: The incoming request from the browser. diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py index 66b063f991..8715abd4d1 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py @@ -21,9 +21,10 @@ import saml2 import saml2.response from saml2.client import Saml2Client -from synapse.api.errors import AuthError, SynapseError +from synapse.api.errors import SynapseError from synapse.config import ConfigError from synapse.config.saml2_config import SamlAttributeRequirement +from synapse.http.server import respond_with_html from synapse.http.servlet import parse_string from synapse.http.site import SynapseRequest from synapse.module_api import ModuleApi @@ -41,6 +42,10 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class MappingException(Exception): + """Used to catch errors when mapping the SAML2 response to a user.""" + + @attr.s class Saml2SessionData: """Data we track about SAML2 sessions""" @@ -68,6 +73,7 @@ class SamlHandler: hs.config.saml2_grandfathered_mxid_source_attribute ) self._saml2_attribute_requirements = hs.config.saml2.attribute_requirements + self._error_template = hs.config.sso_error_template # plugin to do custom mapping from saml response to mxid self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class( @@ -84,6 +90,25 @@ class SamlHandler: # a lock on the mappings self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock) + def _render_error( + self, request, error: str, error_description: Optional[str] = None + ) -> None: + """Render the error template and respond to the request with it. + + This is used to show errors to the user. The template of this page can + be found under `synapse/res/templates/sso_error.html`. + + Args: + request: The incoming request from the browser. + We'll respond with an HTML page describing the error. + error: A technical identifier for this error. + error_description: A human-readable description of the error. + """ + html = self._error_template.render( + error=error, error_description=error_description + ) + respond_with_html(request, 400, html) + def handle_redirect_request( self, client_redirect_url: bytes, ui_auth_session_id: Optional[str] = None ) -> bytes: @@ -134,49 +159,6 @@ class SamlHandler: # the dict. self.expire_sessions() - # Pull out the user-agent and IP from the request. - user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[ - 0 - ].decode("ascii", "surrogateescape") - ip_address = self.hs.get_ip_from_request(request) - - user_id, current_session = await self._map_saml_response_to_user( - resp_bytes, relay_state, user_agent, ip_address - ) - - # Complete the interactive auth session or the login. - if current_session and current_session.ui_auth_session_id: - await self._auth_handler.complete_sso_ui_auth( - user_id, current_session.ui_auth_session_id, request - ) - - else: - await self._auth_handler.complete_sso_login(user_id, request, relay_state) - - async def _map_saml_response_to_user( - self, - resp_bytes: str, - client_redirect_url: str, - user_agent: str, - ip_address: str, - ) -> Tuple[str, Optional[Saml2SessionData]]: - """ - Given a sample response, retrieve the cached session and user for it. - - Args: - resp_bytes: The SAML response. - client_redirect_url: The redirect URL passed in by the client. - user_agent: The user agent of the client making the request. - ip_address: The IP address of the client making the request. - - Returns: - Tuple of the user ID and SAML session associated with this response. - - Raises: - SynapseError if there was a problem with the response. - RedirectException: some mapping providers may raise this if they need - to redirect to an interstitial page. - """ try: saml2_auth = self._saml_client.parse_authn_request_response( resp_bytes, @@ -189,12 +171,23 @@ class SamlHandler: # in the (user-visible) exception message, so let's log the exception here # so we can track down the session IDs later. logger.warning(str(e)) - raise SynapseError(400, "Unexpected SAML2 login.") + self._render_error( + request, "unsolicited_response", "Unexpected SAML2 login." + ) + return except Exception as e: - raise SynapseError(400, "Unable to parse SAML2 response: %s." % (e,)) + self._render_error( + request, + "invalid_response", + "Unable to parse SAML2 response: %s." % (e,), + ) + return if saml2_auth.not_signed: - raise SynapseError(400, "SAML2 response was not signed.") + self._render_error( + request, "unsigned_respond", "SAML2 response was not signed." + ) + return logger.debug("SAML2 response: %s", saml2_auth.origxml) for assertion in saml2_auth.assertions: @@ -213,15 +206,73 @@ class SamlHandler: saml2_auth.in_response_to, None ) + # Ensure that the attributes of the logged in user meet the required + # attributes. for requirement in self._saml2_attribute_requirements: - _check_attribute_requirement(saml2_auth.ava, requirement) + if not _check_attribute_requirement(saml2_auth.ava, requirement): + self._render_error( + request, "unauthorised", "You are not authorised to log in here." + ) + return + + # Pull out the user-agent and IP from the request. + user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[ + 0 + ].decode("ascii", "surrogateescape") + ip_address = self.hs.get_ip_from_request(request) + + # Call the mapper to register/login the user + try: + user_id = await self._map_saml_response_to_user( + saml2_auth, relay_state, user_agent, ip_address + ) + except MappingException as e: + logger.exception("Could not map user") + self._render_error(request, "mapping_error", str(e)) + return + + # Complete the interactive auth session or the login. + if current_session and current_session.ui_auth_session_id: + await self._auth_handler.complete_sso_ui_auth( + user_id, current_session.ui_auth_session_id, request + ) + + else: + await self._auth_handler.complete_sso_login(user_id, request, relay_state) + + async def _map_saml_response_to_user( + self, + saml2_auth: saml2.response.AuthnResponse, + client_redirect_url: str, + user_agent: str, + ip_address: str, + ) -> str: + """ + Given a SAML response, retrieve the user ID for it and possibly register the user. + + Args: + saml2_auth: The parsed SAML2 response. + client_redirect_url: The redirect URL passed in by the client. + user_agent: The user agent of the client making the request. + ip_address: The IP address of the client making the request. + + Returns: + The user ID associated with this response. + + Raises: + MappingException if there was a problem mapping the response to a user. + RedirectException: some mapping providers may raise this if they need + to redirect to an interstitial page. + """ remote_user_id = self._user_mapping_provider.get_remote_user_id( saml2_auth, client_redirect_url ) if not remote_user_id: - raise Exception("Failed to extract remote user id from SAML response") + raise MappingException( + "Failed to extract remote user id from SAML response" + ) with (await self._mapping_lock.queue(self._auth_provider_id)): # first of all, check if we already have a mapping for this user @@ -235,7 +286,7 @@ class SamlHandler: ) if registered_user_id is not None: logger.info("Found existing mapping %s", registered_user_id) - return registered_user_id, current_session + return registered_user_id # backwards-compatibility hack: see if there is an existing user with a # suitable mapping from the uid @@ -260,7 +311,7 @@ class SamlHandler: await self._datastore.record_user_external_id( self._auth_provider_id, remote_user_id, registered_user_id ) - return registered_user_id, current_session + return registered_user_id # Map saml response to user attributes using the configured mapping provider for i in range(1000): @@ -277,7 +328,7 @@ class SamlHandler: localpart = attribute_dict.get("mxid_localpart") if not localpart: - raise Exception( + raise MappingException( "Error parsing SAML2 response: SAML mapping provider plugin " "did not return a mxid_localpart value" ) @@ -294,8 +345,8 @@ class SamlHandler: else: # Unable to generate a username in 1000 iterations # Break and return error to the user - raise SynapseError( - 500, "Unable to generate a Matrix ID from the SAML response" + raise MappingException( + "Unable to generate a Matrix ID from the SAML response" ) logger.info("Mapped SAML user to local part %s", localpart) @@ -310,7 +361,7 @@ class SamlHandler: await self._datastore.record_user_external_id( self._auth_provider_id, remote_user_id, registered_user_id ) - return registered_user_id, current_session + return registered_user_id def expire_sessions(self): expire_before = self._clock.time_msec() - self._saml2_session_lifetime @@ -323,11 +374,11 @@ class SamlHandler: del self._outstanding_requests_dict[reqid] -def _check_attribute_requirement(ava: dict, req: SamlAttributeRequirement): +def _check_attribute_requirement(ava: dict, req: SamlAttributeRequirement) -> bool: values = ava.get(req.attribute, []) for v in values: if v == req.value: - return + return True logger.info( "SAML2 attribute %s did not match required value '%s' (was '%s')", @@ -335,7 +386,7 @@ def _check_attribute_requirement(ava: dict, req: SamlAttributeRequirement): req.value, values, ) - raise AuthError(403, "You are not authorized to log in here.") + return False DOT_REPLACE_PATTERN = re.compile( @@ -390,7 +441,7 @@ class DefaultSamlMappingProvider: return saml_response.ava["uid"][0] except KeyError: logger.warning("SAML2 response lacks a 'uid' attestation") - raise SynapseError(400, "'uid' not in SAML2 response") + raise MappingException("'uid' not in SAML2 response") def saml_response_to_user_attributes( self, diff --git a/synapse/res/templates/saml_error.html b/synapse/res/templates/saml_error.html deleted file mode 100644 index 01cd9bdaf3..0000000000 --- a/synapse/res/templates/saml_error.html +++ /dev/null @@ -1,52 +0,0 @@ - - - - - SSO login error - - -{# a 403 means we have actively rejected their login #} -{% if code == 403 %} -

You are not allowed to log in here.

-{% else %} -

- There was an error during authentication: -

-
{{ msg }}
-

- If you are seeing this page after clicking a link sent to you via email, make - sure you only click the confirmation link once, and that you open the - validation link in the same client you're logging in from. -

-

- Try logging in again from your Matrix client and if the problem persists - please contact the server's administrator. -

- - -{% endif %} - - diff --git a/synapse/res/templates/sso_error.html b/synapse/res/templates/sso_error.html index 43a211386b..af8459719a 100644 --- a/synapse/res/templates/sso_error.html +++ b/synapse/res/templates/sso_error.html @@ -5,14 +5,49 @@ SSO error -

Oops! Something went wrong during authentication.

+{# If an error of unauthorised is returned it means we have actively rejected their login #} +{% if error == "unauthorised" %} +

You are not allowed to log in here.

+{% else %} +

+ There was an error during authentication: +

+
{{ error_description }}
+

+ If you are seeing this page after clicking a link sent to you via email, make + sure you only click the confirmation link once, and that you open the + validation link in the same client you're logging in from. +

Try logging in again from your Matrix client and if the problem persists please contact the server's administrator.

Error: {{ error }}

- {% if error_description %} -
{{ error_description }}
- {% endif %} + + +{% endif %} diff --git a/synapse/rest/saml2/response_resource.py b/synapse/rest/saml2/response_resource.py index c10188a5d7..f6668fb5e3 100644 --- a/synapse/rest/saml2/response_resource.py +++ b/synapse/rest/saml2/response_resource.py @@ -13,10 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.python import failure -from synapse.api.errors import SynapseError -from synapse.http.server import DirectServeHtmlResource, return_html_error +from synapse.http.server import DirectServeHtmlResource class SAML2ResponseResource(DirectServeHtmlResource): @@ -27,21 +25,15 @@ class SAML2ResponseResource(DirectServeHtmlResource): def __init__(self, hs): super().__init__() self._saml_handler = hs.get_saml_handler() - self._error_html_template = hs.config.saml2.saml2_error_html_template async def _async_render_GET(self, request): # We're not expecting any GET request on that resource if everything goes right, # but some IdPs sometimes end up responding with a 302 redirect on this endpoint. # In this case, just tell the user that something went wrong and they should # try to authenticate again. - f = failure.Failure( - SynapseError(400, "Unexpected GET request on /saml2/authn_response") + self._saml_handler._render_error( + request, "unexpected_get", "Unexpected GET request on /saml2/authn_response" ) - return_html_error(f, request, self._error_html_template) async def _async_render_POST(self, request): - try: - await self._saml_handler.handle_saml_response(request) - except Exception: - f = failure.Failure() - return_html_error(f, request, self._error_html_template) + await self._saml_handler.handle_saml_response(request) From b82d68c0bd952131836d00994c3c2a79b3d3a267 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 14 Sep 2020 17:07:04 +0300 Subject: [PATCH 8/8] Add the topic and avatar to the room details admin API (#8305) --- changelog.d/8305.feature | 1 + docs/admin_api/rooms.md | 4 ++++ synapse/storage/databases/main/room.py | 3 ++- tests/rest/admin/test_room.py | 2 ++ 4 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 changelog.d/8305.feature diff --git a/changelog.d/8305.feature b/changelog.d/8305.feature new file mode 100644 index 0000000000..862dfdf959 --- /dev/null +++ b/changelog.d/8305.feature @@ -0,0 +1 @@ +Add the room topic and avatar to the room details admin API. diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 0f267d2b7b..fa9b914fa7 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -275,6 +275,8 @@ The following fields are possible in the JSON response body: * `room_id` - The ID of the room. * `name` - The name of the room. +* `topic` - The topic of the room. +* `avatar` - The `mxc` URI to the avatar of the room. * `canonical_alias` - The canonical (main) alias address of the room. * `joined_members` - How many users are currently in the room. * `joined_local_members` - How many local users are currently in the room. @@ -304,6 +306,8 @@ Response: { "room_id": "!mscvqgqpHYjBGDxNym:matrix.org", "name": "Music Theory", + "avatar": "mxc://matrix.org/AQDaVFlbkQoErdOgqWRgiGSV", + "topic": "Theory, Composition, Notation, Analysis", "canonical_alias": "#musictheory:matrix.org", "joined_members": 127 "joined_local_members": 2, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 717df97301..127588ce4c 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -104,7 +104,8 @@ class RoomWorkerStore(SQLBaseStore): curr.local_users_in_room AS joined_local_members, rooms.room_version AS version, rooms.creator, state.encryption, state.is_federatable AS federatable, rooms.is_public AS public, state.join_rules, state.guest_access, - state.history_visibility, curr.current_state_events AS state_events + state.history_visibility, curr.current_state_events AS state_events, + state.avatar, state.topic FROM rooms LEFT JOIN room_stats_state state USING (room_id) LEFT JOIN room_stats_current curr USING (room_id) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 408c568a27..6dfc709dc5 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1174,6 +1174,8 @@ class RoomTestCase(unittest.HomeserverTestCase): self.assertIn("room_id", channel.json_body) self.assertIn("name", channel.json_body) + self.assertIn("topic", channel.json_body) + self.assertIn("avatar", channel.json_body) self.assertIn("canonical_alias", channel.json_body) self.assertIn("joined_members", channel.json_body) self.assertIn("joined_local_members", channel.json_body)