Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

anoa/redirect_instances
David Robertson 2023-01-23 11:46:48 +00:00
commit c4c8a2716e
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
46 changed files with 681 additions and 267 deletions

1
changelog.d/14752.misc Normal file
View File

@ -0,0 +1 @@
Enable Complement tests for Faster Remote Room Joins against worker-mode Synapse.

1
changelog.d/14844.misc Normal file
View File

@ -0,0 +1 @@
Add check to avoid starting duplicate partial state syncs.

1
changelog.d/14873.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug where the `populate_room_stats` background job could fail on broken rooms.

1
changelog.d/14874.bugfix Normal file
View File

@ -0,0 +1 @@
Faster joins: Fix a bug in worker deployments where the room stats and user directory would not get updated when finishing a fast join until another event is sent or received.

1
changelog.d/14875.docker Normal file
View File

@ -0,0 +1 @@
Bump default Python version in the Dockerfile from 3.9 to 3.11.

1
changelog.d/14877.misc Normal file
View File

@ -0,0 +1 @@
Always notify replication when a stream advances automatically.

1
changelog.d/14881.misc Normal file
View File

@ -0,0 +1 @@
Reduce max time we wait for stream positions.

1
changelog.d/14882.bugfix Normal file
View File

@ -0,0 +1 @@
Faster joins: Fix incompatibility with joins into restricted rooms where no local users have the ability to invite.

1
changelog.d/14885.misc Normal file
View File

@ -0,0 +1 @@
Add missing type hints.

1
changelog.d/14889.misc Normal file
View File

@ -0,0 +1 @@
Add missing type hints.

View File

@ -20,7 +20,7 @@
# `poetry export | pip install -r /dev/stdin`, but beware: we have experienced bugs
# in `poetry export` in the past.
ARG PYTHON_VERSION=3.9
ARG PYTHON_VERSION=3.11
###
### Stage 0: generate requirements.txt
@ -34,11 +34,11 @@ FROM docker.io/python:${PYTHON_VERSION}-slim-bullseye as requirements
# Here we use it to set up a cache for apt (and below for pip), to improve
# rebuild speeds on slow connections.
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
build-essential git libffi-dev libssl-dev \
&& rm -rf /var/lib/apt/lists/*
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
build-essential git libffi-dev libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# We install poetry in its own build stage to avoid its dependencies conflicting with
# synapse's dependencies.
@ -64,9 +64,9 @@ ARG TEST_ONLY_IGNORE_POETRY_LOCKFILE
# Otherwise, just create an empty requirements file so that the Dockerfile can
# proceed.
RUN if [ -z "$TEST_ONLY_IGNORE_POETRY_LOCKFILE" ]; then \
/root/.local/bin/poetry export --extras all -o /synapse/requirements.txt ${TEST_ONLY_SKIP_DEP_HASH_VERIFICATION:+--without-hashes}; \
/root/.local/bin/poetry export --extras all -o /synapse/requirements.txt ${TEST_ONLY_SKIP_DEP_HASH_VERIFICATION:+--without-hashes}; \
else \
touch /synapse/requirements.txt; \
touch /synapse/requirements.txt; \
fi
###
@ -76,24 +76,24 @@ FROM docker.io/python:${PYTHON_VERSION}-slim-bullseye as builder
# install the OS build deps
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
build-essential \
libffi-dev \
libjpeg-dev \
libpq-dev \
libssl-dev \
libwebp-dev \
libxml++2.6-dev \
libxslt1-dev \
openssl \
zlib1g-dev \
git \
curl \
libicu-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
build-essential \
libffi-dev \
libjpeg-dev \
libpq-dev \
libssl-dev \
libwebp-dev \
libxml++2.6-dev \
libxslt1-dev \
openssl \
zlib1g-dev \
git \
curl \
libicu-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
# Install rust and ensure it's in the PATH
@ -134,9 +134,9 @@ ARG TEST_ONLY_IGNORE_POETRY_LOCKFILE
RUN --mount=type=cache,target=/synapse/target,sharing=locked \
--mount=type=cache,target=${CARGO_HOME}/registry,sharing=locked \
if [ -z "$TEST_ONLY_IGNORE_POETRY_LOCKFILE" ]; then \
pip install --prefix="/install" --no-deps --no-warn-script-location /synapse[all]; \
pip install --prefix="/install" --no-deps --no-warn-script-location /synapse[all]; \
else \
pip install --prefix="/install" --no-warn-script-location /synapse[all]; \
pip install --prefix="/install" --no-warn-script-location /synapse[all]; \
fi
###
@ -151,20 +151,20 @@ LABEL org.opencontainers.image.source='https://github.com/matrix-org/synapse.git
LABEL org.opencontainers.image.licenses='Apache-2.0'
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
curl \
gosu \
libjpeg62-turbo \
libpq5 \
libwebp6 \
xmlsec1 \
libjemalloc2 \
libicu67 \
libssl-dev \
openssl \
&& rm -rf /var/lib/apt/lists/*
curl \
gosu \
libjpeg62-turbo \
libpq5 \
libwebp6 \
xmlsec1 \
libjemalloc2 \
libicu67 \
libssl-dev \
openssl \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /install /usr/local
COPY ./docker/start.py /start.py
@ -175,4 +175,4 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
ENTRYPOINT ["/start.py"]
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD curl -fSs http://localhost:8008/health || exit 1
CMD curl -fSs http://localhost:8008/health || exit 1

View File

@ -94,10 +94,8 @@ allow_device_name_lookup_over_federation: true
experimental_features:
# Enable history backfilling support
msc2716_enabled: true
{% if not workers_in_use %}
# client-side support for partial state in /send_join responses
faster_joins: true
{% endif %}
# Enable support for polls
msc3381_polls_enabled: true
# Enable deleting device-specific notification settings stored in account data

View File

@ -33,7 +33,6 @@ exclude = (?x)
|synapse/storage/schema/
|tests/api/test_auth.py
|tests/api/test_ratelimiting.py
|tests/app/test_openid_listener.py
|tests/appservice/test_scheduler.py
|tests/events/test_presence_router.py
@ -51,7 +50,6 @@ exclude = (?x)
|tests/rest/client/test_transactions.py
|tests/rest/media/v1/test_media_storage.py
|tests/server.py
|tests/server_notices/test_resource_limits_server_notices.py
|tests/test_state.py
|tests/test_terms_auth.py
)$

View File

@ -190,7 +190,7 @@ fi
extra_test_args=()
test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930"
test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930,faster_joins"
# All environment variables starting with PASS_ will be shared.
# (The prefix is stripped off before reaching the container.)
@ -223,12 +223,9 @@ else
export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite
fi
# We only test faster room joins on monoliths, because they are purposefully
# being developed without worker support to start with.
#
# The tests for importing historical messages (MSC2716) also only pass with monoliths,
# currently.
test_tags="$test_tags,faster_joins,msc2716"
# The tests for importing historical messages (MSC2716)
# only pass with monoliths, currently.
test_tags="$test_tags,msc2716"
fi

View File

@ -51,6 +51,7 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
from synapse.notifier import ReplicationNotifier
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.databases.main import PushRuleStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
@ -260,6 +261,9 @@ class MockHomeserver:
def should_send_federation(self) -> bool:
return False
def get_replication_notifier(self) -> ReplicationNotifier:
    """Return a `ReplicationNotifier`.

    A new `ReplicationNotifier` is constructed on every call; nothing is cached
    by this method.
    """
    return ReplicationNotifier()
class Porter:
def __init__(

View File

@ -282,13 +282,6 @@ def start(config_options: List[str]) -> None:
"synapse.app.user_dir",
)
if config.experimental.faster_joins_enabled:
raise ConfigError(
"You have enabled the experimental `faster_joins` config option, but it is "
"not compatible with worker deployments yet. Please disable `faster_joins` "
"or run Synapse as a single process deployment instead."
)
synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage

View File

@ -1157,6 +1157,11 @@ class FederationClient(FederationBase):
"members_omitted was set, but no servers were listed in the room"
)
if response.members_omitted and not partial_state:
raise InvalidResponseError(
"members_omitted was set, but we asked for full state"
)
return SendJoinResult(
event=event,
state=signed_state,

View File

@ -974,6 +974,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
self._notifier = hs.get_notifier()
self._remote_edu_linearizer = Linearizer(name="remote_device_list")
@ -1054,6 +1055,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
user_id,
device_id,
)
self._notifier.notify_replication()
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:

View File

@ -27,6 +27,7 @@ from typing import (
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
@ -47,7 +48,6 @@ from synapse.api.errors import (
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
LimitExceededError,
NotFoundError,
RequestSendFailed,
SynapseError,
@ -171,12 +171,29 @@ class FederationHandler:
self.third_party_event_rules = hs.get_third_party_event_rules()
# Tracks running partial state syncs by room ID.
# Partial state syncs currently only run on the main process, so it's okay to
# track them in-memory for now.
self._active_partial_state_syncs: Set[str] = set()
# Tracks partial state syncs we may want to restart.
# A dictionary mapping room IDs to (initial destination, other destinations)
# tuples.
self._partial_state_syncs_maybe_needing_restart: Dict[
str, Tuple[Optional[str], Collection[str]]
] = {}
# A lock guarding the partial state flag for rooms.
# When the lock is held for a given room, no other concurrent code may
# partial state or un-partial state the room.
self._is_partial_state_room_linearizer = Linearizer(
name="_is_partial_state_room_linearizer"
)
# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
run_as_background_process(
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
)
@trace
@ -587,7 +604,23 @@ class FederationHandler:
self._federation_event_handler.room_queues[room_id] = []
await self._clean_room_for_join(room_id)
is_host_joined = await self.store.is_host_joined(room_id, self.server_name)
if not is_host_joined:
# We may have old forward extremities lying around if the homeserver left
# the room completely in the past. Clear them out.
#
# Note that this check-then-clear is subject to races where
# * the homeserver is in the room and stops being in the room just after
# the check. We won't reset the forward extremities, but that's okay,
# since they will be almost up to date.
# * the homeserver is not in the room and starts being in the room just
# after the check. This can't happen, since `RoomMemberHandler` has a
# linearizer lock which prevents concurrent remote joins into the same
# room.
# In short, the races either have an acceptable outcome or should be
# impossible.
await self._clean_room_for_join(room_id)
try:
# Try the host we successfully got a response to /make_join/
@ -599,94 +632,116 @@ class FederationHandler:
except ValueError:
pass
ret = await self.federation_client.send_join(
host_list, event, room_version_obj
)
async with self._is_partial_state_room_linearizer.queue(room_id):
already_partial_state_room = await self.store.is_partial_state_room(
room_id
)
event = ret.event
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)
ret = await self.federation_client.send_join(
host_list,
event,
room_version_obj,
# Perform a full join when we are already in the room and it is a
# full state room, since we are not allowed to persist a partial
# state join event in a full state room. In the future, we could
# optimize this by always performing a partial state join and
# computing the state ourselves or retrieving it from the remote
# homeserver if necessary.
#
# There's a race where we leave the room, then perform a full join
# anyway. This should end up being fast anyway, since we would
# already have the full room state and auth chain persisted.
partial_state=not is_host_joined or already_partial_state_room,
)
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
event = ret.event
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)
logger.debug("do_invite_join event: %s", event)
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
# if this is the first time we've joined this room, it's time to add
# a row to `rooms` with the correct room version. If there's already a
# row there, we should override it, since it may have been populated
# based on an invite request which lied about the room version.
#
# federation_client.send_join has already checked that the room
# version in the received create event is the same as room_version_obj,
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
room_id=room_id,
room_version=room_version_obj,
state_events=state,
)
logger.debug("do_invite_join event: %s", event)
if ret.partial_state:
# Mark the room as having partial state.
# The background process is responsible for unmarking this flag,
# even if the join fails.
await self.store.store_partial_state_room(
# if this is the first time we've joined this room, it's time to add
# a row to `rooms` with the correct room version. If there's already a
# row there, we should override it, since it may have been populated
# based on an invite request which lied about the room version.
#
# federation_client.send_join has already checked that the room
# version in the received create event is the same as room_version_obj,
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
room_version=room_version_obj,
state_events=state,
)
try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError as e:
# The homeserver was already in the room and it is no longer partial
# stated. We ought to be doing a local join instead. Turn the error into
# a 429, as a hint to the client to try again.
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
# do a remote join for restricted rooms even if we have full state.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
else:
# Record the join event id for future use (when we finish the full
# join). We have to do this after persisting the event to keep foreign
# key constraints intact.
if ret.partial_state:
await self.store.write_partial_state_rooms_join_event_id(
room_id, event.event_id
)
finally:
# Always kick off the background process that asynchronously fetches
# state for the room.
# If the join failed, the background process is responsible for
# cleaning up — including unmarking the room as a partial state room.
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
initial_destination=origin,
other_destinations=ret.servers_in_room,
if ret.partial_state and not already_partial_state_room:
# Mark the room as having partial state.
# The background process is responsible for unmarking this flag,
# even if the join fails.
# TODO(faster_joins):
# We may want to reset the partial state info if it's from an
# old, failed partial state join.
# https://github.com/matrix-org/synapse/issues/13000
await self.store.store_partial_state_room(
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
)
try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError:
# This should be impossible, since we hold the lock on the room's
# partial statedness.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise
else:
# Record the join event id for future use (when we finish the full
# join). We have to do this after persisting the event to keep
# foreign key constraints intact.
if ret.partial_state and not already_partial_state_room:
# TODO(faster_joins):
# We may want to reset the partial state info if it's from
# an old, failed partial state join.
# https://github.com/matrix-org/synapse/issues/13000
await self.store.write_partial_state_rooms_join_event_id(
room_id, event.event_id
)
finally:
# Always kick off the background process that asynchronously fetches
# state for the room.
# If the join failed, the background process is responsible for
# cleaning up — including unmarking the room as a partial state
# room.
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for
# this room.
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
)
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
await self._replication.wait_for_stream_position(
@ -1660,20 +1715,100 @@ class FederationHandler:
# well.
return None
async def _resume_sync_partial_state_room(self) -> None:
async def _resume_partial_state_room_sync(self) -> None:
"""Resumes resyncing of all partial-state rooms after a restart."""
assert not self.config.worker.worker_app
partial_state_rooms = await self.store.get_partial_state_room_resync_info()
for room_id, resync_info in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
self._start_partial_state_room_sync(
initial_destination=resync_info.joined_via,
other_destinations=resync_info.servers_in_room,
room_id=room_id,
)
def _start_partial_state_room_sync(
    self,
    initial_destination: Optional[str],
    other_destinations: Collection[str],
    room_id: str,
) -> None:
    """Starts the background process to resync the state of a partial state room,
    if it is not already running.

    If a resync for `room_id` is already active, the requested destinations are
    recorded in `_partial_state_syncs_maybe_needing_restart` instead. When the
    active resync finishes, that record is consumed and, if the room still has
    its partial state flag, a fresh resync is started with those destinations.

    Args:
        initial_destination: the initial homeserver to pull the state from
        other_destinations: other homeservers to try to pull the state from, if
            `initial_destination` is unavailable
        room_id: room to be resynced
    """

    async def _sync_partial_state_room_wrapper() -> None:
        if room_id in self._active_partial_state_syncs:
            # Another local user has joined the room while there is already a
            # partial state sync running. This implies that there is a new join
            # event to un-partial state. We might find ourselves in one of a few
            # scenarios:
            #  1. There is an existing partial state sync. The partial state sync
            #     un-partial states the new join event before completing and all is
            #     well.
            #  2. Before the latest join, the homeserver was no longer in the room
            #     and there is an existing partial state sync from our previous
            #     membership of the room. The partial state sync may have:
            #      a) succeeded, but not yet terminated. The room will not be
            #         un-partial stated again unless we restart the partial state
            #         sync.
            #      b) failed, because we were no longer in the room and remote
            #         homeservers were refusing our requests, but not yet
            #         terminated. After the latest join, remote homeservers may
            #         start answering our requests again, so we should restart the
            #         partial state sync.
            # In the cases where we would want to restart the partial state sync,
            # the room would have the partial state flag when the partial state sync
            # terminates.
            self._partial_state_syncs_maybe_needing_restart[room_id] = (
                initial_destination,
                other_destinations,
            )
            return

        self._active_partial_state_syncs.add(room_id)

        try:
            await self._sync_partial_state_room(
                initial_destination=initial_destination,
                other_destinations=other_destinations,
                room_id=room_id,
            )
        finally:
            try:
                # Read the room's partial state flag while we still hold the claim
                # to being the active partial state sync (so that another partial
                # state sync can't come along and mess with it under us).
                # Normally, the partial state flag will be gone. If it isn't, then
                # we may find ourselves in scenario 2a or 2b as described in the
                # comment above, where we want to restart the partial state sync.
                is_still_partial_state_room = await self.store.is_partial_state_room(
                    room_id
                )
            finally:
                # Always give up the claim, even if reading the flag failed.
                # Otherwise the room would stay marked as having an active sync
                # and every later request to resync it would be parked in
                # `_partial_state_syncs_maybe_needing_restart` and never run.
                self._active_partial_state_syncs.remove(room_id)

            # Only reached when the flag was read successfully; if the read
            # raised, the exception propagates from the block above.
            if room_id in self._partial_state_syncs_maybe_needing_restart:
                (
                    restart_initial_destination,
                    restart_other_destinations,
                ) = self._partial_state_syncs_maybe_needing_restart.pop(room_id)

                if is_still_partial_state_room:
                    self._start_partial_state_room_sync(
                        initial_destination=restart_initial_destination,
                        other_destinations=restart_other_destinations,
                        room_id=room_id,
                    )

    run_as_background_process(
        desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
    )
async def _sync_partial_state_room(
self,
initial_destination: Optional[str],
@ -1688,6 +1823,12 @@ class FederationHandler:
`initial_destination` is unavailable
room_id: room to be resynced
"""
# Assume that we run on the main process for now.
# TODO(faster_joins,multiple workers)
# When moving the sync to workers, we need to ensure that
# * `_start_partial_state_room_sync` still prevents duplicate resyncs
# * `_is_partial_state_room_linearizer` correctly guards partial state flags
# for rooms between the workers doing remote joins and resync.
assert not self.config.worker.worker_app
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
@ -1725,20 +1866,24 @@ class FederationHandler:
logger.info("Handling any pending device list updates")
await self._device_handler.handle_room_un_partial_stated(room_id)
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
async with self._is_partial_state_room_linearizer.queue(room_id):
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()
# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
# https://github.com/matrix-org/synapse/issues/12815
return
# we raced against more events arriving with partial state. Go round

View File

@ -226,8 +226,7 @@ class Notifier:
self.store = hs.get_datastores().main
self.pending_new_room_events: List[_PendingRoomEventEntry] = []
# Called when there are new things to stream over replication
self.replication_callbacks: List[Callable[[], None]] = []
self._replication_notifier = hs.get_replication_notifier()
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
self._federation_client = hs.get_federation_http_client()
@ -279,7 +278,7 @@ class Notifier:
it needs to do any asynchronous work, a background thread should be started and
wrapped with run_as_background_process.
"""
self.replication_callbacks.append(cb)
self._replication_notifier.add_replication_callback(cb)
def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
"""Add a callback that will be called when a user joins a room.
@ -741,8 +740,7 @@ class Notifier:
def notify_replication(self) -> None:
"""Notify any replication listeners that there's a new event"""
for cb in self.replication_callbacks:
cb()
self._replication_notifier.notify_replication()
def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
for cb in self._new_join_in_room_callbacks:
@ -759,3 +757,26 @@ class Notifier:
# Tell the federation client about the fact the server is back up, so
# that any in flight requests can be immediately retried.
self._federation_client.wake_destination(server)
@attr.s(auto_attribs=True)
class ReplicationNotifier:
    """Registry of callbacks for things that need to know about stream changes.

    This lives apart from the notifier to avoid circular dependencies.
    """

    # Callbacks added via `add_replication_callback`, kept in the order they
    # were registered.
    _replication_callbacks: List[Callable[[], None]] = attr.Factory(list)

    def add_replication_callback(self, cb: Callable[[], None]) -> None:
        """Register a callback to be called when some new data is available.

        The callback is called without arguments. It should *not* return a
        Deferred - if it has asynchronous work to do, it should start a
        background thread wrapped with run_as_background_process.
        """
        self._replication_callbacks.append(cb)

    def notify_replication(self) -> None:
        """Notify any replication listeners that there's a new event."""
        for callback in self._replication_callbacks:
            callback()

View File

@ -352,7 +352,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_name=instance_name,
stream_name=stream_name,
position=position,
raise_on_timeout=False,
)
return result
@ -414,7 +413,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_name=content[_STREAM_POSITION_KEY]["instance_name"],
stream_name=stream_name,
position=position,
raise_on_timeout=False,
)
if self.CACHE:

View File

@ -59,7 +59,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
@ -207,6 +207,12 @@ class ReplicationDataHandler:
# we don't need to optimise this for multiple rows.
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
# The row's data is an `EventsStreamCurrentStateRow`.
# When we recompute the current state of a room based on forward
# extremities (see `update_current_state`), no new events are
# persisted, so we must poke the replication callbacks ourselves.
# This functionality is used when finishing up a partial state join.
self.notifier.notify_replication()
continue
assert isinstance(row, EventsStreamRow)
assert isinstance(row.data, EventsStreamEventRow)
@ -326,7 +332,6 @@ class ReplicationDataHandler:
instance_name: str,
stream_name: str,
position: int,
raise_on_timeout: bool = True,
) -> None:
"""Wait until this instance has received updates up to and including
the given stream position.
@ -335,8 +340,6 @@ class ReplicationDataHandler:
instance_name
stream_name
position
raise_on_timeout: Whether to raise an exception if we time out
waiting for the updates, or if we log an error and return.
"""
if instance_name == self._instance_name:
@ -365,19 +368,23 @@ class ReplicationDataHandler:
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
logger.info(
"Waiting for repl stream %r to reach %s (%s)",
stream_name,
position,
instance_name,
)
try:
await make_deferred_yieldable(deferred)
except defer.TimeoutError:
logger.error("Timed out waiting for stream %s", stream_name)
if raise_on_timeout:
raise
return
logger.info(
"Finished waiting for repl stream %r to reach %s", stream_name, position
"Finished waiting for repl stream %r to reach %s (%s)",
stream_name,
position,
instance_name,
)
def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:

View File

@ -16,7 +16,6 @@ from typing import TYPE_CHECKING
import attr
from synapse.replication.tcp.streams import Stream
from synapse.replication.tcp.streams._base import current_token_without_instance
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -42,8 +41,7 @@ class UnPartialStatedRoomStream(Stream):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_token,
store.get_un_partial_stated_rooms_from_stream,
)
@ -70,7 +68,6 @@ class UnPartialStatedEventStream(Stream):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
current_token_without_instance(store.get_un_partial_stated_events_token),
store.get_un_partial_stated_events_token,
store.get_un_partial_stated_events_from_stream,
)

View File

@ -107,7 +107,7 @@ from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpC
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.module_api import ModuleApi
from synapse.notifier import Notifier
from synapse.notifier import Notifier, ReplicationNotifier
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
from synapse.push.pusherpool import PusherPool
from synapse.replication.tcp.client import ReplicationDataHandler
@ -389,6 +389,10 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_notifier(self) -> Notifier:
return Notifier(self)
@cache_in_self
def get_replication_notifier(self) -> ReplicationNotifier:
    """Return a `ReplicationNotifier` for this homeserver.

    NOTE(review): presumably `@cache_in_self` means the notifier is built once
    and then shared by every caller - confirm against the decorator.
    """
    return ReplicationNotifier()
@cache_in_self
def get_auth(self) -> Auth:
return Auth(self)

View File

@ -493,8 +493,6 @@ class StateStorageController:
up to date.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/12814
# https://github.com/matrix-org/synapse/issues/12815
# https://github.com/matrix-org/synapse/issues/13008
return await self.stores.main.get_partial_current_state_deltas(

View File

@ -75,6 +75,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="account_data",
instance_name=self._instance_name,
tables=[
@ -95,6 +96,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
# SQLite).
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],

View File

@ -75,6 +75,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._cache_id_gen = MultiWriterIdGenerator(
db_conn,
database,
notifier=hs.get_replication_notifier(),
stream_name="caches",
instance_name=hs.get_instance_name(),
tables=[

View File

@ -91,6 +91,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
tables=[("device_inbox", "instance_name", "stream_id")],
@ -101,7 +102,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
else:
self._can_write_to_device = True
self._device_inbox_id_gen = StreamIdGenerator(
db_conn, "device_inbox", "stream_id"
db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
)
max_device_inbox_id = self._device_inbox_id_gen.get_current_token()

View File

@ -92,6 +92,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
# class below that is used on the main process.
self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_lists_stream",
"stream_id",
extra_tables=[

View File

@ -1181,7 +1181,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
super().__init__(database, db_conn, hs)
self._cross_signing_id_gen = StreamIdGenerator(
db_conn, "e2e_cross_signing_keys", "stream_id"
db_conn,
hs.get_replication_notifier(),
"e2e_cross_signing_keys",
"stream_id",
)
async def set_e2e_device_keys(

View File

@ -110,6 +110,10 @@ event_fetch_ongoing_gauge = Gauge(
)
class InvalidEventError(Exception):
"""The event retrieved from the database is invalid and cannot be used."""
@attr.s(slots=True, auto_attribs=True)
class EventCacheEntry:
event: EventBase
@ -191,6 +195,7 @@ class EventsWorkerStore(SQLBaseStore):
self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="events",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
@ -200,6 +205,7 @@ class EventsWorkerStore(SQLBaseStore):
self._backfill_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="backfill",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
@ -217,12 +223,14 @@ class EventsWorkerStore(SQLBaseStore):
# SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"events",
"stream_ordering",
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"events",
"stream_ordering",
step=-1,
@ -300,6 +308,7 @@ class EventsWorkerStore(SQLBaseStore):
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="un_partial_stated_event_stream",
instance_name=hs.get_instance_name(),
tables=[
@ -311,14 +320,18 @@ class EventsWorkerStore(SQLBaseStore):
)
else:
self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
db_conn, "un_partial_stated_event_stream", "stream_id"
db_conn,
hs.get_replication_notifier(),
"un_partial_stated_event_stream",
"stream_id",
)
def get_un_partial_stated_events_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
# writers because workers that don't write often will hold all
# readers up.
return self._un_partial_stated_events_stream_id_gen.get_current_token()
def get_un_partial_stated_events_token(self, instance_name: str) -> int:
return (
self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer(
instance_name
)
)
async def get_un_partial_stated_events_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
@ -408,6 +421,8 @@ class EventsWorkerStore(SQLBaseStore):
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
elif stream_name == UnPartialStatedEventStream.NAME:
self._un_partial_stated_events_stream_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)
async def have_censored_event(self, event_id: str) -> bool:
@ -1299,7 +1314,7 @@ class EventsWorkerStore(SQLBaseStore):
# invites, so just accept it for all membership events.
#
if d["type"] != EventTypes.Member:
raise Exception(
raise InvalidEventError(
"Room %s for event %s is unknown" % (d["room_id"], event_id)
)

View File

@ -77,6 +77,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
@ -85,7 +86,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
)
else:
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
)
self.hs = hs

View File

@ -118,6 +118,7 @@ class PushRulesWorkerStore(
# class below that is used on the main process.
self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"push_rules_stream",
"stream_id",
is_writer=hs.config.worker.worker_app is None,

View File

@ -62,6 +62,7 @@ class PusherWorkerStore(SQLBaseStore):
# class below that is used on the main process.
self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"pushers",
"id",
extra_tables=[("deleted_pushers", "stream_id")],

View File

@ -73,6 +73,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
self._receipts_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="receipts",
instance_name=self._instance_name,
tables=[("receipts_linearized", "instance_name", "stream_id")],
@ -91,6 +92,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"receipts_linearized",
"stream_id",
is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,

View File

@ -43,6 +43,7 @@ from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.config.homeserver import HomeServerConfig
from synapse.events import EventBase
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@ -126,6 +127,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="un_partial_stated_room_stream",
instance_name=self._instance_name,
tables=[
@ -137,9 +139,19 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
else:
self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
db_conn, "un_partial_stated_room_stream", "stream_id"
db_conn,
hs.get_replication_notifier(),
"un_partial_stated_room_stream",
"stream_id",
)
def process_replication_position(
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == UnPartialStatedRoomStream.NAME:
self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
return super().process_replication_position(stream_name, instance_name, token)
async def store_room(
self,
room_id: str,
@ -1277,13 +1289,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
return result["join_event_id"], result["device_lists_stream_id"]
def get_un_partial_stated_rooms_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there
# are multiple writers because workers that don't write often will
# hold all readers up.
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
return self._un_partial_stated_rooms_stream_id_gen.get_current_token_for_writer(
instance_name
)
async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int

View File

@ -95,6 +95,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
self._get_state_group_for_event.invalidate((row.event_id,))
self.is_partial_state_event.invalidate((row.event_id,))
super().process_replication_rows(stream_name, instance_name, token, rows)
@ -485,6 +486,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"rejection_status_changed": rejection_status_changed,
},
)
txn.call_after(self.hs.get_notifier().on_new_replication_data)
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):

View File

@ -29,6 +29,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.events_worker import InvalidEventError
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
@ -554,7 +555,17 @@ class StatsStore(StateDeltasStore):
"get_initial_state_for_room", _fetch_current_state_stats
)
state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
try:
state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
except InvalidEventError as e:
# If an exception occurs fetching events then the room is broken;
# skip processing it to avoid getting stuck on a room.
logger.warning(
"Failed to fetch events for room %s, skipping stats calculation: %r.",
room_id,
e,
)
return
room_state: Dict[str, Union[None, bool, str]] = {
"join_rules": None,

View File

@ -20,6 +20,7 @@ from collections import OrderedDict
from contextlib import contextmanager
from types import TracebackType
from typing import (
TYPE_CHECKING,
AsyncContextManager,
ContextManager,
Dict,
@ -49,6 +50,9 @@ from synapse.storage.database import (
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import PostgresSequenceGenerator
if TYPE_CHECKING:
from synapse.notifier import ReplicationNotifier
logger = logging.getLogger(__name__)
@ -182,6 +186,7 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
def __init__(
self,
db_conn: LoggingDatabaseConnection,
notifier: "ReplicationNotifier",
table: str,
column: str,
extra_tables: Iterable[Tuple[str, str]] = (),
@ -205,6 +210,8 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
# The keys and values are the same, but we never look at the values.
self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
self._notifier = notifier
def advance(self, instance_name: str, new_id: int) -> None:
# Advance should never be called on a writer instance, only over replication
if self._is_writer:
@ -227,6 +234,8 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
with self._lock:
self._unfinished_ids.pop(next_id)
self._notifier.notify_replication()
return _AsyncCtxManagerWrapper(manager())
def get_next_mult(self, n: int) -> AsyncContextManager[Sequence[int]]:
@ -250,6 +259,8 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
for next_id in next_ids:
self._unfinished_ids.pop(next_id)
self._notifier.notify_replication()
return _AsyncCtxManagerWrapper(manager())
def get_current_token(self) -> int:
@ -296,6 +307,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
self,
db_conn: LoggingDatabaseConnection,
db: DatabasePool,
notifier: "ReplicationNotifier",
stream_name: str,
instance_name: str,
tables: List[Tuple[str, str, str]],
@ -304,6 +316,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
positive: bool = True,
) -> None:
self._db = db
self._notifier = notifier
self._stream_name = stream_name
self._instance_name = instance_name
self._positive = positive
@ -535,7 +548,9 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
# Cast safety: the second argument to _MultiWriterCtxManager, multiple_ids,
# controls the return type. If `None` or omitted, the context manager yields
# a single integer stream_id; otherwise it yields a list of stream_ids.
return cast(AsyncContextManager[int], _MultiWriterCtxManager(self))
return cast(
AsyncContextManager[int], _MultiWriterCtxManager(self, self._notifier)
)
def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]:
# If we have a list of instances that are allowed to write to this
@ -544,7 +559,10 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
raise Exception("Tried to allocate stream ID on non-writer")
# Cast safety: see get_next.
return cast(AsyncContextManager[List[int]], _MultiWriterCtxManager(self, n))
return cast(
AsyncContextManager[List[int]],
_MultiWriterCtxManager(self, self._notifier, n),
)
def get_next_txn(self, txn: LoggingTransaction) -> int:
"""
@ -563,6 +581,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
txn.call_after(self._mark_id_as_finished, next_id)
txn.call_on_exception(self._mark_id_as_finished, next_id)
txn.call_after(self._notifier.notify_replication)
# Update the `stream_positions` table with newly updated stream
# ID (unless self._writers is not set in which case we don't
@ -787,6 +806,7 @@ class _MultiWriterCtxManager:
"""Async context manager returned by MultiWriterIdGenerator"""
id_gen: MultiWriterIdGenerator
notifier: "ReplicationNotifier"
multiple_ids: Optional[int] = None
stream_ids: List[int] = attr.Factory(list)
@ -814,6 +834,8 @@ class _MultiWriterCtxManager:
for i in self.stream_ids:
self.id_gen._mark_id_as_finished(i)
self.notifier.notify_replication()
if exc_type is not None:
return False

View File

@ -8,7 +8,10 @@ from tests import unittest
class TestRatelimiter(unittest.HomeserverTestCase):
def test_allowed_via_can_do_action(self):
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
allowed, time_allowed = self.get_success_or_raise(
limiter.can_do_action(None, key="test_id", _time_now_s=0)
@ -30,7 +33,7 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_allowed_appservice_ratelimited_via_can_requester_do_action(self):
appservice = ApplicationService(
None,
token="fake_token",
id="foo",
rate_limited=True,
sender="@as:example.com",
@ -38,7 +41,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
as_requester = create_requester("@user:example.com", app_service=appservice)
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
allowed, time_allowed = self.get_success_or_raise(
limiter.can_do_action(as_requester, _time_now_s=0)
@ -60,7 +66,7 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_allowed_appservice_via_can_requester_do_action(self):
appservice = ApplicationService(
None,
token="fake_token",
id="foo",
rate_limited=False,
sender="@as:example.com",
@ -68,7 +74,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
as_requester = create_requester("@user:example.com", app_service=appservice)
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
allowed, time_allowed = self.get_success_or_raise(
limiter.can_do_action(as_requester, _time_now_s=0)
@ -90,7 +99,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_allowed_via_ratelimit(self):
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
# Shouldn't raise
@ -114,7 +126,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
"""
# Create a Ratelimiter with a very low allowed rate_hz and burst_count
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
# First attempt should be allowed
@ -160,7 +175,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
"""
# Create a Ratelimiter with a very low allowed rate_hz and burst_count
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
# First attempt should be allowed
@ -188,7 +206,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_pruning(self):
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=1
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=1,
)
self.get_success_or_raise(
limiter.can_do_action(None, key="test_id_1", _time_now_s=0)
@ -223,7 +244,7 @@ class TestRatelimiter(unittest.HomeserverTestCase):
)
)
limiter = Ratelimiter(store=store, clock=None, rate_hz=0.1, burst_count=1)
limiter = Ratelimiter(store=store, clock=self.clock, rate_hz=0.1, burst_count=1)
# Shouldn't raise
for _ in range(20):
@ -231,7 +252,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_multiple_actions(self):
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=3,
)
# Test that 4 actions aren't allowed with a maximum burst of 3.
allowed, time_allowed = self.get_success_or_raise(
@ -295,7 +319,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
extra tokens by timing requests.
"""
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=3,
)
def consume_at(time: float) -> bool:
@ -317,7 +344,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_record_action_which_doesnt_fill_bucket(self) -> None:
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=3,
)
# Observe two actions, leaving room in the bucket for one more.
@ -337,7 +367,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_record_action_which_fills_bucket(self) -> None:
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=3,
)
# Observe three actions, filling up the bucket.
@ -363,7 +396,10 @@ class TestRatelimiter(unittest.HomeserverTestCase):
def test_record_action_which_overfills_bucket(self) -> None:
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
store=self.hs.get_datastores().main,
clock=self.clock,
rate_hz=0.1,
burst_count=3,
)
# Observe four actions, exceeding the bucket.

View File

@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import cast
from typing import Collection, Optional, cast
from unittest import TestCase
from unittest.mock import Mock, patch
from twisted.internet.defer import Deferred
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes
@ -679,3 +680,112 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase):
f"Stale partial-stated room flag left over for {room_id} after a"
f" failed do_invite_join!",
)
def test_duplicate_partial_state_room_syncs(self) -> None:
"""
Tests that concurrent partial state syncs are not started for the same room.
"""
is_partial_state = True
end_sync: "Deferred[None]" = Deferred()
async def is_partial_state_room(room_id: str) -> bool:
return is_partial_state
async def sync_partial_state_room(
initial_destination: Optional[str],
other_destinations: Collection[str],
room_id: str,
) -> None:
nonlocal end_sync
try:
await end_sync
finally:
end_sync = Deferred()
mock_is_partial_state_room = Mock(side_effect=is_partial_state_room)
mock_sync_partial_state_room = Mock(side_effect=sync_partial_state_room)
fed_handler = self.hs.get_federation_handler()
store = self.hs.get_datastores().main
with patch.object(
fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room
), patch.object(store, "is_partial_state_room", mock_is_partial_state_room):
# Start the partial state sync.
fed_handler._start_partial_state_room_sync("hs1", ["hs2"], "room_id")
self.assertEqual(mock_sync_partial_state_room.call_count, 1)
# Try to start another partial state sync.
# Nothing should happen.
fed_handler._start_partial_state_room_sync("hs3", ["hs2"], "room_id")
self.assertEqual(mock_sync_partial_state_room.call_count, 1)
# End the partial state sync
is_partial_state = False
end_sync.callback(None)
# The partial state sync should not be restarted.
self.assertEqual(mock_sync_partial_state_room.call_count, 1)
# The next attempt to start the partial state sync should work.
is_partial_state = True
fed_handler._start_partial_state_room_sync("hs3", ["hs2"], "room_id")
self.assertEqual(mock_sync_partial_state_room.call_count, 2)
def test_partial_state_room_sync_restart(self) -> None:
"""
Tests that partial state syncs are restarted when a second partial state sync
was deduplicated and the first partial state sync fails.
"""
is_partial_state = True
end_sync: "Deferred[None]" = Deferred()
async def is_partial_state_room(room_id: str) -> bool:
return is_partial_state
async def sync_partial_state_room(
initial_destination: Optional[str],
other_destinations: Collection[str],
room_id: str,
) -> None:
nonlocal end_sync
try:
await end_sync
finally:
end_sync = Deferred()
mock_is_partial_state_room = Mock(side_effect=is_partial_state_room)
mock_sync_partial_state_room = Mock(side_effect=sync_partial_state_room)
fed_handler = self.hs.get_federation_handler()
store = self.hs.get_datastores().main
with patch.object(
fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room
), patch.object(store, "is_partial_state_room", mock_is_partial_state_room):
# Start the partial state sync.
fed_handler._start_partial_state_room_sync("hs1", ["hs2"], "room_id")
self.assertEqual(mock_sync_partial_state_room.call_count, 1)
# Fail the partial state sync.
# The partial state sync should not be restarted.
end_sync.errback(Exception("Failed to request /state_ids"))
self.assertEqual(mock_sync_partial_state_room.call_count, 1)
# Start the partial state sync again.
fed_handler._start_partial_state_room_sync("hs1", ["hs2"], "room_id")
self.assertEqual(mock_sync_partial_state_room.call_count, 2)
# Deduplicate another partial state sync.
fed_handler._start_partial_state_room_sync("hs3", ["hs2"], "room_id")
self.assertEqual(mock_sync_partial_state_room.call_count, 2)
# Fail the partial state sync.
# It should restart with the latest parameters.
end_sync.errback(Exception("Failed to request /state_ids"))
self.assertEqual(mock_sync_partial_state_room.call_count, 3)
mock_sync_partial_state_room.assert_called_with(
initial_destination="hs3",
other_destinations=["hs2"],
room_id="room_id",
)

View File

@ -404,6 +404,9 @@ class ModuleApiTestCase(HomeserverTestCase):
self.module_api.send_local_online_presence_to([remote_user_id])
)
# We don't always send out federation immediately, so we advance the clock.
self.reactor.advance(1000)
# Check that a presence update was sent as part of a federation transaction
found_update = False
calls = (

View File

@ -14,7 +14,7 @@
from twisted.internet import defer
from synapse.replication.tcp.commands import PositionCommand, RdataCommand
from synapse.replication.tcp.commands import PositionCommand
from tests.replication._base import BaseMultiWorkerStreamTestCase
@ -111,20 +111,14 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
next_token = self.get_success(ctx.__aenter__())
self.get_success(ctx.__aexit__(None, None, None))
cmd_handler.send_command(
RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
)
self.replicate()
self.get_success(
data_handler.wait_for_stream_position("worker1", "caches", next_token)
)
# `wait_for_stream_position` should only return once master receives an
# RDATA from the worker
ctx = cache_id_gen.get_next()
next_token = self.get_success(ctx.__aenter__())
self.get_success(ctx.__aexit__(None, None, None))
# `wait_for_stream_position` should only return once master receives a
# notification that `next_token` has been persisted.
ctx_worker1 = cache_id_gen.get_next()
next_token = self.get_success(ctx_worker1.__aenter__())
d = defer.ensureDeferred(
data_handler.wait_for_stream_position("worker1", "caches", next_token)
@ -142,10 +136,7 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
)
self.assertFalse(d.called)
# ... but receiving the RDATA should
cmd_handler.send_command(
RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
)
self.replicate()
# ... but worker1 finishing (and so sending an update) should.
self.get_success(ctx_worker1.__aexit__(None, None, None))
self.assertTrue(d.called)

View File

@ -69,7 +69,7 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase):
self._rlsn._store.user_last_seen_monthly_active = Mock(
return_value=make_awaitable(1000)
)
self._rlsn._server_notices_manager.send_notice = Mock(
self._rlsn._server_notices_manager.send_notice = Mock( # type: ignore[assignment]
return_value=make_awaitable(Mock())
)
self._send_notice = self._rlsn._server_notices_manager.send_notice
@ -82,8 +82,8 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase):
self._rlsn._server_notices_manager.maybe_get_notice_room_for_user = Mock(
return_value=make_awaitable("!something:localhost")
)
self._rlsn._store.add_tag_to_room = Mock(return_value=make_awaitable(None))
self._rlsn._store.get_tags_for_room = Mock(return_value=make_awaitable({}))
self._rlsn._store.add_tag_to_room = Mock(return_value=make_awaitable(None)) # type: ignore[assignment]
self._rlsn._store.get_tags_for_room = Mock(return_value=make_awaitable({})) # type: ignore[assignment]
@override_config({"hs_disabled": True})
def test_maybe_send_server_notice_disabled_hs(self):
@ -361,9 +361,10 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
tok: The access token of the user that joined the room.
room_id: The ID of the room that's been joined.
"""
user_id = None
tok = None
invites = []
# We need at least one user to process
self.assertGreater(self.hs.config.server.max_mau_value, 0)
invites = {}
# Register as many users as the MAU limit allows.
for i in range(self.hs.config.server.max_mau_value):

View File

@ -40,9 +40,23 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
self.token = self.login("foo", "pass")
def _generate_room(self) -> str:
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
"""Create a room and return the room ID."""
return self.helper.create_room_as(self.user_id, tok=self.token)
return room_id
def run_background_updates(self, update_name: str) -> None:
"""Insert and run the background update."""
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{"update_name": update_name, "progress_json": "{}"},
)
)
# ... and tell the DataStore that it hasn't finished all updates yet
self.store.db_pool.updates._all_done = False
# Now let's actually drive the updates to completion
self.wait_for_background_updates()
def test_background_populate_rooms_creator_column(self) -> None:
"""Test that the background update to populate the rooms creator column
@ -71,22 +85,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
)
self.assertEqual(room_creator_before, None)
# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
"progress_json": "{}",
},
)
)
# ... and tell the DataStore that it hasn't finished all updates yet
self.store.db_pool.updates._all_done = False
# Now let's actually drive the updates to completion
self.wait_for_background_updates()
self.run_background_updates(_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN)
# Make sure the background update filled in the room creator
room_creator_after = self.get_success(
@ -137,22 +136,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
)
)
# Insert and run the background update
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN,
"progress_json": "{}",
},
)
)
# ... and tell the DataStore that it hasn't finished all updates yet
self.store.db_pool.updates._all_done = False
# Now let's actually drive the updates to completion
self.wait_for_background_updates()
self.run_background_updates(_BackgroundUpdates.ADD_ROOM_TYPE_COLUMN)
# Make sure the background update filled in the room type
room_type_after = self.get_success(
@ -164,3 +148,39 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
)
)
self.assertEqual(room_type_after, RoomTypes.SPACE)
def test_populate_stats_broken_rooms(self) -> None:
"""Ensure that re-populating room stats skips broken rooms."""
# Create a good room.
good_room_id = self._generate_room()
# Create a room and then break it by having no room version.
room_id = self._generate_room()
self.get_success(
self.store.db_pool.simple_update(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"room_version": None},
desc="test",
)
)
# Nuke any current stats in the database.
self.get_success(
self.store.db_pool.simple_delete(
table="room_stats_state", keyvalues={"1": 1}, desc="test"
)
)
self.run_background_updates("populate_stats_process_rooms")
# Only the good room appears in the stats tables.
results = self.get_success(
self.store.db_pool.simple_select_onecol(
table="room_stats_state",
keyvalues={},
retcol="room_id",
)
)
self.assertEqual(results, [good_room_id])

View File

@ -52,6 +52,7 @@ class StreamIdGeneratorTestCase(HomeserverTestCase):
def _create(conn: LoggingDatabaseConnection) -> StreamIdGenerator:
return StreamIdGenerator(
db_conn=conn,
notifier=self.hs.get_replication_notifier(),
table="foobar",
column="stream_id",
)
@ -196,6 +197,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
return MultiWriterIdGenerator(
conn,
self.db_pool,
notifier=self.hs.get_replication_notifier(),
stream_name="test_stream",
instance_name=instance_name,
tables=[("foobar", "instance_name", "stream_id")],
@ -630,6 +632,7 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
return MultiWriterIdGenerator(
conn,
self.db_pool,
notifier=self.hs.get_replication_notifier(),
stream_name="test_stream",
instance_name=instance_name,
tables=[("foobar", "instance_name", "stream_id")],
@ -766,6 +769,7 @@ class MultiTableMultiWriterIdGeneratorTestCase(HomeserverTestCase):
return MultiWriterIdGenerator(
conn,
self.db_pool,
notifier=self.hs.get_replication_notifier(),
stream_name="test_stream",
instance_name=instance_name,
tables=[