Merge remote-tracking branch 'origin/develop' into hawkowl/structured-logging-perf

pull/6266/head
Amber H. Brown 2019-12-02 17:42:26 +11:00
commit d70be1871c
73 changed files with 2084 additions and 303 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -ex set -e
if [[ "$BUILDKITE_BRANCH" =~ ^(develop|master|dinsic|shhs|release-.*)$ ]]; then if [[ "$BUILDKITE_BRANCH" =~ ^(develop|master|dinsic|shhs|release-.*)$ ]]; then
echo "Not merging forward, as this is a release branch" echo "Not merging forward, as this is a release branch"
@ -18,6 +18,8 @@ else
GITBASE=$BUILDKITE_PULL_REQUEST_BASE_BRANCH GITBASE=$BUILDKITE_PULL_REQUEST_BASE_BRANCH
fi fi
echo "--- merge_base_branch $GITBASE"
# Show what we are before # Show what we are before
git --no-pager show -s git --no-pager show -s

View File

@ -28,3 +28,39 @@ User sees updates to presence from other users in the incremental sync.
Gapped incremental syncs include all state changes Gapped incremental syncs include all state changes
Old members are included in gappy incr LL sync if they start speaking Old members are included in gappy incr LL sync if they start speaking
# new failures as of https://github.com/matrix-org/sytest/pull/732
Device list doesn't change if remote server is down
Remote servers cannot set power levels in rooms without existing powerlevels
Remote servers should reject attempts by non-creators to set the power levels
# new failures as of https://github.com/matrix-org/sytest/pull/753
GET /rooms/:room_id/messages returns a message
GET /rooms/:room_id/messages lazy loads members correctly
Read receipts are sent as events
Only original members of the room can see messages from erased users
Device deletion propagates over federation
If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes
Changing user-signing key notifies local users
Newly updated tags appear in an incremental v2 /sync
Server correctly handles incoming m.device_list_update
Local device key changes get to remote servers with correct prev_id
AS-ghosted users can use rooms via AS
Ghost user must register before joining room
Test that a message is pushed
Invites are pushed
Rooms with aliases are correctly named in pushed
Rooms with names are correctly named in pushed
Rooms with canonical alias are correctly named in pushed
Rooms with many users are correctly pushed
Don't get pushed for rooms you've muted
Rejected events are not pushed
Test that rejected pushers are removed.
Events come down the correct room
# https://buildkite.com/matrix-dot-org/sytest/builds/326#cca62404-a88a-4fcb-ad41-175fd3377603
Presence changes to UNAVAILABLE are reported to remote room members
If remote user leaves room, changes device and rejoins we see update in sync
uploading self-signing key notifies over federation
Inbound federation can receive redacted events
Outbound federation can request missing events

View File

@ -1,3 +1,36 @@
Synapse 1.6.1 (2019-11-28)
==========================
Security updates
----------------
This release includes a security fix ([\#6426](https://github.com/matrix-org/synapse/issues/6426), below). Administrators are encouraged to upgrade as soon as possible.
Bugfixes
--------
- Clean up local threepids from user on account deactivation. ([\#6426](https://github.com/matrix-org/synapse/issues/6426))
- Fix startup error when http proxy is defined. ([\#6421](https://github.com/matrix-org/synapse/issues/6421))
Synapse 1.6.0 (2019-11-26)
==========================
Bugfixes
--------
- Fix phone home stats reporting. ([\#6418](https://github.com/matrix-org/synapse/issues/6418))
Synapse 1.6.0rc2 (2019-11-25)
=============================
Bugfixes
--------
- Fix a bug which could cause the background database update hander for event labels to get stuck in a loop raising exceptions. ([\#6407](https://github.com/matrix-org/synapse/issues/6407))
Synapse 1.6.0rc1 (2019-11-20) Synapse 1.6.0rc1 (2019-11-20)
============================= =============================

1
changelog.d/5815.feature Normal file
View File

@ -0,0 +1 @@
Implement per-room message retention policies.

1
changelog.d/5858.feature Normal file
View File

@ -0,0 +1 @@
Add etag and count fields to key backup endpoints to help clients guess if there are new keys.

1
changelog.d/6119.feature Normal file
View File

@ -0,0 +1 @@
Require User-Interactive Authentication for `/account/3pid/add`, meaning the user's password will be required to add a third-party ID to their account.

1
changelog.d/6176.feature Normal file
View File

@ -0,0 +1 @@
Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314.

1
changelog.d/6332.bugfix Normal file
View File

@ -0,0 +1 @@
Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices.

1
changelog.d/6333.bugfix Normal file
View File

@ -0,0 +1 @@
Prevent account data syncs getting lost across TCP replication.

1
changelog.d/6343.misc Normal file
View File

@ -0,0 +1 @@
Refactor some code in the event authentication path for clarity.

1
changelog.d/6379.misc Normal file
View File

@ -0,0 +1 @@
Complain on startup instead of 500'ing during runtime when `public_baseurl` isn't set when necessary.

1
changelog.d/6408.bugfix Normal file
View File

@ -0,0 +1 @@
Fix an intermittent exception when handling read-receipts.

1
changelog.d/6420.bugfix Normal file
View File

@ -0,0 +1 @@
Fix broken guest registration when there are existing blocks of numeric user IDs.

1
changelog.d/6421.bugfix Normal file
View File

@ -0,0 +1 @@
Fix startup error when http proxy is defined.

1
changelog.d/6423.misc Normal file
View File

@ -0,0 +1 @@
Clarifications for the email configuration settings.

1
changelog.d/6426.bugfix Normal file
View File

@ -0,0 +1 @@
Clean up local threepids from user on account deactivation.

1
changelog.d/6429.misc Normal file
View File

@ -0,0 +1 @@
Add more tests to the blacklist when running in worker mode.

1
changelog.d/6434.feature Normal file
View File

@ -0,0 +1 @@
Add support for MSC 2367, which allows specifying a reason on all membership events.

1
changelog.d/6436.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug where a room could become unusable with a low retention policy and a low activity.

12
debian/changelog vendored
View File

@ -1,3 +1,15 @@
matrix-synapse-py3 (1.6.1) stable; urgency=medium
* New synapse release 1.6.1.
-- Synapse Packaging team <packages@matrix.org> Thu, 28 Nov 2019 11:10:40 +0000
matrix-synapse-py3 (1.6.0) stable; urgency=medium
* New synapse release 1.6.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 26 Nov 2019 12:15:40 +0000
matrix-synapse-py3 (1.5.1) stable; urgency=medium matrix-synapse-py3 (1.5.1) stable; urgency=medium
* New synapse release 1.5.1. * New synapse release 1.5.1.

View File

@ -328,6 +328,69 @@ listeners:
# #
#user_ips_max_age: 14d #user_ips_max_age: 14d
# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
# The message retention policies feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# Default retention policy. If set, Synapse will apply it to rooms that lack the
# 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
# matter much because Synapse doesn't take it into account yet.
#
#default_policy:
# min_lifetime: 1d
# max_lifetime: 1y
# Retention policy limits. If set, a user won't be able to send a
# 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
# that's not within this range. This is especially useful in closed federations,
# in which server admins can make sure every federating server applies the same
# rules.
#
#allowed_lifetime_min: 1d
#allowed_lifetime_max: 1y
# Server admins can define the settings of the background jobs purging the
# events which lifetime has expired under the 'purge_jobs' section.
#
# If no configuration is provided, a single job will be set up to delete expired
# events in every room daily.
#
# Each job's configuration defines which range of message lifetimes the job
# takes care of. For example, if 'shortest_max_lifetime' is '2d' and
# 'longest_max_lifetime' is '3d', the job will handle purging expired events in
# rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
# lower than or equal to 3 days. Both the minimum and the maximum value of a
# range are optional, e.g. a job with no 'shortest_max_lifetime' and a
# 'longest_max_lifetime' of '3d' will handle every room with a retention policy
# which 'max_lifetime' is lower than or equal to three days.
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
## TLS ## ## TLS ##
@ -1270,8 +1333,23 @@ password_config:
# smtp_user: "exampleusername" # smtp_user: "exampleusername"
# smtp_pass: "examplepassword" # smtp_pass: "examplepassword"
# require_transport_security: false # require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>" # notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name: Matrix #
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
# #
# # Enable email notifications by default # # Enable email notifications by default
# # # #

View File

@ -36,7 +36,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.6.0rc1" __version__ = "1.6.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View File

@ -94,6 +94,8 @@ class EventTypes(object):
ServerACL = "m.room.server_acl" ServerACL = "m.room.server_acl"
Pinned = "m.room.pinned_events" Pinned = "m.room.pinned_events"
Retention = "m.room.retention"
class RejectedReason(object): class RejectedReason(object):
AUTH_ERROR = "auth_error" AUTH_ERROR = "auth_error"

View File

@ -585,7 +585,7 @@ def run(hs):
def performance_stats_init(): def performance_stats_init():
_stats_process.clear() _stats_process.clear()
_stats_process.append( _stats_process.append(
(int(hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF))) (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
) )
def start_phone_stats_home(): def start_phone_stats_home():

View File

@ -146,6 +146,8 @@ class EmailConfig(Config):
if k not in email_config: if k not in email_config:
missing.append("email." + k) missing.append("email." + k)
# public_baseurl is required to build password reset and validation links that
# will be emailed to users
if config.get("public_baseurl") is None: if config.get("public_baseurl") is None:
missing.append("public_baseurl") missing.append("public_baseurl")
@ -305,8 +307,23 @@ class EmailConfig(Config):
# smtp_user: "exampleusername" # smtp_user: "exampleusername"
# smtp_pass: "examplepassword" # smtp_pass: "examplepassword"
# require_transport_security: false # require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>" # notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name: Matrix #
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
# #
# # Enable email notifications by default # # Enable email notifications by default
# # # #

View File

@ -106,6 +106,13 @@ class RegistrationConfig(Config):
account_threepid_delegates = config.get("account_threepid_delegates") or {} account_threepid_delegates = config.get("account_threepid_delegates") or {}
self.account_threepid_delegate_email = account_threepid_delegates.get("email") self.account_threepid_delegate_email = account_threepid_delegates.get("email")
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn") self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
if self.account_threepid_delegate_msisdn and not self.public_baseurl:
raise ConfigError(
"The configuration option `public_baseurl` is required if "
"`account_threepid_delegate.msisdn` is set, such that "
"clients know where to submit validation tokens to. Please "
"configure `public_baseurl`."
)
self.default_identity_server = config.get("default_identity_server") self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False) self.allow_guest_access = config.get("allow_guest_access", False)

View File

@ -19,7 +19,7 @@ import logging
import os.path import os.path
import re import re
from textwrap import indent from textwrap import indent
from typing import List from typing import Dict, List, Optional
import attr import attr
import yaml import yaml
@ -246,6 +246,124 @@ class ServerConfig(Config):
# events with profile information that differ from the target's global profile. # events with profile information that differ from the target's global profile.
self.allow_per_room_profiles = config.get("allow_per_room_profiles", True) self.allow_per_room_profiles = config.get("allow_per_room_profiles", True)
retention_config = config.get("retention")
if retention_config is None:
retention_config = {}
self.retention_enabled = retention_config.get("enabled", False)
retention_default_policy = retention_config.get("default_policy")
if retention_default_policy is not None:
self.retention_default_min_lifetime = retention_default_policy.get(
"min_lifetime"
)
if self.retention_default_min_lifetime is not None:
self.retention_default_min_lifetime = self.parse_duration(
self.retention_default_min_lifetime
)
self.retention_default_max_lifetime = retention_default_policy.get(
"max_lifetime"
)
if self.retention_default_max_lifetime is not None:
self.retention_default_max_lifetime = self.parse_duration(
self.retention_default_max_lifetime
)
if (
self.retention_default_min_lifetime is not None
and self.retention_default_max_lifetime is not None
and (
self.retention_default_min_lifetime
> self.retention_default_max_lifetime
)
):
raise ConfigError(
"The default retention policy's 'min_lifetime' can not be greater"
" than its 'max_lifetime'"
)
else:
self.retention_default_min_lifetime = None
self.retention_default_max_lifetime = None
self.retention_allowed_lifetime_min = retention_config.get(
"allowed_lifetime_min"
)
if self.retention_allowed_lifetime_min is not None:
self.retention_allowed_lifetime_min = self.parse_duration(
self.retention_allowed_lifetime_min
)
self.retention_allowed_lifetime_max = retention_config.get(
"allowed_lifetime_max"
)
if self.retention_allowed_lifetime_max is not None:
self.retention_allowed_lifetime_max = self.parse_duration(
self.retention_allowed_lifetime_max
)
if (
self.retention_allowed_lifetime_min is not None
and self.retention_allowed_lifetime_max is not None
and self.retention_allowed_lifetime_min
> self.retention_allowed_lifetime_max
):
raise ConfigError(
"Invalid retention policy limits: 'allowed_lifetime_min' can not be"
" greater than 'allowed_lifetime_max'"
)
self.retention_purge_jobs = [] # type: List[Dict[str, Optional[int]]]
for purge_job_config in retention_config.get("purge_jobs", []):
interval_config = purge_job_config.get("interval")
if interval_config is None:
raise ConfigError(
"A retention policy's purge jobs configuration must have the"
" 'interval' key set."
)
interval = self.parse_duration(interval_config)
shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime")
if shortest_max_lifetime is not None:
shortest_max_lifetime = self.parse_duration(shortest_max_lifetime)
longest_max_lifetime = purge_job_config.get("longest_max_lifetime")
if longest_max_lifetime is not None:
longest_max_lifetime = self.parse_duration(longest_max_lifetime)
if (
shortest_max_lifetime is not None
and longest_max_lifetime is not None
and shortest_max_lifetime > longest_max_lifetime
):
raise ConfigError(
"A retention policy's purge jobs configuration's"
" 'shortest_max_lifetime' value can not be greater than its"
" 'longest_max_lifetime' value."
)
self.retention_purge_jobs.append(
{
"interval": interval,
"shortest_max_lifetime": shortest_max_lifetime,
"longest_max_lifetime": longest_max_lifetime,
}
)
if not self.retention_purge_jobs:
self.retention_purge_jobs = [
{
"interval": self.parse_duration("1d"),
"shortest_max_lifetime": None,
"longest_max_lifetime": None,
}
]
self.listeners = [] # type: List[dict] self.listeners = [] # type: List[dict]
for listener in config.get("listeners", []): for listener in config.get("listeners", []):
if not isinstance(listener.get("port", None), int): if not isinstance(listener.get("port", None), int):
@ -761,6 +879,69 @@ class ServerConfig(Config):
# Defaults to `28d`. Set to `null` to disable clearing out of old rows. # Defaults to `28d`. Set to `null` to disable clearing out of old rows.
# #
#user_ips_max_age: 14d #user_ips_max_age: 14d
# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
# The message retention policies feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# Default retention policy. If set, Synapse will apply it to rooms that lack the
# 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
# matter much because Synapse doesn't take it into account yet.
#
#default_policy:
# min_lifetime: 1d
# max_lifetime: 1y
# Retention policy limits. If set, a user won't be able to send a
# 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
# that's not within this range. This is especially useful in closed federations,
# in which server admins can make sure every federating server applies the same
# rules.
#
#allowed_lifetime_min: 1d
#allowed_lifetime_max: 1y
# Server admins can define the settings of the background jobs purging the
# events which lifetime has expired under the 'purge_jobs' section.
#
# If no configuration is provided, a single job will be set up to delete expired
# events in every room daily.
#
# Each job's configuration defines which range of message lifetimes the job
# takes care of. For example, if 'shortest_max_lifetime' is '2d' and
# 'longest_max_lifetime' is '3d', the job will handle purging expired events in
# rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
# lower than or equal to 3 days. Both the minimum and the maximum value of a
# range are optional, e.g. a job with no 'shortest_max_lifetime' and a
# 'longest_max_lifetime' of '3d' will handle every room with a retention policy
# which 'max_lifetime' is lower than or equal to three days.
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
""" """
% locals() % locals()
) )

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from six import string_types from six import integer_types, string_types
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
@ -22,11 +22,12 @@ from synapse.types import EventID, RoomID, UserID
class EventValidator(object): class EventValidator(object):
def validate_new(self, event): def validate_new(self, event, config):
"""Validates the event has roughly the right format """Validates the event has roughly the right format
Args: Args:
event (FrozenEvent) event (FrozenEvent): The event to validate.
config (Config): The homeserver's configuration.
""" """
self.validate_builder(event) self.validate_builder(event)
@ -67,6 +68,99 @@ class EventValidator(object):
Codes.INVALID_PARAM, Codes.INVALID_PARAM,
) )
if event.type == EventTypes.Retention:
self._validate_retention(event, config)
def _validate_retention(self, event, config):
"""Checks that an event that defines the retention policy for a room respects the
boundaries imposed by the server's administrator.
Args:
event (FrozenEvent): The event to validate.
config (Config): The homeserver's configuration.
"""
min_lifetime = event.content.get("min_lifetime")
max_lifetime = event.content.get("max_lifetime")
if min_lifetime is not None:
if not isinstance(min_lifetime, integer_types):
raise SynapseError(
code=400,
msg="'min_lifetime' must be an integer",
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_min is not None
and min_lifetime < config.retention_allowed_lifetime_min
):
raise SynapseError(
code=400,
msg=(
"'min_lifetime' can't be lower than the minimum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_max is not None
and min_lifetime > config.retention_allowed_lifetime_max
):
raise SynapseError(
code=400,
msg=(
"'min_lifetime' can't be greater than the maximum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if max_lifetime is not None:
if not isinstance(max_lifetime, integer_types):
raise SynapseError(
code=400,
msg="'max_lifetime' must be an integer",
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_min is not None
and max_lifetime < config.retention_allowed_lifetime_min
):
raise SynapseError(
code=400,
msg=(
"'max_lifetime' can't be lower than the minimum allowed value"
" enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_max is not None
and max_lifetime > config.retention_allowed_lifetime_max
):
raise SynapseError(
code=400,
msg=(
"'max_lifetime' can't be greater than the maximum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
min_lifetime is not None
and max_lifetime is not None
and min_lifetime > max_lifetime
):
raise SynapseError(
code=400,
msg="'min_lifetime' can't be greater than 'max_lifetime",
errcode=Codes.BAD_JSON,
)
def validate_builder(self, event): def validate_builder(self, event):
"""Validates that the builder/event has roughly the right format. Only """Validates that the builder/event has roughly the right format. Only
checks values that we expect a proto event to have, rather than all the checks values that we expect a proto event to have, rather than all the

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd # Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd # Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -73,6 +74,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
self._server_linearizer = Linearizer("fed_server") self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler") self._transaction_linearizer = Linearizer("fed_txn_handler")
@ -264,9 +266,6 @@ class FederationServer(FederationBase):
await self.registry.on_edu(edu_type, origin, content) await self.registry.on_edu(edu_type, origin, content)
async def on_context_state_request(self, origin, room_id, event_id): async def on_context_state_request(self, origin, room_id, event_id):
if not event_id:
raise NotImplementedError("Specify an event")
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
@ -280,12 +279,17 @@ class FederationServer(FederationBase):
# - but that's non-trivial to get right, and anyway somewhat defeats # - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer. # the point of the linearizer.
with (await self._server_linearizer.queue((origin, room_id))): with (await self._server_linearizer.queue((origin, room_id))):
resp = await self._state_resp_cache.wrap( resp = dict(
await self._state_resp_cache.wrap(
(room_id, event_id), (room_id, event_id),
self._on_context_state_request_compute, self._on_context_state_request_compute,
room_id, room_id,
event_id, event_id,
) )
)
room_version = await self.store.get_room_version(room_id)
resp["room_version"] = room_version
return 200, resp return 200, resp
@ -306,7 +310,11 @@ class FederationServer(FederationBase):
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
async def _on_context_state_request_compute(self, room_id, event_id): async def _on_context_state_request_compute(self, room_id, event_id):
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id) pdus = await self.handler.get_state_for_pdu(room_id, event_id)
else:
pdus = (await self.state.get_current_state(room_id)).values()
auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus]) auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
return { return {

View File

@ -421,7 +421,7 @@ class FederationEventServlet(BaseFederationServlet):
return await self.handler.on_pdu_request(origin, event_id) return await self.handler.on_pdu_request(origin, event_id)
class FederationStateServlet(BaseFederationServlet): class FederationStateV1Servlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?" PATH = "/state/(?P<context>[^/]*)/?"
# This is when someone asks for all data for a given context. # This is when someone asks for all data for a given context.
@ -429,7 +429,7 @@ class FederationStateServlet(BaseFederationServlet):
return await self.handler.on_context_state_request( return await self.handler.on_context_state_request(
origin, origin,
context, context,
parse_string_from_args(query, "event_id", None, required=True), parse_string_from_args(query, "event_id", None, required=False),
) )
@ -1360,7 +1360,7 @@ class RoomComplexityServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = ( FEDERATION_SERVLET_CLASSES = (
FederationSendServlet, FederationSendServlet,
FederationEventServlet, FederationEventServlet,
FederationStateServlet, FederationStateV1Servlet,
FederationStateIdsServlet, FederationStateIdsServlet,
FederationBackfillServlet, FederationBackfillServlet,
FederationQueryServlet, FederationQueryServlet,

View File

@ -95,6 +95,9 @@ class DeactivateAccountHandler(BaseHandler):
user_id, threepid["medium"], threepid["address"] user_id, threepid["medium"], threepid["address"]
) )
# Remove all 3PIDs this user has bound to the homeserver
yield self.store.user_delete_threepids(user_id)
# delete any devices belonging to the user, which will also # delete any devices belonging to the user, which will also
# delete corresponding access tokens. # delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id) yield self._device_handler.delete_all_devices_for_user(user_id)

View File

@ -30,6 +30,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import ( from synapse.types import (
UserID, UserID,
get_domain_from_id, get_domain_from_id,
@ -53,6 +54,12 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self) self._edu_updater = SigningKeyEduUpdater(hs, self)
self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
federation_registry = hs.get_federation_registry() federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@ -191,9 +198,15 @@ class E2eKeysHandler(object):
# probably be tracking their device lists. However, we haven't # probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now. # done an initial sync on the device list so we do it now.
try: try:
if self._is_master:
user_devices = yield self.device_handler.device_list_updater.user_device_resync( user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id user_id
) )
else:
user_devices = yield self._user_device_resync_client(
user_id=user_id
)
user_devices = user_devices["devices"] user_devices = user_devices["devices"]
for device in user_devices: for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]} results[user_id] = {device["device_id"]: device["keys"]}

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd # Copyright 2017, 2018 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
rooms rooms
session_id(string): session ID to delete keys for, for None to delete keys session_id(string): session ID to delete keys for, for None to delete keys
for all sessions for all sessions
Raises:
NotFoundError: if the backup version does not exist
Returns: Returns:
A deferred of the deletion transaction A dict containing the count and etag for the backup version
""" """
# lock for consistency with uploading # lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)): with (yield self._upload_linearizer.queue(user_id)):
# make sure the backup version exists
try:
version_info = yield self.store.get_e2e_room_keys_version_info(
user_id, version
)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown backup version")
else:
raise
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
version_etag = version_info["etag"] + 1
yield self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
)
count = yield self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@trace @trace
@defer.inlineCallbacks @defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys): def upload_room_keys(self, user_id, version, room_keys):
@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
} }
} }
Returns:
A dict containing the count and etag for the backup version
Raises: Raises:
NotFoundError: if there are no versions defined NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version RoomKeysVersionError: if the uploaded version is not the current version
@ -171,26 +196,17 @@ class E2eRoomKeysHandler(object):
else: else:
raise raise
# go through the room_keys. # Fetch any existing room keys for the sessions that have been
# XXX: this should/could be done concurrently, given we're in a lock. # submitted. Then compare them with the submitted keys. If the
for room_id, room in iteritems(room_keys["rooms"]): # key is new, insert it; if the key should be updated, then update
for session_id, session in iteritems(room["sessions"]): # it; otherwise, drop it.
yield self._upload_room_key( existing_keys = yield self.store.get_e2e_room_keys_multi(
user_id, version, room_id, session_id, session user_id, version, room_keys["rooms"]
) )
to_insert = [] # batch the inserts together
@defer.inlineCallbacks changed = False # if anything has changed, we need to update the etag
def _upload_room_key(self, user_id, version, room_id, session_id, room_key): for room_id, room in iteritems(room_keys["rooms"]):
"""Upload a given room_key for a given room and session into a given for session_id, room_key in iteritems(room["sessions"]):
version of the backup. Merges the key with any which might already exist.
Args:
user_id(str): the user whose backup we're setting
version(str): the version ID of the backup we're updating
room_id(str): the ID of the room whose keys we're setting
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
log_kv( log_kv(
{ {
"message": "Trying to upload room key", "message": "Trying to upload room key",
@ -199,14 +215,20 @@ class E2eRoomKeysHandler(object):
"user_id": user_id, "user_id": user_id,
} }
) )
# get the room_key for this particular row current_room_key = existing_keys.get(room_id, {}).get(session_id)
current_room_key = None if current_room_key:
try: if self._should_replace_room_key(current_room_key, room_key):
current_room_key = yield self.store.get_e2e_room_key( log_kv({"message": "Replacing room key."})
user_id, version, room_id, session_id # updates are done one at a time in the DB, so send
# updates right away rather than batching them up,
# like we do with the inserts
yield self.store.update_e2e_room_key(
user_id, version, room_id, session_id, room_key
) )
except StoreError as e: changed = True
if e.code == 404: else:
log_kv({"message": "Not replacing room_key."})
else:
log_kv( log_kv(
{ {
"message": "Room key not found.", "message": "Room key not found.",
@ -214,16 +236,22 @@ class E2eRoomKeysHandler(object):
"user_id": user_id, "user_id": user_id,
} }
) )
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."}) log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key( to_insert.append((room_id, session_id, room_key))
user_id, version, room_id, session_id, room_key changed = True
if len(to_insert):
yield self.store.add_e2e_room_keys(user_id, version, to_insert)
version_etag = version_info["etag"]
if changed:
version_etag = version_etag + 1
yield self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
) )
else:
log_kv({"message": "Not replacing room_key."}) count = yield self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@staticmethod @staticmethod
def _should_replace_room_key(current_room_key, room_key): def _should_replace_room_key(current_room_key, room_key):
@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
raise NotFoundError("Unknown backup version") raise NotFoundError("Unknown backup version")
else: else:
raise raise
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
return res return res
@trace @trace

View File

@ -1428,9 +1428,9 @@ class FederationHandler(BaseHandler):
return event return event
@defer.inlineCallbacks @defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id): def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
origin, event, event_format_version = yield self._make_and_verify_event( origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts, room_id, user_id, "leave" target_hosts, room_id, user_id, "leave", content=content,
) )
# Mark as outlier as we don't have any state for this event; we're not # Mark as outlier as we don't have any state for this event; we're not
# even in the room. # even in the room.
@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]): auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event Map from (event_type, state_key) to event
What we expect the event's auth_events to be, based on the event's Normally, our calculated auth_events based on the state of the room
position in the dag. I think? maybe?? at the event's position in the DAG, though occasionally (eg if the
event is an outlier), may be the auth events claimed by the remote
server.
Also NB that this function adds entries to it. Also NB that this function adds entries to it.
Returns: Returns:
@ -2091,30 +2093,35 @@ class FederationHandler(BaseHandler):
origin (str): origin (str):
event (synapse.events.EventBase): event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext): context (synapse.events.snapshot.EventContext):
auth_events (dict[(str, str)->synapse.events.EventBase]): auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
Normally, our calculated auth_events based on the state of the room
at the event's position in the DAG, though occasionally (eg if the
event is an outlier), may be the auth events claimed by the remote
server.
Also NB that this function adds entries to it.
Returns: Returns:
defer.Deferred[EventContext]: updated context defer.Deferred[EventContext]: updated context
""" """
event_auth_events = set(event.auth_event_ids()) event_auth_events = set(event.auth_event_ids())
if event.is_state(): # missing_auth is the set of the event's auth_events which we don't yet have
event_key = (event.type, event.state_key) # in auth_events.
else:
event_key = None
# if the event's auth_events refers to events which are not in our
# calculated auth_events, we need to fetch those events from somewhere.
#
# we start by fetching them from the store, and then try calling /event_auth/.
missing_auth = event_auth_events.difference( missing_auth = event_auth_events.difference(
e.event_id for e in auth_events.values() e.event_id for e in auth_events.values()
) )
# if we have missing events, we need to fetch those events from somewhere.
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth: if missing_auth:
# TODO: can we use store.have_seen_events here instead? # TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(missing_auth) have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
logger.debug("Got events %s from store", have_events) logger.debug("Found events %s in the store", have_events)
missing_auth.difference_update(have_events.keys()) missing_auth.difference_update(have_events.keys())
else: else:
have_events = {} have_events = {}
@ -2169,15 +2176,17 @@ class FederationHandler(BaseHandler):
event.auth_event_ids() event.auth_event_ids()
) )
except Exception: except Exception:
# FIXME:
logger.exception("Failed to get auth chain") logger.exception("Failed to get auth chain")
if event.internal_metadata.is_outlier(): if event.internal_metadata.is_outlier():
# XXX: given that, for an outlier, we'll be working with the
# event's *claimed* auth events rather than those we calculated:
# (a) is there any point in this test, since different_auth below will
# obviously be empty
# (b) alternatively, why don't we do it earlier?
logger.info("Skipping auth_event fetch for outlier") logger.info("Skipping auth_event fetch for outlier")
return context return context
# FIXME: Assumes we have and stored all the state for all the
# prev_events
different_auth = event_auth_events.difference( different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values() e.event_id for e in auth_events.values()
) )
@ -2191,27 +2200,22 @@ class FederationHandler(BaseHandler):
different_auth, different_auth,
) )
# now we state-resolve between our own idea of the auth events, and the remote's
# idea of them.
room_version = yield self.store.get_room_version(event.room_id) room_version = yield self.store.get_room_version(event.room_id)
different_event_ids = [
d for d in different_auth if d in have_events and not have_events[d]
]
different_events = yield make_deferred_yieldable( if different_event_ids:
defer.gatherResults( # XXX: currently this checks for redactions but I'm not convinced that is
[ # necessary?
run_in_background( different_events = yield self.store.get_events_as_list(different_event_ids)
self.store.get_event, d, allow_none=True, allow_rejected=False
)
for d in different_auth
if d in have_events and not have_events[d]
],
consumeErrors=True,
)
).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events) local_view = dict(auth_events)
remote_view = dict(auth_events) remote_view = dict(auth_events)
remote_view.update( remote_view.update({(d.type, d.state_key): d for d in different_events})
{(d.type, d.state_key): d for d in different_events if d}
)
new_state = yield self.state_handler.resolve_events( new_state = yield self.state_handler.resolve_events(
room_version, room_version,
@ -2231,13 +2235,13 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state) auth_events.update(new_state)
context = yield self._update_context_for_auth_events( context = yield self._update_context_for_auth_events(
event, context, auth_events, event_key event, context, auth_events
) )
return context return context
@defer.inlineCallbacks @defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events, event_key): def _update_context_for_auth_events(self, event, context, auth_events):
"""Update the state_ids in an event context after auth event resolution, """Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group. storing the changes as a new state group.
@ -2246,18 +2250,21 @@ class FederationHandler(BaseHandler):
context (synapse.events.snapshot.EventContext): initial event context context (synapse.events.snapshot.EventContext): initial event context
auth_events (dict[(str, str)->str]): Events to update in the event auth_events (dict[(str, str)->EventBase]): Events to update in the event
context. context.
event_key ((str, str)): (type, state_key) for the current event.
this will not be included in the current_state in the context.
Returns: Returns:
Deferred[EventContext]: new event context Deferred[EventContext]: new event context
""" """
# exclude the state key of the new event from the current_state in the context.
if event.is_state():
event_key = (event.type, event.state_key)
else:
event_key = None
state_updates = { state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key k: a.event_id for k, a in iteritems(auth_events) if k != event_key
} }
current_state_ids = yield context.get_current_state_ids(self.store) current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids) current_state_ids = dict(current_state_ids)
@ -2459,7 +2466,7 @@ class FederationHandler(BaseHandler):
room_version, event_dict, event, context room_version, event_dict, event, context
) )
EventValidator().validate_new(event) EventValidator().validate_new(event, self.config)
# We need to tell the transaction queue to send this out, even # We need to tell the transaction queue to send this out, even
# though the sender isn't a local user. # though the sender isn't a local user.
@ -2574,7 +2581,7 @@ class FederationHandler(BaseHandler):
event, context = yield self.event_creation_handler.create_new_client_event( event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder builder=builder
) )
EventValidator().validate_new(event) EventValidator().validate_new(event, self.config)
return (event, context) return (event, context)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -138,7 +138,7 @@ class MessageHandler(object):
raise NotFoundError("Can't find event for token %s" % (at_token,)) raise NotFoundError("Can't find event for token %s" % (at_token,))
visible_events = yield filter_events_for_client( visible_events = yield filter_events_for_client(
self.storage, user_id, last_events self.storage, user_id, last_events, apply_retention_policies=False
) )
event = last_events[0] event = last_events[0]
@ -417,7 +417,7 @@ class EventCreationHandler(object):
403, "You must be in the room to create an alias for it" 403, "You must be in the room to create an alias for it"
) )
self.validator.validate_new(event) self.validator.validate_new(event, self.config)
return (event, context) return (event, context)
@ -634,7 +634,7 @@ class EventCreationHandler(object):
if requester: if requester:
context.app_service = requester.app_service context.app_service = requester.app_service
self.validator.validate_new(event) self.validator.validate_new(event, self.config)
# If this event is an annotation then we check that that the sender # If this event is an annotation then we check that that the sender
# can't annotate the same way twice (e.g. stops users from liking an # can't annotate the same way twice (e.g. stops users from liking an

View File

@ -15,12 +15,15 @@
# limitations under the License. # limitations under the License.
import logging import logging
from six import iteritems
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock from synapse.util.async_helpers import ReadWriteLock
@ -80,6 +83,109 @@ class PaginationHandler(object):
self._purges_by_id = {} self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer() self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
self.clock.looping_call(
run_as_background_process,
job["interval"],
"purge_history_for_rooms_in_range",
self.purge_history_for_rooms_in_range,
job["shortest_max_lifetime"],
job["longest_max_lifetime"],
)
@defer.inlineCallbacks
def purge_history_for_rooms_in_range(self, min_ms, max_ms):
"""Purge outdated events from rooms within the given retention range.
If a default retention policy is defined in the server's configuration and its
'max_lifetime' is within this range, also targets rooms which don't have a
retention policy.
Args:
min_ms (int|None): Duration in milliseconds that define the lower limit of
the range to handle (exclusive). If None, it means that the range has no
lower limit.
max_ms (int|None): Duration in milliseconds that define the upper limit of
the range to handle (inclusive). If None, it means that the range has no
upper limit.
"""
# We want the storage layer to to include rooms with no retention policy in its
# return value only if a default retention policy is defined in the server's
# configuration and that policy's 'max_lifetime' is either lower (or equal) than
# max_ms or higher than min_ms (or both).
if self._retention_default_max_lifetime is not None:
include_null = True
if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
# The default max_lifetime is lower than (or equal to) min_ms.
include_null = False
if max_ms is not None and max_ms < self._retention_default_max_lifetime:
# The default max_lifetime is higher than max_ms.
include_null = False
else:
include_null = False
rooms = yield self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
for room_id, retention_policy in iteritems(rooms):
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
" for this room",
room_id,
)
continue
max_lifetime = retention_policy["max_lifetime"]
if max_lifetime is None:
# If max_lifetime is None, it means that include_null equals True,
# therefore we can safely assume that there is a default policy defined
# in the server's configuration.
max_lifetime = self._retention_default_max_lifetime
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
r = yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
if not r:
logger.warning(
"[purge] purging events not possible: No event found "
"(ts %i => stream_ordering %i)",
ts,
stream_ordering,
)
continue
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
purge_id = random_string(16)
self._purges_by_id[purge_id] = PurgeStatus()
logger.info(
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
)
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
"_purge_history", self._purge_history, purge_id, room_id, token, True,
)
def start_purge_history(self, room_id, token, delete_local_events=False): def start_purge_history(self, room_id, token, delete_local_events=False):
"""Start off a history purge on a room. """Start off a history purge on a room.

View File

@ -94,7 +94,9 @@ class RoomMemberHandler(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Attempt to reject an invite for a room this server is not in. If we """Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected. fail to do so we locally mark the invite as rejected.
@ -104,6 +106,7 @@ class RoomMemberHandler(object):
reject invite reject invite
room_id (str) room_id (str)
target (UserID): The user rejecting the invite target (UserID): The user rejecting the invite
content (dict): The content for the rejection event
Returns: Returns:
Deferred[dict]: A dictionary to be returned to the client, may Deferred[dict]: A dictionary to be returned to the client, may
@ -471,7 +474,7 @@ class RoomMemberHandler(object):
# send the rejection to the inviter's HS. # send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain] remote_room_hosts = remote_room_hosts + [inviter.domain]
res = yield self._remote_reject_invite( res = yield self._remote_reject_invite(
requester, remote_room_hosts, room_id, target requester, remote_room_hosts, room_id, target, content,
) )
return res return res
@ -971,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite
""" """
fed_handler = self.federation_handler fed_handler = self.federation_handler
try: try:
ret = yield fed_handler.do_remotely_reject_invite( ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string() remote_room_hosts, room_id, target.to_string(), content=content,
) )
return ret return ret
except Exception as e: except Exception as e:

View File

@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret return ret
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite
""" """
return self._remote_reject_client( return self._remote_reject_client(
@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
user_id=target.to_string(), user_id=target.to_string(),
content=content,
) )
def _user_joined_room(self, target, room_id): def _user_joined_room(self, target, room_id):

View File

@ -14,7 +14,14 @@
# limitations under the License. # limitations under the License.
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.replication.http import federation, login, membership, register, send_event from synapse.replication.http import (
devices,
federation,
login,
membership,
register,
send_event,
)
REPLICATION_PREFIX = "/_synapse/replication" REPLICATION_PREFIX = "/_synapse/replication"
@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
federation.register_servlets(hs, self) federation.register_servlets(hs, self)
login.register_servlets(hs, self) login.register_servlets(hs, self)
register.register_servlets(hs, self) register.register_servlets(hs, self)
devices.register_servlets(hs, self)

View File

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.replication.http._base import ReplicationEndpoint
logger = logging.getLogger(__name__)
class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.
This must happen on master so that the results can be correctly cached in
the database and streamed to workers.
Request format:
POST /_synapse/replication/user_device_resync/:user_id
{}
Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:
{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""
NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False
def __init__(self, hs):
super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
self.device_list_updater = hs.get_device_handler().device_list_updater
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(user_id):
return {}
async def _handle_request(self, request, user_id):
user_devices = await self.device_list_updater.user_device_resync(user_id)
return 200, user_devices
def register_servlets(hs, http_server):
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)

View File

@ -93,6 +93,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
{ {
"requester": ..., "requester": ...,
"remote_room_hosts": [...], "remote_room_hosts": [...],
"content": { ... }
} }
""" """
@ -107,7 +108,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod @staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts): def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
""" """
Args: Args:
requester(Requester) requester(Requester)
@ -118,12 +119,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
return { return {
"requester": requester.serialize(), "requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts, "remote_room_hosts": remote_room_hosts,
"content": content,
} }
async def _handle_request(self, request, room_id, user_id): async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
@ -134,7 +137,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
try: try:
event = await self.federation_handler.do_remotely_reject_invite( event = await self.federation_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, user_id remote_room_hosts, room_id, user_id, event_content,
) )
ret = event.get_pdu_json() ret = event.get_pdu_json()
except Exception as e: except Exception as e:

View File

@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
) )
AccountDataStreamRow = namedtuple( AccountDataStreamRow = namedtuple(
"AccountDataStream", "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
("user_id", "room_id", "data_type", "data"), # str # str # str # dict
) )
GroupsStreamRow = namedtuple( GroupsStreamRow = namedtuple(
"GroupsStreamRow", "GroupsStreamRow",
@ -421,8 +420,8 @@ class AccountDataStream(Stream):
results = list(room_results) results = list(room_results)
results.extend( results.extend(
(stream_id, user_id, None, account_data_type, content) (stream_id, user_id, None, account_data_type)
for stream_id, user_id, account_data_type, content in global_results for stream_id, user_id, account_data_type in global_results
) )
return results return results

View File

@ -714,7 +714,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
target = UserID.from_string(content["user_id"]) target = UserID.from_string(content["user_id"])
event_content = None event_content = None
if "reason" in content and membership_action in ["kick", "ban"]: if "reason" in content:
event_content = {"reason": content["reason"]} event_content = {"reason": content["reason"]}
await self.room_member_handler.update_membership( await self.room_member_handler.update_membership(

View File

@ -642,6 +642,7 @@ class ThreepidAddRestServlet(RestServlet):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler() self.auth_handler = hs.get_auth_handler()
@interactive_auth_handler
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def on_POST(self, request):
requester = yield self.auth.get_user_by_req(request) requester = yield self.auth.get_user_by_req(request)
@ -652,6 +653,10 @@ class ThreepidAddRestServlet(RestServlet):
client_secret = body["client_secret"] client_secret = body["client_secret"]
sid = body["sid"] sid = body["sid"]
yield self.auth_handler.validate_user_via_ui_auth(
requester, body, self.hs.get_ip_from_request(request)
)
validation_session = yield self.identity_handler.validate_threepid_session( validation_session = yield self.identity_handler.validate_threepid_session(
client_secret, sid client_secret, sid
) )

View File

@ -134,8 +134,8 @@ class RoomKeysServlet(RestServlet):
if room_id: if room_id:
body = {"rooms": {room_id: body}} body = {"rooms": {room_id: body}}
yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body) ret = yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
return 200, {} return 200, ret
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id, session_id): def on_GET(self, request, room_id, session_id):
@ -239,10 +239,10 @@ class RoomKeysServlet(RestServlet):
user_id = requester.user.to_string() user_id = requester.user.to_string()
version = parse_string(request, "version") version = parse_string(request, "version")
yield self.e2e_room_keys_handler.delete_room_keys( ret = yield self.e2e_room_keys_handler.delete_room_keys(
user_id, version, room_id, session_id user_id, version, room_id, session_id
) )
return 200, {} return 200, ret
class RoomKeysNewVersionServlet(RestServlet): class RoomKeysNewVersionServlet(RestServlet):

View File

@ -77,8 +77,8 @@ class PreviewUrlResource(DirectServeResource):
treq_args={"browser_like_redirects": True}, treq_args={"browser_like_redirects": True},
ip_whitelist=hs.config.url_preview_ip_range_whitelist, ip_whitelist=hs.config.url_preview_ip_range_whitelist,
ip_blacklist=hs.config.url_preview_ip_range_blacklist, ip_blacklist=hs.config.url_preview_ip_range_blacklist,
http_proxy=os.getenv("http_proxy"), http_proxy=os.getenvb(b"http_proxy"),
https_proxy=os.getenv("HTTPS_PROXY"), https_proxy=os.getenvb(b"HTTPS_PROXY"),
) )
self.media_repo = media_repo self.media_repo = media_repo
self.primary_base_path = media_repo.primary_base_path self.primary_base_path = media_repo.primary_base_path

View File

@ -318,8 +318,8 @@ class HomeServer(object):
def build_proxied_http_client(self): def build_proxied_http_client(self):
return SimpleHttpClient( return SimpleHttpClient(
self, self,
http_proxy=os.getenv("http_proxy"), http_proxy=os.getenvb(b"http_proxy"),
https_proxy=os.getenv("HTTPS_PROXY"), https_proxy=os.getenvb(b"HTTPS_PROXY"),
) )
def build_room_creation_handler(self): def build_room_creation_handler(self):

View File

@ -409,16 +409,15 @@ class SQLBaseStore(object):
i = 0 i = 0
N = 5 N = 5
while True: while True:
try: cursor = LoggingTransaction(
txn = conn.cursor() conn.cursor(),
txn = LoggingTransaction(
txn,
name, name,
self.database_engine, self.database_engine,
after_callbacks, after_callbacks,
exception_callbacks, exception_callbacks,
) )
r = func(txn, *args, **kwargs) try:
r = func(cursor, *args, **kwargs)
conn.commit() conn.commit()
return r return r
except self.database_engine.module.OperationalError as e: except self.database_engine.module.OperationalError as e:
@ -456,6 +455,40 @@ class SQLBaseStore(object):
) )
continue continue
raise raise
finally:
# we're either about to retry with a new cursor, or we're about to
# release the connection. Once we release the connection, it could
# get used for another query, which might do a conn.rollback().
#
# In the latter case, even though that probably wouldn't affect the
# results of this transaction, python's sqlite will reset all
# statements on the connection [1], which will make our cursor
# invalid [2].
#
# In any case, continuing to read rows after commit()ing seems
# dubious from the PoV of ACID transactional semantics
# (sqlite explicitly says that once you commit, you may see rows
# from subsequent updates.)
#
# In psycopg2, cursors are essentially a client-side fabrication -
# all the data is transferred to the client side when the statement
# finishes executing - so in theory we could go on streaming results
# from the cursor, but attempting to do so would make us
# incompatible with sqlite, so let's make sure we're not doing that
# by closing the cursor.
#
# (*named* cursors in psycopg2 are different and are proper server-
# side things, but (a) we don't use them and (b) they are implicitly
# closed by ending the transaction anyway.)
#
# In short, if we haven't finished with the cursor yet, that's a
# problem waiting to bite us.
#
# TL;DR: we're done with the cursor, so we can close it.
#
# [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
# [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
cursor.close()
except Exception as e: except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", name, e) logger.debug("[TXN FAIL] {%s} %s", name, e)
raise raise

View File

@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
current_id(int): The position to fetch up to. current_id(int): The position to fetch up to.
Returns: Returns:
A deferred pair of lists of tuples of stream_id int, user_id string, A deferred pair of lists of tuples of stream_id int, user_id string,
room_id string, type string, and content string. room_id string, and type string.
""" """
if last_room_id == current_id and last_global_id == current_id: if last_room_id == current_id and last_global_id == current_id:
return defer.succeed(([], [])) return defer.succeed(([], []))
def get_updated_account_data_txn(txn): def get_updated_account_data_txn(txn):
sql = ( sql = (
"SELECT stream_id, user_id, account_data_type, content" "SELECT stream_id, user_id, account_data_type"
" FROM account_data WHERE ? < stream_id AND stream_id <= ?" " FROM account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?" " ORDER BY stream_id ASC LIMIT ?"
) )
@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
global_results = txn.fetchall() global_results = txn.fetchall()
sql = ( sql = (
"SELECT stream_id, user_id, room_id, account_data_type, content" "SELECT stream_id, user_id, room_id, account_data_type"
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?" " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?" " ORDER BY stream_id ASC LIMIT ?"
) )

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd # Copyright 2017 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
class EndToEndRoomKeyStore(SQLBaseStore): class EndToEndRoomKeyStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_e2e_room_key(self, user_id, version, room_id, session_id): def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
"""Get the encrypted E2E room key for a given session from a given """Replaces the encrypted E2E room key for a given session in a given backup
backup version of room_keys. We only store the 'best' room key for a given
session at a given time, as determined by the handler.
Args:
user_id(str): the user whose backup we're querying
version(str): the version ID of the backup for the set of keys we're querying
room_id(str): the ID of the room whose keys we're querying.
This is a bit redundant as it's implied by the session_id, but
we include for consistency with the rest of the API.
session_id(str): the session whose room_key we're querying.
Returns:
A deferred dict giving the session_data and message metadata for
this room key.
"""
row = yield self._simple_select_one(
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
"version": version,
"room_id": room_id,
"session_id": session_id,
},
retcols=(
"first_message_index",
"forwarded_count",
"is_verified",
"session_data",
),
desc="get_e2e_room_key",
)
row["session_data"] = json.loads(row["session_data"])
return row
@defer.inlineCallbacks
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
"""Replaces or inserts the encrypted E2E room key for a given session in
a given backup
Args: Args:
user_id(str): the user whose backup we're setting user_id(str): the user whose backup we're setting
@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
StoreError StoreError
""" """
yield self._simple_upsert( yield self._simple_update_one(
table="e2e_room_keys", table="e2e_room_keys",
keyvalues={ keyvalues={
"user_id": user_id, "user_id": user_id,
@ -86,13 +46,39 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"room_id": room_id, "room_id": room_id,
"session_id": session_id, "session_id": session_id,
}, },
values={ updatevalues={
"first_message_index": room_key["first_message_index"], "first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"], "forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"], "is_verified": room_key["is_verified"],
"session_data": json.dumps(room_key["session_data"]), "session_data": json.dumps(room_key["session_data"]),
}, },
lock=False, desc="update_e2e_room_key",
)
@defer.inlineCallbacks
def add_e2e_room_keys(self, user_id, version, room_keys):
"""Bulk add room keys to a given backup.
Args:
user_id (str): the user whose backup we're adding to
version (str): the version ID of the backup for the set of keys we're adding to
room_keys (iterable[(str, str, dict)]): the keys to add, in the form
(roomID, sessionID, keyData)
"""
values = []
for (room_id, session_id, room_key) in room_keys:
values.append(
{
"user_id": user_id,
"version": version,
"room_id": room_id,
"session_id": session_id,
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json.dumps(room_key["session_data"]),
}
) )
log_kv( log_kv(
{ {
@ -103,6 +89,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
} }
) )
yield self._simple_insert_many(
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
)
@trace @trace
@defer.inlineCallbacks @defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions return sessions
def get_e2e_room_keys_multi(self, user_id, version, room_keys):
"""Get multiple room keys at a time. The difference between this function and
get_e2e_room_keys is that this function can be used to retrieve
multiple specific keys at a time, whereas get_e2e_room_keys is used for
getting all the keys in a backup version, all the keys for a room, or a
specific key.
Args:
user_id (str): the user whose backup we're querying
version (str): the version ID of the backup we're querying about
room_keys (dict[str, dict[str, iterable[str]]]): a map from
room ID -> {"session": [session ids]} indicating the session IDs
that we want to query
Returns:
Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
"""
return self.runInteraction(
"get_e2e_room_keys_multi",
self._get_e2e_room_keys_multi_txn,
user_id,
version,
room_keys,
)
@staticmethod
def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
if not room_keys:
return {}
where_clauses = []
params = [user_id, version]
for room_id, room in room_keys.items():
sessions = list(room["sessions"])
if not sessions:
continue
params.append(room_id)
params.extend(sessions)
where_clauses.append(
"(room_id = ? AND session_id IN (%s))"
% (",".join(["?" for _ in sessions]),)
)
# check if we're actually querying something
if not where_clauses:
return {}
sql = """
SELECT room_id, session_id, first_message_index, forwarded_count,
is_verified, session_data
FROM e2e_room_keys
WHERE user_id = ? AND version = ? AND (%s)
""" % (
" OR ".join(where_clauses)
)
txn.execute(sql, params)
ret = {}
for row in txn:
room_id = row[0]
session_id = row[1]
ret.setdefault(room_id, {})
ret[room_id][session_id] = {
"first_message_index": row[2],
"forwarded_count": row[3],
"is_verified": row[4],
"session_data": json.loads(row[5]),
}
return ret
def count_e2e_room_keys(self, user_id, version):
"""Get the number of keys in a backup version.
Args:
user_id (str): the user whose backup we're querying
version (str): the version ID of the backup we're querying about
"""
return self._simple_select_one_onecol(
table="e2e_room_keys",
keyvalues={"user_id": user_id, "version": version},
retcol="COUNT(*)",
desc="count_e2e_room_keys",
)
@trace @trace
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
version(str) version(str)
algorithm(str) algorithm(str)
auth_data(object): opaque dict supplied by the client auth_data(object): opaque dict supplied by the client
etag(int): tag of the keys in the backup
""" """
def _get_e2e_room_keys_version_info_txn(txn): def _get_e2e_room_keys_version_info_txn(txn):
@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
txn, txn,
table="e2e_room_keys_versions", table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": this_version, "deleted": 0}, keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
retcols=("version", "algorithm", "auth_data"), retcols=("version", "algorithm", "auth_data", "etag"),
) )
result["auth_data"] = json.loads(result["auth_data"]) result["auth_data"] = json.loads(result["auth_data"])
result["version"] = str(result["version"]) result["version"] = str(result["version"])
if result["etag"] is None:
result["etag"] = 0
return result return result
return self.runInteraction( return self.runInteraction(
@ -288,19 +370,31 @@ class EndToEndRoomKeyStore(SQLBaseStore):
) )
@trace @trace
def update_e2e_room_keys_version(self, user_id, version, info): def update_e2e_room_keys_version(
self, user_id, version, info=None, version_etag=None
):
"""Update a given backup version """Update a given backup version
Args: Args:
user_id(str): the user whose backup version we're updating user_id(str): the user whose backup version we're updating
version(str): the version ID of the backup version we're updating version(str): the version ID of the backup version we're updating
info(dict): the new backup version info to store info (dict): the new backup version info to store. If None, then
the backup version info is not updated
version_etag (Optional[int]): etag of the keys in the backup. If
None, then the etag is not updated
""" """
updatevalues = {}
if info is not None and "auth_data" in info:
updatevalues["auth_data"] = json.dumps(info["auth_data"])
if version_etag is not None:
updatevalues["etag"] = version_etag
if updatevalues:
return self._simple_update( return self._simple_update(
table="e2e_room_keys_versions", table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": version}, keyvalues={"user_id": user_id, "version": version},
updatevalues={"auth_data": json.dumps(info["auth_data"])}, updatevalues=updatevalues,
desc="update_e2e_room_keys_version", desc="update_e2e_room_keys_version",
) )

View File

@ -927,6 +927,9 @@ class EventsStore(
elif event.type == EventTypes.Redaction: elif event.type == EventTypes.Redaction:
# Insert into the redactions table. # Insert into the redactions table.
self._store_redaction(txn, event) self._store_redaction(txn, event)
elif event.type == EventTypes.Retention:
# Update the room_retention table.
self._store_retention_policy_for_room_txn(txn, event)
self._handle_event_relations(txn, event) self._handle_event_relations(txn, event)

View File

@ -530,6 +530,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
nbrows = 0 nbrows = 0
last_row_event_id = "" last_row_event_id = ""
for (event_id, event_json_raw) in results: for (event_id, event_json_raw) in results:
try:
event_json = json.loads(event_json_raw) event_json = json.loads(event_json_raw)
self._simple_insert_many_txn( self._simple_insert_many_txn(
@ -548,6 +549,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
if isinstance(label, str) if isinstance(label, str)
], ],
) )
except Exception as e:
logger.warning(
"Unable to load event %s (no labels will be imported): %s",
event_id,
e,
)
nbrows += 1 nbrows += 1
last_row_event_id = event_id last_row_event_id = event_id

View File

@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
args.append(limit) args.append(limit)
txn.execute(sql, args) txn.execute(sql, args)
return (r[0:5] + (json.loads(r[5]),) for r in txn) return list(r[0:5] + (json.loads(r[5]),) for r in txn)
return self.runInteraction( return self.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn "get_all_updated_receipts", get_all_updated_receipts_txn

View File

@ -19,7 +19,6 @@ import logging
import re import re
from six import iterkeys from six import iterkeys
from six.moves import range
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
@ -482,12 +481,8 @@ class RegistrationWorkerStore(SQLBaseStore):
""" """
Gets the localpart of the next generated user ID. Gets the localpart of the next generated user ID.
Generated user IDs are integers, and we aim for them to be as small as Generated user IDs are integers, so we find the largest integer user ID
we can. Unfortunately, it's possible some of them are already taken by already taken and return that plus one.
existing users, and there may be gaps in the already taken range. This
function returns the start of the first allocatable gap. This is to
avoid the case of ID 1000 being pre-allocated and starting at 1001 while
0-999 are available.
""" """
def _find_next_generated_user_id(txn): def _find_next_generated_user_id(txn):
@ -497,15 +492,14 @@ class RegistrationWorkerStore(SQLBaseStore):
regex = re.compile(r"^@(\d+):") regex = re.compile(r"^@(\d+):")
found = set() max_found = 0
for (user_id,) in txn: for (user_id,) in txn:
match = regex.search(user_id) match = regex.search(user_id)
if match: if match:
found.add(int(match.group(1))) max_found = max(int(match.group(1)), max_found)
for i in range(len(found) + 1):
if i not in found: return max_found + 1
return i
return ( return (
( (
@ -575,6 +569,19 @@ class RegistrationWorkerStore(SQLBaseStore):
return self._simple_delete( return self._simple_delete(
"user_threepids", "user_threepids",
keyvalues={"user_id": user_id, "medium": medium, "address": address}, keyvalues={"user_id": user_id, "medium": medium, "address": address},
desc="user_delete_threepid",
)
def user_delete_threepids(self, user_id: str):
"""Delete all threepid this user has bound
Args:
user_id: The user id to delete all threepids of
"""
return self._simple_delete(
"user_threepids",
keyvalues={"user_id": user_id},
desc="user_delete_threepids", desc="user_delete_threepids",
) )

View File

@ -19,10 +19,13 @@ import logging
import re import re
from typing import Optional, Tuple from typing import Optional, Tuple
from six import integer_types
from canonicaljson import json from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.search import SearchStore from synapse.storage.data_stores.main.search import SearchStore
@ -300,8 +303,141 @@ class RoomWorkerStore(SQLBaseStore):
else: else:
return None return None
@cachedInlineCallbacks()
def get_retention_policy_for_room(self, room_id):
"""Get the retention policy for a given room.
If no retention policy has been found for this room, returns a policy defined
by the configured default policy (which has None as both the 'min_lifetime' and
the 'max_lifetime' if no default policy has been defined in the server's
configuration).
Args:
room_id (str): The ID of the room to get the retention policy of.
Returns:
dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
"""
def get_retention_policy_for_room_txn(txn):
txn.execute(
"""
SELECT min_lifetime, max_lifetime FROM room_retention
INNER JOIN current_state_events USING (event_id, room_id)
WHERE room_id = ?;
""",
(room_id,),
)
return self.cursor_to_dict(txn)
ret = yield self.runInteraction(
"get_retention_policy_for_room", get_retention_policy_for_room_txn,
)
# If we don't know this room ID, ret will be None, in this case return the default
# policy.
if not ret:
defer.returnValue(
{
"min_lifetime": self.config.retention_default_min_lifetime,
"max_lifetime": self.config.retention_default_max_lifetime,
}
)
row = ret[0]
# If one of the room's policy's attributes isn't defined, use the matching
# attribute from the default policy.
# The default values will be None if no default policy has been defined, or if one
# of the attributes is missing from the default policy.
if row["min_lifetime"] is None:
row["min_lifetime"] = self.config.retention_default_min_lifetime
if row["max_lifetime"] is None:
row["max_lifetime"] = self.config.retention_default_max_lifetime
defer.returnValue(row)
class RoomStore(RoomWorkerStore, SearchStore): class RoomStore(RoomWorkerStore, SearchStore):
def __init__(self, db_conn, hs):
super(RoomStore, self).__init__(db_conn, hs)
self.config = hs.config
self.register_background_update_handler(
"insert_room_retention", self._background_insert_retention,
)
@defer.inlineCallbacks
def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
them into the room_retention table.
NULLs the property's columns if missing from the retention event in the room's
state (or NULLs all of them if there's no retention event in the room's state),
so that we fall back to the server's retention policy.
"""
last_room = progress.get("room_id", "")
def _background_insert_retention_txn(txn):
txn.execute(
"""
SELECT state.room_id, state.event_id, events.json
FROM current_state_events as state
LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
WHERE state.room_id > ? AND state.type = '%s'
ORDER BY state.room_id ASC
LIMIT ?;
"""
% EventTypes.Retention,
(last_room, batch_size),
)
rows = self.cursor_to_dict(txn)
if not rows:
return True
for row in rows:
if not row["json"]:
retention_policy = {}
else:
ev = json.loads(row["json"])
retention_policy = json.dumps(ev["content"])
self._simple_insert_txn(
txn=txn,
table="room_retention",
values={
"room_id": row["room_id"],
"event_id": row["event_id"],
"min_lifetime": retention_policy.get("min_lifetime"),
"max_lifetime": retention_policy.get("max_lifetime"),
},
)
logger.info("Inserted %d rows into room_retention", len(rows))
self._background_update_progress_txn(
txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
)
if batch_size > len(rows):
return True
else:
return False
end = yield self.runInteraction(
"insert_room_retention", _background_insert_retention_txn,
)
if end:
yield self._end_background_update("insert_room_retention")
defer.returnValue(batch_size)
@defer.inlineCallbacks @defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public): def store_room(self, room_id, room_creator_user_id, is_public):
"""Stores a room. """Stores a room.
@ -502,6 +638,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
txn, event, "content.body", event.content["body"] txn, event, "content.body", event.content["body"]
) )
def _store_retention_policy_for_room_txn(self, txn, event):
if hasattr(event, "content") and (
"min_lifetime" in event.content or "max_lifetime" in event.content
):
if (
"min_lifetime" in event.content
and not isinstance(event.content.get("min_lifetime"), integer_types)
) or (
"max_lifetime" in event.content
and not isinstance(event.content.get("max_lifetime"), integer_types)
):
# Ignore the event if one of the value isn't an integer.
return
self._simple_insert_txn(
txn=txn,
table="room_retention",
values={
"room_id": event.room_id,
"event_id": event.event_id,
"min_lifetime": event.content.get("min_lifetime"),
"max_lifetime": event.content.get("max_lifetime"),
},
)
self._invalidate_cache_and_stream(
txn, self.get_retention_policy_for_room, (event.room_id,)
)
def add_event_report( def add_event_report(
self, room_id, event_id, user_id, reason, content, received_ts self, room_id, event_id, user_id, reason, content, received_ts
): ):
@ -683,3 +848,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
remote_media_mxcs.append((hostname, media_id)) remote_media_mxcs.append((hostname, media_id))
return local_media_mxcs, remote_media_mxcs return local_media_mxcs, remote_media_mxcs
@defer.inlineCallbacks
def get_rooms_for_retention_period_in_range(
self, min_ms, max_ms, include_null=False
):
"""Retrieves all of the rooms within the given retention range.
Optionally includes the rooms which don't have a retention policy.
Args:
min_ms (int|None): Duration in milliseconds that define the lower limit of
the range to handle (exclusive). If None, doesn't set a lower limit.
max_ms (int|None): Duration in milliseconds that define the upper limit of
the range to handle (inclusive). If None, doesn't set an upper limit.
include_null (bool): Whether to include rooms which retention policy is NULL
in the returned set.
Returns:
dict[str, dict]: The rooms within this range, along with their retention
policy. The key is "room_id", and maps to a dict describing the retention
policy associated with this room ID. The keys for this nested dict are
"min_lifetime" (int|None), and "max_lifetime" (int|None).
"""
def get_rooms_for_retention_period_in_range_txn(txn):
range_conditions = []
args = []
if min_ms is not None:
range_conditions.append("max_lifetime > ?")
args.append(min_ms)
if max_ms is not None:
range_conditions.append("max_lifetime <= ?")
args.append(max_ms)
# Do a first query which will retrieve the rooms that have a retention policy
# in their current state.
sql = """
SELECT room_id, min_lifetime, max_lifetime FROM room_retention
INNER JOIN current_state_events USING (event_id, room_id)
"""
if len(range_conditions):
sql += " WHERE (" + " AND ".join(range_conditions) + ")"
if include_null:
sql += " OR max_lifetime IS NULL"
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
rooms_dict = {}
for row in rows:
rooms_dict[row["room_id"]] = {
"min_lifetime": row["min_lifetime"],
"max_lifetime": row["max_lifetime"],
}
if include_null:
# If required, do a second query that retrieves all of the rooms we know
# of so we can handle rooms with no retention policy.
sql = "SELECT DISTINCT room_id FROM current_state_events"
txn.execute(sql)
rows = self.cursor_to_dict(txn)
# If a room isn't already in the dict (i.e. it doesn't have a retention
# policy in its state), add it with a null policy.
for row in rows:
if row["room_id"] not in rooms_dict:
rooms_dict[row["room_id"]] = {
"min_lifetime": None,
"max_lifetime": None,
}
return rooms_dict
rooms = yield self.runInteraction(
"get_rooms_for_retention_period_in_range",
get_rooms_for_retention_period_in_range_txn,
)
defer.returnValue(rooms)

View File

@ -0,0 +1,17 @@
/* Copyright 2019 Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- store the current etag of backup version
ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;

View File

@ -0,0 +1,33 @@
/* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Tracks the retention policy of a room.
-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
-- the room's retention policy state event.
-- If a room doesn't have a retention policy state event in its state, both max_lifetime
-- and min_lifetime are NULL.
CREATE TABLE IF NOT EXISTS room_retention(
room_id TEXT,
event_id TEXT,
min_lifetime BIGINT,
max_lifetime BIGINT,
PRIMARY KEY(room_id, event_id)
);
CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
INSERT INTO background_updates (update_name, progress_json) VALUES
('insert_room_retention', '{}');

View File

@ -44,7 +44,12 @@ MEMBERSHIP_PRIORITY = (
@defer.inlineCallbacks @defer.inlineCallbacks
def filter_events_for_client( def filter_events_for_client(
storage: Storage, user_id, events, is_peeking=False, always_include_ids=frozenset() storage: Storage,
user_id,
events,
is_peeking=False,
always_include_ids=frozenset(),
apply_retention_policies=True,
): ):
""" """
Check which events a user is allowed to see Check which events a user is allowed to see
@ -59,6 +64,10 @@ def filter_events_for_client(
events events
always_include_ids (set(event_id)): set of event ids to specifically always_include_ids (set(event_id)): set of event ids to specifically
include (unless sender is ignored) include (unless sender is ignored)
apply_retention_policies (bool): Whether to filter out events that's older than
allowed by the room's retention policy. Useful when this function is called
to e.g. check whether a user should be allowed to see the state at a given
event rather than to know if it should send an event to a user's client(s).
Returns: Returns:
Deferred[list[synapse.events.EventBase]] Deferred[list[synapse.events.EventBase]]
@ -86,6 +95,15 @@ def filter_events_for_client(
erased_senders = yield storage.main.are_users_erased((e.sender for e in events)) erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
if apply_retention_policies:
room_ids = set(e.room_id for e in events)
retention_policies = {}
for room_id in room_ids:
retention_policies[
room_id
] = yield storage.main.get_retention_policy_for_room(room_id)
def allowed(event): def allowed(event):
""" """
Args: Args:
@ -103,6 +121,18 @@ def filter_events_for_client(
if not event.is_state() and event.sender in ignore_list: if not event.is_state() and event.sender in ignore_list:
return None return None
# Don't try to apply the room's retention policy if the event is a state event, as
# MSC1763 states that retention is only considered for non-state events.
if apply_retention_policies and not event.is_state():
retention_policy = retention_policies[event.room_id]
max_lifetime = retention_policy.get("max_lifetime")
if max_lifetime is not None:
oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
if event.origin_server_ts < oldest_allowed_ts:
return None
if event.event_id in always_include_ids: if event.event_id in always_include_ids:
return event return event

View File

@ -29,3 +29,7 @@ Enabling an unknown default rule fails with 404
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1663 # Blacklisted due to https://github.com/matrix-org/synapse/issues/1663
New federated private chats get full presence information (SYN-115) New federated private chats get full presence information (SYN-115)
# Blacklisted due to https://github.com/matrix-org/matrix-doc/pull/2314 removing
# this requirement from the spec
Inbound federation of state requires event_id as a mandatory paramater

View File

@ -18,17 +18,14 @@ from mock import Mock
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client.v1 import login, room from synapse.rest.client.v1 import login, room
from synapse.types import UserID from synapse.types import UserID
from synapse.util.ratelimitutils import FederationRateLimiter
from tests import unittest from tests import unittest
class RoomComplexityTests(unittest.HomeserverTestCase): class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
servlets = [ servlets = [
admin.register_servlets, admin.register_servlets,
@ -41,25 +38,6 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
config["limit_remote_rooms"] = {"enabled": True, "complexity": 0.05} config["limit_remote_rooms"] = {"enabled": True, "complexity": 0.05}
return config return config
def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return defer.succeed("otherserver.nottld")
ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)
def test_complexity_simple(self): def test_complexity_simple(self):
u1 = self.register_user("u1", "pass") u1 = self.register_user("u1", "pass")
@ -105,7 +83,7 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
d = handler._remote_join( d = handler._remote_join(
None, None,
["otherserver.example"], ["other.example.com"],
"roomid", "roomid",
UserID.from_string(u1), UserID.from_string(u1),
{"membership": "join"}, {"membership": "join"},
@ -146,7 +124,7 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
d = handler._remote_join( d = handler._remote_join(
None, None,
["otherserver.example"], ["other.example.com"],
room_1, room_1,
UserID.from_string(u1), UserID.from_string(u1),
{"membership": "join"}, {"membership": "join"},

View File

@ -19,7 +19,7 @@ from twisted.internet import defer
from synapse.types import ReadReceipt from synapse.types import ReadReceipt
from tests.unittest import HomeserverTestCase from tests.unittest import HomeserverTestCase, override_config
class FederationSenderTestCases(HomeserverTestCase): class FederationSenderTestCases(HomeserverTestCase):
@ -29,6 +29,7 @@ class FederationSenderTestCases(HomeserverTestCase):
federation_transport_client=Mock(spec=["send_transaction"]), federation_transport_client=Mock(spec=["send_transaction"]),
) )
@override_config({"send_federation": True})
def test_send_receipts(self): def test_send_receipts(self):
mock_state_handler = self.hs.get_state_handler() mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"] mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
@ -69,6 +70,7 @@ class FederationSenderTestCases(HomeserverTestCase):
], ],
) )
@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self): def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but """Send two receipts in quick succession; the second should be flushed, but
only after 20ms""" only after 20ms"""

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd # Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -16,6 +17,8 @@ import logging
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.federation.federation_server import server_matches_acl_event from synapse.federation.federation_server import server_matches_acl_event
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest from tests import unittest
@ -41,6 +44,66 @@ class ServerACLsTestCase(unittest.TestCase):
self.assertTrue(server_matches_acl_event("1:2:3:4", e)) self.assertTrue(server_matches_acl_event("1:2:3:4", e))
class StateQueryTests(unittest.FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def test_without_event_id(self):
"""
Querying v1/state/<room_id> without an event ID will return the current
known state.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.inject_room_member(room_1, "@user:other.example.com", "join")
request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertEqual(
channel.json_body["room_version"],
self.hs.config.default_room_version.identifier,
)
members = set(
map(
lambda x: x["state_key"],
filter(
lambda x: x["type"] == "m.room.member", channel.json_body["pdus"]
),
)
)
self.assertEqual(members, set(["@user:other.example.com", u1]))
self.assertEqual(len(channel.json_body["pdus"]), 6)
def test_needs_to_be_in_room(self):
"""
Querying v1/state/<room_id> requires the server
be in the room to provide data.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(403, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
def _create_acl_event(content): def _create_acl_event(content):
return FrozenEvent( return FrozenEvent(
{ {

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd # Copyright 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd # Copyright 2017 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -94,23 +95,29 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version # check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user) res = yield self.handler.get_version_info(self.local_user)
version_etag = res["etag"]
del res["etag"]
self.assertDictEqual( self.assertDictEqual(
res, res,
{ {
"version": "1", "version": "1",
"algorithm": "m.megolm_backup.v1", "algorithm": "m.megolm_backup.v1",
"auth_data": "first_version_auth_data", "auth_data": "first_version_auth_data",
"count": 0,
}, },
) )
# check we can retrieve it as a specific version # check we can retrieve it as a specific version
res = yield self.handler.get_version_info(self.local_user, "1") res = yield self.handler.get_version_info(self.local_user, "1")
self.assertEqual(res["etag"], version_etag)
del res["etag"]
self.assertDictEqual( self.assertDictEqual(
res, res,
{ {
"version": "1", "version": "1",
"algorithm": "m.megolm_backup.v1", "algorithm": "m.megolm_backup.v1",
"auth_data": "first_version_auth_data", "auth_data": "first_version_auth_data",
"count": 0,
}, },
) )
@ -126,12 +133,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version # check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user) res = yield self.handler.get_version_info(self.local_user)
del res["etag"]
self.assertDictEqual( self.assertDictEqual(
res, res,
{ {
"version": "2", "version": "2",
"algorithm": "m.megolm_backup.v1", "algorithm": "m.megolm_backup.v1",
"auth_data": "second_version_auth_data", "auth_data": "second_version_auth_data",
"count": 0,
}, },
) )
@ -158,12 +167,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version # check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user) res = yield self.handler.get_version_info(self.local_user)
del res["etag"]
self.assertDictEqual( self.assertDictEqual(
res, res,
{ {
"algorithm": "m.megolm_backup.v1", "algorithm": "m.megolm_backup.v1",
"auth_data": "revised_first_version_auth_data", "auth_data": "revised_first_version_auth_data",
"version": version, "version": version,
"count": 0,
}, },
) )
@ -207,12 +218,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version # check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user) res = yield self.handler.get_version_info(self.local_user)
del res["etag"] # etag is opaque, so don't test its contents
self.assertDictEqual( self.assertDictEqual(
res, res,
{ {
"algorithm": "m.megolm_backup.v1", "algorithm": "m.megolm_backup.v1",
"auth_data": "revised_first_version_auth_data", "auth_data": "revised_first_version_auth_data",
"version": version, "version": version,
"count": 0,
}, },
) )
@ -409,6 +422,11 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
yield self.handler.upload_room_keys(self.local_user, version, room_keys) yield self.handler.upload_room_keys(self.local_user, version, room_keys)
# get the etag to compare to future versions
res = yield self.handler.get_version_info(self.local_user)
backup_etag = res["etag"]
self.assertEqual(res["count"], 1)
new_room_keys = copy.deepcopy(room_keys) new_room_keys = copy.deepcopy(room_keys)
new_room_key = new_room_keys["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"] new_room_key = new_room_keys["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]
@ -423,6 +441,10 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
"SSBBTSBBIEZJU0gK", "SSBBTSBBIEZJU0gK",
) )
# the etag should be the same since the session did not change
res = yield self.handler.get_version_info(self.local_user)
self.assertEqual(res["etag"], backup_etag)
# test that marking the session as verified however /does/ replace it # test that marking the session as verified however /does/ replace it
new_room_key["is_verified"] = True new_room_key["is_verified"] = True
yield self.handler.upload_room_keys(self.local_user, version, new_room_keys) yield self.handler.upload_room_keys(self.local_user, version, new_room_keys)
@ -432,6 +454,11 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new" res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new"
) )
# the etag should NOT be equal now, since the key changed
res = yield self.handler.get_version_info(self.local_user)
self.assertNotEqual(res["etag"], backup_etag)
backup_etag = res["etag"]
# test that a session with a higher forwarded_count doesn't replace one # test that a session with a higher forwarded_count doesn't replace one
# with a lower forwarding count # with a lower forwarding count
new_room_key["forwarded_count"] = 2 new_room_key["forwarded_count"] = 2
@ -443,6 +470,10 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new" res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new"
) )
# the etag should be the same since the session did not change
res = yield self.handler.get_version_info(self.local_user)
self.assertEqual(res["etag"], backup_etag)
# TODO: check edge cases as well as the common variations here # TODO: check edge cases as well as the common variations here
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -24,6 +24,7 @@ from synapse.api.errors import AuthError
from synapse.types import UserID from synapse.types import UserID
from tests import unittest from tests import unittest
from tests.unittest import override_config
from tests.utils import register_federation_servlets from tests.utils import register_federation_servlets
# Some local users to test with # Some local users to test with
@ -174,6 +175,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
], ],
) )
@override_config({"send_federation": True})
def test_started_typing_remote_send(self): def test_started_typing_remote_send(self):
self.room_members = [U_APPLE, U_ONION] self.room_members = [U_APPLE, U_ONION]
@ -237,6 +239,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
], ],
) )
@override_config({"send_federation": True})
def test_stopped_typing(self): def test_stopped_typing(self):
self.room_members = [U_APPLE, U_BANANA, U_ONION] self.room_members = [U_APPLE, U_BANANA, U_ONION]

View File

@ -48,7 +48,10 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
server_factory = ReplicationStreamProtocolFactory(self.hs) server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = server_factory.streamer self.streamer = server_factory.streamer
handler_factory = Mock()
self.replication_handler = ReplicationClientHandler(self.slaved_store) self.replication_handler = ReplicationClientHandler(self.slaved_store)
self.replication_handler.factory = handler_factory
client_factory = ReplicationClientFactory( client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler self.hs, "client_name", self.replication_handler
) )

View File

@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from mock import Mock
from synapse.replication.tcp.commands import ReplicateCommand from synapse.replication.tcp.commands import ReplicateCommand
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@ -30,7 +32,9 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
server = server_factory.buildProtocol(None) server = server_factory.buildProtocol(None)
# build a replication client, with a dummy handler # build a replication client, with a dummy handler
handler_factory = Mock()
self.test_handler = TestReplicationClientHandler() self.test_handler = TestReplicationClientHandler()
self.test_handler.factory = handler_factory
self.client = ClientReplicationStreamProtocol( self.client = ClientReplicationStreamProtocol(
"client", "test", clock, self.test_handler "client", "test", clock, self.test_handler
) )

View File

@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from synapse.api.constants import EventTypes
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.visibility import filter_events_for_client
from tests import unittest
one_hour_ms = 3600000
one_day_ms = one_hour_ms * 24
class RetentionTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
room.register_servlets,
]
def make_homeserver(self, reactor, clock):
config = self.default_config()
config["retention"] = {
"enabled": True,
"default_policy": {
"min_lifetime": one_day_ms,
"max_lifetime": one_day_ms * 3,
},
"allowed_lifetime_min": one_day_ms,
"allowed_lifetime_max": one_day_ms * 3,
}
self.hs = self.setup_test_homeserver(config=config)
return self.hs
def prepare(self, reactor, clock, homeserver):
self.user_id = self.register_user("user", "password")
self.token = self.login("user", "password")
def test_retention_state_event(self):
"""Tests that the server configuration can limit the values a user can set to the
room's retention policy.
"""
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
self.helper.send_state(
room_id=room_id,
event_type=EventTypes.Retention,
body={"max_lifetime": one_day_ms * 4},
tok=self.token,
expect_code=400,
)
self.helper.send_state(
room_id=room_id,
event_type=EventTypes.Retention,
body={"max_lifetime": one_hour_ms},
tok=self.token,
expect_code=400,
)
def test_retention_event_purged_with_state_event(self):
"""Tests that expired events are correctly purged when the room's retention policy
is defined by a state event.
"""
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
# Set the room's retention period to 2 days.
lifetime = one_day_ms * 2
self.helper.send_state(
room_id=room_id,
event_type=EventTypes.Retention,
body={"max_lifetime": lifetime},
tok=self.token,
)
self._test_retention_event_purged(room_id, one_day_ms * 1.5)
def test_retention_event_purged_without_state_event(self):
"""Tests that expired events are correctly purged when the room's retention policy
is defined by the server's configuration's default retention policy.
"""
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
self._test_retention_event_purged(room_id, one_day_ms * 2)
def test_visibility(self):
"""Tests that synapse.visibility.filter_events_for_client correctly filters out
outdated events
"""
store = self.hs.get_datastore()
storage = self.hs.get_storage()
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
events = []
# Send a first event, which should be filtered out at the end of the test.
resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
# Get the event from the store so that we end up with a FrozenEvent that we can
# give to filter_events_for_client. We need to do this now because the event won't
# be in the database anymore after it has expired.
events.append(self.get_success(store.get_event(resp.get("event_id"))))
# Advance the time by 2 days. We're using the default retention policy, therefore
# after this the first event will still be valid.
self.reactor.advance(one_day_ms * 2 / 1000)
# Send another event, which shouldn't get filtered out.
resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
valid_event_id = resp.get("event_id")
events.append(self.get_success(store.get_event(valid_event_id)))
# Advance the time by anothe 2 days. After this, the first event should be
# outdated but not the second one.
self.reactor.advance(one_day_ms * 2 / 1000)
# Run filter_events_for_client with our list of FrozenEvents.
filtered_events = self.get_success(
filter_events_for_client(storage, self.user_id, events)
)
# We should only get one event back.
self.assertEqual(len(filtered_events), 1, filtered_events)
# That event should be the second, not outdated event.
self.assertEqual(filtered_events[0].event_id, valid_event_id, filtered_events)
def _test_retention_event_purged(self, room_id, increment):
# Get the create event to, later, check that we can still access it.
message_handler = self.hs.get_message_handler()
create_event = self.get_success(
message_handler.get_room_data(self.user_id, room_id, EventTypes.Create)
)
# Send a first event to the room. This is the event we'll want to be purged at the
# end of the test.
resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
expired_event_id = resp.get("event_id")
# Check that we can retrieve the event.
expired_event = self.get_event(room_id, expired_event_id)
self.assertEqual(
expired_event.get("content", {}).get("body"), "1", expired_event
)
# Advance the time.
self.reactor.advance(increment / 1000)
# Send another event. We need this because the purge job won't purge the most
# recent event in the room.
resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
valid_event_id = resp.get("event_id")
# Advance the time again. Now our first event should have expired but our second
# one should still be kept.
self.reactor.advance(increment / 1000)
# Check that the event has been purged from the database.
self.get_event(room_id, expired_event_id, expected_code=404)
# Check that the event that hasn't been purged can still be retrieved.
valid_event = self.get_event(room_id, valid_event_id)
self.assertEqual(valid_event.get("content", {}).get("body"), "2", valid_event)
# Check that we can still access state events that were sent before the event that
# has been purged.
self.get_event(room_id, create_event.event_id)
def get_event(self, room_id, event_id, expected_code=200):
url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)
request, channel = self.make_request("GET", url, access_token=self.token)
self.render(request)
self.assertEqual(channel.code, expected_code, channel.result)
return channel.json_body
class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
room.register_servlets,
]
def make_homeserver(self, reactor, clock):
config = self.default_config()
config["retention"] = {
"enabled": True,
}
mock_federation_client = Mock(spec=["backfill"])
self.hs = self.setup_test_homeserver(
config=config, federation_client=mock_federation_client,
)
return self.hs
def prepare(self, reactor, clock, homeserver):
self.user_id = self.register_user("user", "password")
self.token = self.login("user", "password")
def test_no_default_policy(self):
"""Tests that an event doesn't get expired if there is neither a default retention
policy nor a policy specific to the room.
"""
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
self._test_retention(room_id)
def test_state_policy(self):
"""Tests that an event gets correctly expired if there is no default retention
policy but there's a policy specific to the room.
"""
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
# Set the maximum lifetime to 35 days so that the first event gets expired but not
# the second one.
self.helper.send_state(
room_id=room_id,
event_type=EventTypes.Retention,
body={"max_lifetime": one_day_ms * 35},
tok=self.token,
)
self._test_retention(room_id, expected_code_for_first_event=404)
def _test_retention(self, room_id, expected_code_for_first_event=200):
# Send a first event to the room. This is the event we'll want to be purged at the
# end of the test.
resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
first_event_id = resp.get("event_id")
# Check that we can retrieve the event.
expired_event = self.get_event(room_id, first_event_id)
self.assertEqual(
expired_event.get("content", {}).get("body"), "1", expired_event
)
# Advance the time by a month.
self.reactor.advance(one_day_ms * 30 / 1000)
# Send another event. We need this because the purge job won't purge the most
# recent event in the room.
resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
second_event_id = resp.get("event_id")
# Advance the time by another month.
self.reactor.advance(one_day_ms * 30 / 1000)
# Check if the event has been purged from the database.
first_event = self.get_event(
room_id, first_event_id, expected_code=expected_code_for_first_event
)
if expected_code_for_first_event == 200:
self.assertEqual(
first_event.get("content", {}).get("body"), "1", first_event
)
# Check that the event that hasn't been purged can still be retrieved.
second_event = self.get_event(room_id, second_event_id)
self.assertEqual(second_event.get("content", {}).get("body"), "2", second_event)
def get_event(self, room_id, event_id, expected_code=200):
url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)
request, channel = self.make_request("GET", url, access_token=self.token)
self.render(request)
self.assertEqual(channel.code, expected_code, channel.result)
return channel.json_body

View File

@ -1180,3 +1180,143 @@ class PerRoomProfilesForbiddenTestCase(unittest.HomeserverTestCase):
res_displayname = channel.json_body["content"]["displayname"] res_displayname = channel.json_body["content"]["displayname"]
self.assertEqual(res_displayname, self.displayname, channel.result) self.assertEqual(res_displayname, self.displayname, channel.result)
class RoomMembershipReasonTestCase(unittest.HomeserverTestCase):
"""Tests that clients can add a "reason" field to membership events and
that they get correctly added to the generated events and propagated.
"""
servlets = [
synapse.rest.admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, homeserver):
self.creator = self.register_user("creator", "test")
self.creator_tok = self.login("creator", "test")
self.second_user_id = self.register_user("second", "test")
self.second_tok = self.login("second", "test")
self.room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok)
def test_join_reason(self):
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/join".format(self.room_id),
content={"reason": reason},
access_token=self.second_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def test_leave_reason(self):
self.helper.join(self.room_id, user=self.second_user_id, tok=self.second_tok)
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/leave".format(self.room_id),
content={"reason": reason},
access_token=self.second_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def test_kick_reason(self):
self.helper.join(self.room_id, user=self.second_user_id, tok=self.second_tok)
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/kick".format(self.room_id),
content={"reason": reason, "user_id": self.second_user_id},
access_token=self.second_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def test_ban_reason(self):
self.helper.join(self.room_id, user=self.second_user_id, tok=self.second_tok)
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/ban".format(self.room_id),
content={"reason": reason, "user_id": self.second_user_id},
access_token=self.creator_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def test_unban_reason(self):
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/unban".format(self.room_id),
content={"reason": reason, "user_id": self.second_user_id},
access_token=self.creator_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def test_invite_reason(self):
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/invite".format(self.room_id),
content={"reason": reason, "user_id": self.second_user_id},
access_token=self.creator_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def test_reject_invite_reason(self):
self.helper.invite(
self.room_id,
src=self.creator,
targ=self.second_user_id,
tok=self.creator_tok,
)
reason = "hello"
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/{}/leave".format(self.room_id),
content={"reason": reason},
access_token=self.second_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
self._check_for_reason(reason)
def _check_for_reason(self, reason):
request, channel = self.make_request(
"GET",
"/_matrix/client/r0/rooms/{}/state/m.room.member/{}".format(
self.room_id, self.second_user_id
),
access_token=self.creator_tok,
)
self.render(request)
self.assertEqual(channel.code, 200, channel.result)
event_content = channel.json_body
self.assertEqual(event_content.get("reason"), reason, channel.result)

View File

@ -203,6 +203,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
@unittest.override_config( @unittest.override_config(
{ {
"public_baseurl": "https://test_server",
"enable_registration_captcha": True, "enable_registration_captcha": True,
"user_consent": { "user_consent": {
"version": "1", "version": "1",

View File

@ -39,8 +39,8 @@ class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase):
) )
self.get_success( self.get_success(
self.store.set_e2e_room_key( self.store.add_e2e_room_keys(
"user_id", version1, "room", "session", room_key "user_id", version1, [("room", "session", room_key)]
) )
) )
@ -51,8 +51,8 @@ class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase):
) )
self.get_success( self.get_success(
self.store.set_e2e_room_key( self.store.add_e2e_room_keys(
"user_id", version2, "room", "session", room_key "user_id", version2, [("room", "session", room_key)]
) )
) )

View File

@ -16,8 +16,7 @@
from unittest.mock import Mock from unittest.mock import Mock
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import Membership
from synapse.api.room_versions import RoomVersions
from synapse.rest.admin import register_servlets_for_client_rest_resource from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client.v1 import login, room from synapse.rest.client.v1 import login, room
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
@ -44,9 +43,6 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
# We can't test the RoomMemberStore on its own without the other event # We can't test the RoomMemberStore on its own without the other event
# storage logic # storage logic
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler()
self.u_alice = self.register_user("alice", "pass") self.u_alice = self.register_user("alice", "pass")
self.t_alice = self.login("alice", "pass") self.t_alice = self.login("alice", "pass")
@ -55,26 +51,6 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
# User elsewhere on another host # User elsewhere on another host
self.u_charlie = UserID.from_string("@charlie:elsewhere") self.u_charlie = UserID.from_string("@charlie:elsewhere")
def inject_room_member(self, room, user, membership, replaces_state=None):
builder = self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Member,
"sender": user,
"state_key": user,
"room_id": room,
"content": {"membership": membership},
},
)
event, context = self.get_success(
self.event_creation_handler.create_new_client_event(builder)
)
self.get_success(self.storage.persistence.persist_event(event, context))
return event
def test_one_member(self): def test_one_member(self):
# Alice creates the room, and is automatically joined # Alice creates the room, and is automatically joined

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector # Copyright 2018 New Vector
# Copyright 2019 Matrix.org Federation C.I.C
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -13,6 +14,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import gc import gc
import hashlib import hashlib
import hmac import hmac
@ -27,13 +29,17 @@ from twisted.internet.defer import Deferred, succeed
from twisted.python.threadpool import ThreadPool from twisted.python.threadpool import ThreadPool
from twisted.trial import unittest from twisted.trial import unittest
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server as federation_server
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.logging.context import LoggingContext from synapse.logging.context import LoggingContext
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import Requester, UserID, create_requester from synapse.types import Requester, UserID, create_requester
from synapse.util.ratelimitutils import FederationRateLimiter
from tests.server import get_clock, make_request, render, setup_test_homeserver from tests.server import get_clock, make_request, render, setup_test_homeserver
from tests.test_utils.logging_setup import setup_logging from tests.test_utils.logging_setup import setup_logging
@ -559,6 +565,66 @@ class HomeserverTestCase(TestCase):
self.render(request) self.render(request)
self.assertEqual(channel.code, 403, channel.result) self.assertEqual(channel.code, 403, channel.result)
def inject_room_member(self, room: str, user: str, membership: Membership) -> None:
"""
Inject a membership event into a room.
Args:
room: Room ID to inject the event into.
user: MXID of the user to inject the membership for.
membership: The membership type.
"""
event_builder_factory = self.hs.get_event_builder_factory()
event_creation_handler = self.hs.get_event_creation_handler()
room_version = self.get_success(self.hs.get_datastore().get_room_version(room))
builder = event_builder_factory.for_room_version(
KNOWN_ROOM_VERSIONS[room_version],
{
"type": EventTypes.Member,
"sender": user,
"state_key": user,
"room_id": room,
"content": {"membership": membership},
},
)
event, context = self.get_success(
event_creation_handler.create_new_client_event(builder)
)
self.get_success(
self.hs.get_storage().persistence.persist_event(event, context)
)
class FederatingHomeserverTestCase(HomeserverTestCase):
"""
A federating homeserver that authenticates incoming requests as `other.example.com`.
"""
def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return succeed("other.example.com")
ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
federation_server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)
return super().prepare(reactor, clock, homeserver)
def override_config(extra_config): def override_config(extra_config):
"""A decorator which can be applied to test functions to give additional HS config """A decorator which can be applied to test functions to give additional HS config

View File

@ -109,6 +109,7 @@ def default_config(name, parse=False):
""" """
config_dict = { config_dict = {
"server_name": name, "server_name": name,
"send_federation": False,
"media_store_path": "media", "media_store_path": "media",
"uploads_path": "uploads", "uploads_path": "uploads",
# the test signing key is just an arbitrary ed25519 key to keep the config # the test signing key is just an arbitrary ed25519 key to keep the config