910 lines
36 KiB
Python
910 lines
36 KiB
Python
# Copyright 2014 - 2016 OpenMarket Ltd
|
|
# Copyright 2017 - 2018 New Vector Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import logging
|
|
from typing import TYPE_CHECKING, Dict, List, Optional, Set
|
|
|
|
import attr
|
|
|
|
from twisted.python.failure import Failure
|
|
|
|
from synapse.api.constants import Direction, EventTypes, Membership
|
|
from synapse.api.errors import SynapseError
|
|
from synapse.api.filtering import Filter
|
|
from synapse.events.utils import SerializeEventConfig
|
|
from synapse.handlers.room import ShutdownRoomResponse
|
|
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
|
|
from synapse.logging.opentracing import trace
|
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
|
from synapse.rest.admin._base import assert_user_is_admin
|
|
from synapse.streams.config import PaginationConfig
|
|
from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
|
|
from synapse.types.state import StateFilter
|
|
from synapse.util.async_helpers import ReadWriteLock
|
|
from synapse.util.stringutils import random_string
|
|
from synapse.visibility import filter_events_for_client
|
|
|
|
if TYPE_CHECKING:
|
|
from synapse.server import HomeServer
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# How many single event gaps we tolerate returning in a `/messages` response before we
|
|
# backfill and try to fill in the history. This is an arbitrarily picked number so feel
|
|
# free to tune it in the future.
|
|
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
|
|
|
|
|
|
# This is used to avoid purging a room several time at the same moment,
|
|
# and also paginating during a purge. Pagination can trigger backfill,
|
|
# which would create old events locally, and would potentially clash with the room delete.
|
|
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"
|
|
|
|
|
|
@attr.s(slots=True, auto_attribs=True)
|
|
class PurgeStatus:
|
|
"""Object tracking the status of a purge request
|
|
|
|
This class contains information on the progress of a purge request, for
|
|
return by get_purge_status.
|
|
"""
|
|
|
|
STATUS_ACTIVE = 0
|
|
STATUS_COMPLETE = 1
|
|
STATUS_FAILED = 2
|
|
|
|
STATUS_TEXT = {
|
|
STATUS_ACTIVE: "active",
|
|
STATUS_COMPLETE: "complete",
|
|
STATUS_FAILED: "failed",
|
|
}
|
|
|
|
# Save the error message if an error occurs
|
|
error: str = ""
|
|
|
|
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
|
|
status: int = STATUS_ACTIVE
|
|
|
|
def asdict(self) -> JsonDict:
|
|
ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
|
|
if self.error:
|
|
ret["error"] = self.error
|
|
return ret
|
|
|
|
|
|
@attr.s(slots=True, auto_attribs=True)
|
|
class DeleteStatus:
|
|
"""Object tracking the status of a delete room request
|
|
|
|
This class contains information on the progress of a delete room request, for
|
|
return by get_delete_status.
|
|
"""
|
|
|
|
STATUS_PURGING = 0
|
|
STATUS_COMPLETE = 1
|
|
STATUS_FAILED = 2
|
|
STATUS_SHUTTING_DOWN = 3
|
|
|
|
STATUS_TEXT = {
|
|
STATUS_PURGING: "purging",
|
|
STATUS_COMPLETE: "complete",
|
|
STATUS_FAILED: "failed",
|
|
STATUS_SHUTTING_DOWN: "shutting_down",
|
|
}
|
|
|
|
# Tracks whether this request has completed.
|
|
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
|
|
status: int = STATUS_PURGING
|
|
|
|
# Save the error message if an error occurs
|
|
error: str = ""
|
|
|
|
# Saves the result of an action to give it back to REST API
|
|
shutdown_room: ShutdownRoomResponse = {
|
|
"kicked_users": [],
|
|
"failed_to_kick_users": [],
|
|
"local_aliases": [],
|
|
"new_room_id": None,
|
|
}
|
|
|
|
def asdict(self) -> JsonDict:
|
|
ret = {
|
|
"status": DeleteStatus.STATUS_TEXT[self.status],
|
|
"shutdown_room": self.shutdown_room,
|
|
}
|
|
if self.error:
|
|
ret["error"] = self.error
|
|
return ret
|
|
|
|
|
|
class PaginationHandler:
|
|
"""Handles pagination and purge history requests.
|
|
|
|
These are in the same handler due to the fact we need to block clients
|
|
paginating during a purge.
|
|
"""
|
|
|
|
# when to remove a completed deletion/purge from the results map
|
|
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
|
|
|
|
def __init__(self, hs: "HomeServer"):
|
|
self.hs = hs
|
|
self.auth = hs.get_auth()
|
|
self.store = hs.get_datastores().main
|
|
self._storage_controllers = hs.get_storage_controllers()
|
|
self._state_storage_controller = self._storage_controllers.state
|
|
self.clock = hs.get_clock()
|
|
self._server_name = hs.hostname
|
|
self._room_shutdown_handler = hs.get_room_shutdown_handler()
|
|
self._relations_handler = hs.get_relations_handler()
|
|
self._worker_locks = hs.get_worker_locks_handler()
|
|
|
|
self.pagination_lock = ReadWriteLock()
|
|
# IDs of rooms in which there currently an active purge *or delete* operation.
|
|
self._purges_in_progress_by_room: Set[str] = set()
|
|
# map from purge id to PurgeStatus
|
|
self._purges_by_id: Dict[str, PurgeStatus] = {}
|
|
# map from purge id to DeleteStatus
|
|
self._delete_by_id: Dict[str, DeleteStatus] = {}
|
|
# map from room id to delete ids
|
|
# Dict[`room_id`, List[`delete_id`]]
|
|
self._delete_by_room: Dict[str, List[str]] = {}
|
|
self._event_serializer = hs.get_event_client_serializer()
|
|
|
|
self._retention_default_max_lifetime = (
|
|
hs.config.retention.retention_default_max_lifetime
|
|
)
|
|
|
|
self._retention_allowed_lifetime_min = (
|
|
hs.config.retention.retention_allowed_lifetime_min
|
|
)
|
|
self._retention_allowed_lifetime_max = (
|
|
hs.config.retention.retention_allowed_lifetime_max
|
|
)
|
|
self._is_master = hs.config.worker.worker_app is None
|
|
|
|
if hs.config.retention.retention_enabled and self._is_master:
|
|
# Run the purge jobs described in the configuration file.
|
|
for job in hs.config.retention.retention_purge_jobs:
|
|
logger.info("Setting up purge job with config: %s", job)
|
|
|
|
self.clock.looping_call(
|
|
run_as_background_process,
|
|
job.interval,
|
|
"purge_history_for_rooms_in_range",
|
|
self.purge_history_for_rooms_in_range,
|
|
job.shortest_max_lifetime,
|
|
job.longest_max_lifetime,
|
|
)
|
|
|
|
async def purge_history_for_rooms_in_range(
|
|
self, min_ms: Optional[int], max_ms: Optional[int]
|
|
) -> None:
|
|
"""Purge outdated events from rooms within the given retention range.
|
|
|
|
If a default retention policy is defined in the server's configuration and its
|
|
'max_lifetime' is within this range, also targets rooms which don't have a
|
|
retention policy.
|
|
|
|
Args:
|
|
min_ms: Duration in milliseconds that define the lower limit of
|
|
the range to handle (exclusive). If None, it means that the range has no
|
|
lower limit.
|
|
max_ms: Duration in milliseconds that define the upper limit of
|
|
the range to handle (inclusive). If None, it means that the range has no
|
|
upper limit.
|
|
"""
|
|
# We want the storage layer to include rooms with no retention policy in its
|
|
# return value only if a default retention policy is defined in the server's
|
|
# configuration and that policy's 'max_lifetime' is either lower (or equal) than
|
|
# max_ms or higher than min_ms (or both).
|
|
if self._retention_default_max_lifetime is not None:
|
|
include_null = True
|
|
|
|
if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
|
|
# The default max_lifetime is lower than (or equal to) min_ms.
|
|
include_null = False
|
|
|
|
if max_ms is not None and max_ms < self._retention_default_max_lifetime:
|
|
# The default max_lifetime is higher than max_ms.
|
|
include_null = False
|
|
else:
|
|
include_null = False
|
|
|
|
logger.info(
|
|
"[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
|
|
min_ms,
|
|
max_ms,
|
|
include_null,
|
|
)
|
|
|
|
rooms = await self.store.get_rooms_for_retention_period_in_range(
|
|
min_ms, max_ms, include_null
|
|
)
|
|
|
|
logger.debug("[purge] Rooms to purge: %s", rooms)
|
|
|
|
for room_id, retention_policy in rooms.items():
|
|
logger.info("[purge] Attempting to purge messages in room %s", room_id)
|
|
|
|
if room_id in self._purges_in_progress_by_room:
|
|
logger.warning(
|
|
"[purge] not purging room %s as there's an ongoing purge running"
|
|
" for this room",
|
|
room_id,
|
|
)
|
|
continue
|
|
|
|
# If max_lifetime is None, it means that the room has no retention policy.
|
|
# Given we only retrieve such rooms when there's a default retention policy
|
|
# defined in the server's configuration, we can safely assume that's the
|
|
# case and use it for this room.
|
|
max_lifetime = (
|
|
retention_policy.max_lifetime or self._retention_default_max_lifetime
|
|
)
|
|
|
|
# Cap the effective max_lifetime to be within the range allowed in the
|
|
# config.
|
|
# We do this in two steps:
|
|
# 1. Make sure it's higher or equal to the minimum allowed value, and if
|
|
# it's not replace it with that value. This is because the server
|
|
# operator can be required to not delete information before a given
|
|
# time, e.g. to comply with freedom of information laws.
|
|
# 2. Make sure the resulting value is lower or equal to the maximum allowed
|
|
# value, and if it's not replace it with that value. This is because the
|
|
# server operator can be required to delete any data after a specific
|
|
# amount of time.
|
|
if self._retention_allowed_lifetime_min is not None:
|
|
max_lifetime = max(self._retention_allowed_lifetime_min, max_lifetime)
|
|
|
|
if self._retention_allowed_lifetime_max is not None:
|
|
max_lifetime = min(max_lifetime, self._retention_allowed_lifetime_max)
|
|
|
|
logger.debug("[purge] max_lifetime for room %s: %s", room_id, max_lifetime)
|
|
|
|
# Figure out what token we should start purging at.
|
|
ts = self.clock.time_msec() - max_lifetime
|
|
|
|
stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
|
|
|
|
r = await self.store.get_room_event_before_stream_ordering(
|
|
room_id,
|
|
stream_ordering,
|
|
)
|
|
if not r:
|
|
logger.warning(
|
|
"[purge] purging events not possible: No event found "
|
|
"(ts %i => stream_ordering %i)",
|
|
ts,
|
|
stream_ordering,
|
|
)
|
|
continue
|
|
|
|
(stream, topo, _event_id) = r
|
|
token = "t%d-%d" % (topo, stream)
|
|
|
|
purge_id = random_string(16)
|
|
|
|
self._purges_by_id[purge_id] = PurgeStatus()
|
|
|
|
logger.info(
|
|
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
|
|
)
|
|
|
|
# We want to purge everything, including local events, and to run the purge in
|
|
# the background so that it's not blocking any other operation apart from
|
|
# other purges in the same room.
|
|
run_as_background_process(
|
|
"_purge_history",
|
|
self._purge_history,
|
|
purge_id,
|
|
room_id,
|
|
token,
|
|
True,
|
|
)
|
|
|
|
def start_purge_history(
|
|
self, room_id: str, token: str, delete_local_events: bool = False
|
|
) -> str:
|
|
"""Start off a history purge on a room.
|
|
|
|
Args:
|
|
room_id: The room to purge from
|
|
token: topological token to delete events before
|
|
delete_local_events: True to delete local events as well as
|
|
remote ones
|
|
|
|
Returns:
|
|
unique ID for this purge transaction.
|
|
"""
|
|
if room_id in self._purges_in_progress_by_room:
|
|
raise SynapseError(
|
|
400, "History purge already in progress for %s" % (room_id,)
|
|
)
|
|
|
|
purge_id = random_string(16)
|
|
|
|
# we log the purge_id here so that it can be tied back to the
|
|
# request id in the log lines.
|
|
logger.info("[purge] starting purge_id %s", purge_id)
|
|
|
|
self._purges_by_id[purge_id] = PurgeStatus()
|
|
run_as_background_process(
|
|
"purge_history",
|
|
self._purge_history,
|
|
purge_id,
|
|
room_id,
|
|
token,
|
|
delete_local_events,
|
|
)
|
|
return purge_id
|
|
|
|
async def _purge_history(
|
|
self, purge_id: str, room_id: str, token: str, delete_local_events: bool
|
|
) -> None:
|
|
"""Carry out a history purge on a room.
|
|
|
|
Args:
|
|
purge_id: The ID for this purge.
|
|
room_id: The room to purge from
|
|
token: topological token to delete events before
|
|
delete_local_events: True to delete local events as well as remote ones
|
|
"""
|
|
self._purges_in_progress_by_room.add(room_id)
|
|
try:
|
|
async with self._worker_locks.acquire_read_write_lock(
|
|
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
|
|
):
|
|
await self._storage_controllers.purge_events.purge_history(
|
|
room_id, token, delete_local_events
|
|
)
|
|
logger.info("[purge] complete")
|
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
|
except Exception:
|
|
f = Failure()
|
|
logger.error(
|
|
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
|
|
)
|
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
|
self._purges_by_id[purge_id].error = f.getErrorMessage()
|
|
finally:
|
|
self._purges_in_progress_by_room.discard(room_id)
|
|
|
|
# remove the purge from the list 24 hours after it completes
|
|
def clear_purge() -> None:
|
|
del self._purges_by_id[purge_id]
|
|
|
|
self.hs.get_reactor().callLater(
|
|
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
|
|
)
|
|
|
|
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
|
|
"""Get the current status of an active purge
|
|
|
|
Args:
|
|
purge_id: purge_id returned by start_purge_history
|
|
"""
|
|
return self._purges_by_id.get(purge_id)
|
|
|
|
def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
|
|
"""Get the current status of an active deleting
|
|
|
|
Args:
|
|
delete_id: delete_id returned by start_shutdown_and_purge_room
|
|
"""
|
|
return self._delete_by_id.get(delete_id)
|
|
|
|
def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
|
|
"""Get all active delete ids by room
|
|
|
|
Args:
|
|
room_id: room_id that is deleted
|
|
"""
|
|
return self._delete_by_room.get(room_id)
|
|
|
|
async def purge_room(self, room_id: str, force: bool = False) -> None:
|
|
"""Purge the given room from the database.
|
|
This function is part the delete room v1 API.
|
|
|
|
Args:
|
|
room_id: room to be purged
|
|
force: set true to skip checking for joined users.
|
|
"""
|
|
async with self._worker_locks.acquire_multi_read_write_lock(
|
|
[
|
|
(PURGE_PAGINATION_LOCK_NAME, room_id),
|
|
(NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id),
|
|
],
|
|
write=True,
|
|
):
|
|
# first check that we have no users in this room
|
|
if not force:
|
|
joined = await self.store.is_host_joined(room_id, self._server_name)
|
|
if joined:
|
|
raise SynapseError(400, "Users are still joined to this room")
|
|
|
|
await self._storage_controllers.purge_events.purge_room(room_id)
|
|
|
|
@trace
|
|
async def get_messages(
|
|
self,
|
|
requester: Requester,
|
|
room_id: str,
|
|
pagin_config: PaginationConfig,
|
|
as_client_event: bool = True,
|
|
event_filter: Optional[Filter] = None,
|
|
use_admin_priviledge: bool = False,
|
|
) -> JsonDict:
|
|
"""Get messages in a room.
|
|
|
|
Args:
|
|
requester: The user requesting messages.
|
|
room_id: The room they want messages from.
|
|
pagin_config: The pagination config rules to apply, if any.
|
|
as_client_event: True to get events in client-server format.
|
|
event_filter: Filter to apply to results or None
|
|
use_admin_priviledge: if `True`, return all events, regardless
|
|
of whether `user` has access to them. To be used **ONLY**
|
|
from the admin API.
|
|
|
|
Returns:
|
|
Pagination API results
|
|
"""
|
|
if use_admin_priviledge:
|
|
await assert_user_is_admin(self.auth, requester)
|
|
|
|
user_id = requester.user.to_string()
|
|
|
|
if pagin_config.from_token:
|
|
from_token = pagin_config.from_token
|
|
elif pagin_config.direction == Direction.FORWARDS:
|
|
from_token = (
|
|
await self.hs.get_event_sources().get_start_token_for_pagination(
|
|
room_id
|
|
)
|
|
)
|
|
else:
|
|
from_token = (
|
|
await self.hs.get_event_sources().get_current_token_for_pagination(
|
|
room_id
|
|
)
|
|
)
|
|
# We expect `/messages` to use historic pagination tokens by default but
|
|
# `/messages` should still works with live tokens when manually provided.
|
|
assert from_token.room_key.topological is not None
|
|
|
|
room_token = from_token.room_key
|
|
|
|
(membership, member_event_id) = (None, None)
|
|
if not use_admin_priviledge:
|
|
(
|
|
membership,
|
|
member_event_id,
|
|
) = await self.auth.check_user_in_room_or_world_readable(
|
|
room_id, requester, allow_departed_users=True
|
|
)
|
|
|
|
if pagin_config.direction == Direction.BACKWARDS:
|
|
# if we're going backwards, we might need to backfill. This
|
|
# requires that we have a topo token.
|
|
if room_token.topological:
|
|
curr_topo = room_token.topological
|
|
else:
|
|
curr_topo = await self.store.get_current_topological_token(
|
|
room_id, room_token.stream
|
|
)
|
|
|
|
# If they have left the room then clamp the token to be before
|
|
# they left the room, to save the effort of loading from the
|
|
# database.
|
|
if (
|
|
pagin_config.direction == Direction.BACKWARDS
|
|
and not use_admin_priviledge
|
|
and membership == Membership.LEAVE
|
|
):
|
|
# This is only None if the room is world_readable, in which case
|
|
# "Membership.JOIN" would have been returned and we should never hit
|
|
# this branch.
|
|
assert member_event_id
|
|
|
|
leave_token = await self.store.get_topological_token_for_event(
|
|
member_event_id
|
|
)
|
|
assert leave_token.topological is not None
|
|
|
|
if leave_token.topological < curr_topo:
|
|
from_token = from_token.copy_and_replace(
|
|
StreamKeyType.ROOM, leave_token
|
|
)
|
|
|
|
to_room_key = None
|
|
if pagin_config.to_token:
|
|
to_room_key = pagin_config.to_token.room_key
|
|
|
|
# Initially fetch the events from the database. With any luck, we can return
|
|
# these without blocking on backfill (handled below).
|
|
events, next_key = await self.store.paginate_room_events(
|
|
room_id=room_id,
|
|
from_key=from_token.room_key,
|
|
to_key=to_room_key,
|
|
direction=pagin_config.direction,
|
|
limit=pagin_config.limit,
|
|
event_filter=event_filter,
|
|
)
|
|
|
|
if pagin_config.direction == Direction.BACKWARDS:
|
|
# We use a `Set` because there can be multiple events at a given depth
|
|
# and we only care about looking at the unique continum of depths to
|
|
# find gaps.
|
|
event_depths: Set[int] = {event.depth for event in events}
|
|
sorted_event_depths = sorted(event_depths)
|
|
|
|
# Inspect the depths of the returned events to see if there are any gaps
|
|
found_big_gap = False
|
|
number_of_gaps = 0
|
|
previous_event_depth = (
|
|
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
|
|
)
|
|
for event_depth in sorted_event_depths:
|
|
# We don't expect a negative depth but we'll just deal with it in
|
|
# any case by taking the absolute value to get the true gap between
|
|
# any two integers.
|
|
depth_gap = abs(event_depth - previous_event_depth)
|
|
# A `depth_gap` of 1 is a normal continuous chain to the next event
|
|
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
|
|
# also possible there is no event at a given depth but we can't ever
|
|
# know that for sure)
|
|
if depth_gap > 1:
|
|
number_of_gaps += 1
|
|
|
|
# We only tolerate a small number single-event long gaps in the
|
|
# returned events because those are most likely just events we've
|
|
# failed to pull in the past. Anything longer than that is probably
|
|
# a sign that we're missing a decent chunk of history and we should
|
|
# try to backfill it.
|
|
#
|
|
# XXX: It's possible we could tolerate longer gaps if we checked
|
|
# that a given events `prev_events` is one that has failed pull
|
|
# attempts and we could just treat it like a dead branch of history
|
|
# for now or at least something that we don't need the block the
|
|
# client on to try pulling.
|
|
#
|
|
# XXX: If we had something like MSC3871 to indicate gaps in the
|
|
# timeline to the client, we could also get away with any sized gap
|
|
# and just have the client refetch the holes as they see fit.
|
|
if depth_gap > 2:
|
|
found_big_gap = True
|
|
break
|
|
previous_event_depth = event_depth
|
|
|
|
# Backfill in the foreground if we found a big gap, have too many holes,
|
|
# or we don't have enough events to fill the limit that the client asked
|
|
# for.
|
|
missing_too_many_events = (
|
|
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
|
|
)
|
|
not_enough_events_to_fill_response = len(events) < pagin_config.limit
|
|
if (
|
|
found_big_gap
|
|
or missing_too_many_events
|
|
or not_enough_events_to_fill_response
|
|
):
|
|
did_backfill = await self.hs.get_federation_handler().maybe_backfill(
|
|
room_id,
|
|
curr_topo,
|
|
limit=pagin_config.limit,
|
|
)
|
|
|
|
# If we did backfill something, refetch the events from the database to
|
|
# catch anything new that might have been added since we last fetched.
|
|
if did_backfill:
|
|
events, next_key = await self.store.paginate_room_events(
|
|
room_id=room_id,
|
|
from_key=from_token.room_key,
|
|
to_key=to_room_key,
|
|
direction=pagin_config.direction,
|
|
limit=pagin_config.limit,
|
|
event_filter=event_filter,
|
|
)
|
|
else:
|
|
# Otherwise, we can backfill in the background for eventual
|
|
# consistency's sake but we don't need to block the client waiting
|
|
# for a costly federation call and processing.
|
|
run_as_background_process(
|
|
"maybe_backfill_in_the_background",
|
|
self.hs.get_federation_handler().maybe_backfill,
|
|
room_id,
|
|
curr_topo,
|
|
limit=pagin_config.limit,
|
|
)
|
|
|
|
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
|
|
|
|
# if no events are returned from pagination, that implies
|
|
# we have reached the end of the available events.
|
|
# In that case we do not return end, to tell the client
|
|
# there is no need for further queries.
|
|
if not events:
|
|
return {
|
|
"chunk": [],
|
|
"start": await from_token.to_string(self.store),
|
|
}
|
|
|
|
if event_filter:
|
|
events = await event_filter.filter(events)
|
|
|
|
if not use_admin_priviledge:
|
|
events = await filter_events_for_client(
|
|
self._storage_controllers,
|
|
user_id,
|
|
events,
|
|
is_peeking=(member_event_id is None),
|
|
)
|
|
|
|
# if after the filter applied there are no more events
|
|
# return immediately - but there might be more in next_token batch
|
|
if not events:
|
|
return {
|
|
"chunk": [],
|
|
"start": await from_token.to_string(self.store),
|
|
"end": await next_token.to_string(self.store),
|
|
}
|
|
|
|
state = None
|
|
if event_filter and event_filter.lazy_load_members and len(events) > 0:
|
|
# TODO: remove redundant members
|
|
|
|
# FIXME: we also care about invite targets etc.
|
|
state_filter = StateFilter.from_types(
|
|
(EventTypes.Member, event.sender) for event in events
|
|
)
|
|
|
|
state_ids = await self._state_storage_controller.get_state_ids_for_event(
|
|
events[0].event_id, state_filter=state_filter
|
|
)
|
|
|
|
if state_ids:
|
|
state_dict = await self.store.get_events(list(state_ids.values()))
|
|
state = state_dict.values()
|
|
|
|
aggregations = await self._relations_handler.get_bundled_aggregations(
|
|
events, user_id
|
|
)
|
|
|
|
time_now = self.clock.time_msec()
|
|
|
|
serialize_options = SerializeEventConfig(
|
|
as_client_event=as_client_event, requester=requester
|
|
)
|
|
|
|
chunk = {
|
|
"chunk": (
|
|
self._event_serializer.serialize_events(
|
|
events,
|
|
time_now,
|
|
config=serialize_options,
|
|
bundle_aggregations=aggregations,
|
|
)
|
|
),
|
|
"start": await from_token.to_string(self.store),
|
|
"end": await next_token.to_string(self.store),
|
|
}
|
|
|
|
if state:
|
|
chunk["state"] = self._event_serializer.serialize_events(
|
|
state, time_now, config=serialize_options
|
|
)
|
|
|
|
return chunk
|
|
|
|
async def _shutdown_and_purge_room(
|
|
self,
|
|
delete_id: str,
|
|
room_id: str,
|
|
requester_user_id: Optional[str],
|
|
new_room_user_id: Optional[str] = None,
|
|
new_room_name: Optional[str] = None,
|
|
message: Optional[str] = None,
|
|
block: bool = False,
|
|
purge: bool = True,
|
|
force_purge: bool = False,
|
|
) -> None:
|
|
"""
|
|
Shuts down and purges a room.
|
|
|
|
See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
|
|
|
|
Args:
|
|
delete_id: The ID for this delete.
|
|
room_id: The ID of the room to shut down.
|
|
requester_user_id:
|
|
User who requested the action. Will be recorded as putting the room on the
|
|
blocking list.
|
|
If None, the action was not manually requested but instead
|
|
triggered automatically, e.g. through a Synapse module
|
|
or some other policy.
|
|
MUST NOT be None if block=True.
|
|
new_room_user_id:
|
|
If set, a new room will be created with this user ID
|
|
as the creator and admin, and all users in the old room will be
|
|
moved into that room. If not set, no new room will be created
|
|
and the users will just be removed from the old room.
|
|
new_room_name:
|
|
A string representing the name of the room that new users will
|
|
be invited to. Defaults to `Content Violation Notification`
|
|
message:
|
|
A string containing the first message that will be sent as
|
|
`new_room_user_id` in the new room. Ideally this will clearly
|
|
convey why the original room was shut down.
|
|
Defaults to `Sharing illegal content on this server is not
|
|
permitted and rooms in violation will be blocked.`
|
|
block:
|
|
If set to `true`, this room will be added to a blocking list,
|
|
preventing future attempts to join the room. Defaults to `false`.
|
|
purge:
|
|
If set to `true`, purge the given room from the database.
|
|
force_purge:
|
|
If set to `true`, the room will be purged from database
|
|
also if it fails to remove some users from room.
|
|
|
|
Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
|
|
"""
|
|
|
|
self._purges_in_progress_by_room.add(room_id)
|
|
try:
|
|
async with self._worker_locks.acquire_read_write_lock(
|
|
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
|
|
):
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
|
|
self._delete_by_id[
|
|
delete_id
|
|
].shutdown_room = await self._room_shutdown_handler.shutdown_room(
|
|
room_id=room_id,
|
|
requester_user_id=requester_user_id,
|
|
new_room_user_id=new_room_user_id,
|
|
new_room_name=new_room_name,
|
|
message=message,
|
|
block=block,
|
|
)
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
|
|
|
|
if purge:
|
|
logger.info("starting purge room_id %s", room_id)
|
|
|
|
# first check that we have no users in this room
|
|
if not force_purge:
|
|
joined = await self.store.is_host_joined(
|
|
room_id, self._server_name
|
|
)
|
|
if joined:
|
|
raise SynapseError(
|
|
400, "Users are still joined to this room"
|
|
)
|
|
|
|
await self._storage_controllers.purge_events.purge_room(room_id)
|
|
|
|
logger.info("purge complete for room_id %s", room_id)
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
|
|
except Exception:
|
|
f = Failure()
|
|
logger.error(
|
|
"failed",
|
|
exc_info=(f.type, f.value, f.getTracebackObject()),
|
|
)
|
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
|
|
self._delete_by_id[delete_id].error = f.getErrorMessage()
|
|
finally:
|
|
self._purges_in_progress_by_room.discard(room_id)
|
|
|
|
# remove the delete from the list 24 hours after it completes
|
|
def clear_delete() -> None:
|
|
del self._delete_by_id[delete_id]
|
|
self._delete_by_room[room_id].remove(delete_id)
|
|
if not self._delete_by_room[room_id]:
|
|
del self._delete_by_room[room_id]
|
|
|
|
self.hs.get_reactor().callLater(
|
|
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
|
|
)
|
|
|
|
def start_shutdown_and_purge_room(
|
|
self,
|
|
room_id: str,
|
|
requester_user_id: Optional[str],
|
|
new_room_user_id: Optional[str] = None,
|
|
new_room_name: Optional[str] = None,
|
|
message: Optional[str] = None,
|
|
block: bool = False,
|
|
purge: bool = True,
|
|
force_purge: bool = False,
|
|
) -> str:
|
|
"""Start off shut down and purge on a room.
|
|
|
|
Args:
|
|
room_id: The ID of the room to shut down.
|
|
requester_user_id:
|
|
User who requested the action and put the room on the
|
|
blocking list.
|
|
If None, the action was not manually requested but instead
|
|
triggered automatically, e.g. through a Synapse module
|
|
or some other policy.
|
|
MUST NOT be None if block=True.
|
|
new_room_user_id:
|
|
If set, a new room will be created with this user ID
|
|
as the creator and admin, and all users in the old room will be
|
|
moved into that room. If not set, no new room will be created
|
|
and the users will just be removed from the old room.
|
|
new_room_name:
|
|
A string representing the name of the room that new users will
|
|
be invited to. Defaults to `Content Violation Notification`
|
|
message:
|
|
A string containing the first message that will be sent as
|
|
`new_room_user_id` in the new room. Ideally this will clearly
|
|
convey why the original room was shut down.
|
|
Defaults to `Sharing illegal content on this server is not
|
|
permitted and rooms in violation will be blocked.`
|
|
block:
|
|
If set to `true`, this room will be added to a blocking list,
|
|
preventing future attempts to join the room. Defaults to `false`.
|
|
purge:
|
|
If set to `true`, purge the given room from the database.
|
|
force_purge:
|
|
If set to `true`, the room will be purged from database
|
|
also if it fails to remove some users from room.
|
|
|
|
Returns:
|
|
unique ID for this delete transaction.
|
|
"""
|
|
if room_id in self._purges_in_progress_by_room:
|
|
raise SynapseError(
|
|
400, "History purge already in progress for %s" % (room_id,)
|
|
)
|
|
|
|
# This check is double to `RoomShutdownHandler.shutdown_room`
|
|
# But here the requester get a direct response / error with HTTP request
|
|
# and do not have to check the purge status
|
|
if new_room_user_id is not None:
|
|
if not self.hs.is_mine_id(new_room_user_id):
|
|
raise SynapseError(
|
|
400, "User must be our own: %s" % (new_room_user_id,)
|
|
)
|
|
|
|
delete_id = random_string(16)
|
|
|
|
# we log the delete_id here so that it can be tied back to the
|
|
# request id in the log lines.
|
|
logger.info(
|
|
"starting shutdown room_id %s with delete_id %s",
|
|
room_id,
|
|
delete_id,
|
|
)
|
|
|
|
self._delete_by_id[delete_id] = DeleteStatus()
|
|
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
|
run_as_background_process(
|
|
"shutdown_and_purge_room",
|
|
self._shutdown_and_purge_room,
|
|
delete_id,
|
|
room_id,
|
|
requester_user_id,
|
|
new_room_user_id,
|
|
new_room_name,
|
|
message,
|
|
block,
|
|
purge,
|
|
force_purge,
|
|
)
|
|
return delete_id
|