Merge branch 'develop' into matrix-org-hotfixes

matrix-org-hotfixes-identity
Richard van der Hoff 2019-03-19 12:19:20 +00:00
commit 2561b628af
67 changed files with 2227 additions and 1320 deletions

.gitignore

@ -12,11 +12,15 @@ _trial_temp/
_trial_temp*/
# stuff that is likely to exist when you run a server locally
/*.db
/*.log
/*.log.config
/*.pid
/*.signing.key
/*.tls.crt
/*.tls.key
/uploads
/env/
/homeserver*.yaml
/media_store/
/uploads
# IDEs
/.idea/

changelog.d/4662.misc

@ -0,0 +1 @@
Add a systemd setup that supports synapse workers. Contributed by Luca Corbatto.

changelog.d/4821.feature

@ -0,0 +1 @@
Add configurable rate limiting to the /login endpoint.

changelog.d/4843.misc

@ -0,0 +1 @@
Add stuff back to the .gitignore.

changelog.d/4852.misc

@ -0,0 +1 @@
Move client read-receipt processing to federation sender worker.

changelog.d/4853.feature

@ -0,0 +1 @@
Allow passing --daemonize flags to workers in the same way as with master.

changelog.d/4855.misc

@ -0,0 +1 @@
Refactor federation TransactionQueue.

changelog.d/4863.misc

@ -0,0 +1 @@
Comment out most options in the generated config.

changelog.d/4864.feature

@ -0,0 +1 @@
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.

changelog.d/4865.feature

@ -0,0 +1 @@
Add configurable rate limiting to the /login endpoint.

changelog.d/4881.misc

@ -0,0 +1 @@
Update link to federation docs.

changelog.d/4886.bugfix

@ -0,0 +1 @@
Fix test_auto_create_auto_join_where_no_consent.

changelog.d/4886.misc

@ -0,0 +1 @@
Fix test_auto_create_auto_join_where_no_consent.

changelog.d/4887.feature

@ -0,0 +1 @@
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.


@ -0,0 +1,150 @@
# Setup Synapse with Workers and Systemd
This is a setup for managing synapse with systemd, including support for
managing workers. It provides a `matrix-synapse` service, as well as a
`matrix-synapse-worker@` service for any workers you require. Additionally, to
group the required services, it sets up a `matrix.target`. You can use this to
automatically start any bot or bridge services. More on this in
[Bots and Bridges](#bots-and-bridges).
See the folder [system](system) for any service and target files.
The folder [workers](workers) contains an example configuration for the
`federation_reader` worker. Pay special attention to the name of the
configuration file. In order to work with the `matrix-synapse-worker@.service`
service, it needs to have the exact same name as the worker app.
This setup expects neither the homeserver nor any workers to fork. Forking is
handled by systemd.
## Setup
1. Adjust your matrix configs. Make sure that the worker config files have the
exact same name as the worker app. See `matrix-synapse-worker@.service` for
why. You can find an example worker config in the [workers](workers) folder. See
below for relevant settings in the `homeserver.yaml`.
2. Copy the `*.service` and `*.target` files in [system](system) to
`/etc/systemd/system`.
3. `systemctl enable matrix-synapse.service`. This adds the homeserver
app to the `matrix.target`.
4. *Optional.* `systemctl enable
matrix-synapse-worker@federation_reader.service`. This adds the federation_reader
app to the `matrix-synapse.service`.
5. *Optional.* Repeat step 4 for any additional workers you require.
6. *Optional.* Add any bots or bridges by enabling them.
7. Start all matrix related services via `systemctl start matrix.target`.
8. *Optional.* Enable autostart of all matrix related services on system boot
via `systemctl enable matrix.target`.
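Condensed into a shell session, steps 2-8 (with the optional `federation_reader`
worker) might look like this; a sketch, assuming you run it from this folder:
```
cp system/*.service system/*.target /etc/systemd/system/
systemctl daemon-reload
systemctl enable matrix-synapse.service
systemctl enable matrix-synapse-worker@federation_reader.service
systemctl start matrix.target
systemctl enable matrix.target
```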
## Usage
After you have completed the setup, you can use the following commands to
manage your synapse installation:
```
# Start matrix-synapse, all workers and any enabled bots or bridges.
systemctl start matrix.target
# Restart matrix-synapse and all workers (not necessarily restarting bots
# or bridges, see "Bots and Bridges")
systemctl restart matrix-synapse.service
# Stop matrix-synapse and all workers (not necessarily stopping bots
# or bridges, see "Bots and Bridges")
systemctl stop matrix-synapse.service
# Restart a specific worker (e.g. federation_reader); the homeserver is
# unaffected by this.
systemctl restart matrix-synapse-worker@federation_reader.service
# Add a new worker (assuming all configs are set up already)
systemctl enable matrix-synapse-worker@federation_writer.service
systemctl restart matrix-synapse.service
```
## The Configs
Make sure `worker_app` is set in the `homeserver.yaml`, and that the homeserver does not fork.
```
worker_app: synapse.app.homeserver
daemonize: false
```
None of the workers should fork, as forking is handled by systemd. Hence make
sure this is present in all worker config files.
```
worker_daemonize: false
```
The config files of all workers are expected to be located in
`/etc/matrix-synapse/workers`. If you want to use a different location you have
to edit the provided `*.service` files accordingly.
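If you do relocate the configs, a drop-in override avoids editing the shipped
unit directly. A sketch, where `/srv/matrix/workers` is a made-up path:
```
systemctl edit matrix-synapse-worker@.service
# In the editor, clear and redefine ExecStart with the new config path:
#
# [Service]
# ExecStart=
# ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.%i \
#     --config-path=/etc/matrix-synapse/homeserver.yaml \
#     --config-path=/etc/matrix-synapse/conf.d/ \
#     --config-path=/srv/matrix/workers/%i.yaml
```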
## Bots and Bridges
Most bots and bridges do not care if the homeserver goes down or is restarted.
Depending on the implementation, this may crash them, though. So look up the docs
or ask the community for the specific bridge or bot you want to run to make sure
you choose the correct setup.
Whichever configuration you choose, after the setup the following will enable
automatically starting (and potentially restarting) your bot/bridge with the
`matrix.target`.
```
systemctl enable <yourBotOrBridgeName>.service
```
**Note** that when starting from an inactive state, the bots/bridges will only
be started together with synapse if you start the `matrix.target`, not if you
start the `matrix-synapse.service`. This is on purpose. Think of
`matrix-synapse.service` as *just* synapse, and of `matrix.target` as anything
matrix related, including synapse and any and all enabled bots and bridges.
### Start with synapse but ignore synapse going down
If the bridge can handle shutdowns of the homeserver, you'll want to install
the service in the `matrix.target` and optionally add an
`After=matrix-synapse.service` dependency to have the bot/bridge start after
synapse when starting everything.
In this case the service file should look like this:
```
[Unit]
# ...
# Optional, this will only ensure that if you start everything, synapse will
# be started before the bot/bridge will be started.
After=matrix-synapse.service
[Service]
# ...
[Install]
WantedBy=matrix.target
```
### Stop/restart when synapse stops/restarts
If the bridge can't handle shutdowns of the homeserver, you'll still want to
install the service in the `matrix.target`, but you also have to specify the
`After=matrix-synapse.service` *and* `BindsTo=matrix-synapse.service`
dependencies to have the bot/bridge stop/restart with synapse.
In this case the service file should look like this:
```
[Unit]
# ...
# Mandatory
After=matrix-synapse.service
BindsTo=matrix-synapse.service
[Service]
# ...
[Install]
WantedBy=matrix.target
```
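Whichever variant you choose, you can check what ended up grouped under the
target with:
```
systemctl list-dependencies matrix.target
```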


@ -0,0 +1,17 @@
[Unit]
Description=Synapse Matrix Worker
After=matrix-synapse.service
BindsTo=matrix-synapse.service
[Service]
Type=simple
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.%i --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
[Install]
WantedBy=matrix-synapse.service
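The `%i` specifier expands to the instance name, so one template serves every
worker; for example:
```
# %i -> federation_reader: runs synapse.app.federation_reader with
# /etc/matrix-synapse/workers/federation_reader.yaml
systemctl start matrix-synapse-worker@federation_reader.service
```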


@ -0,0 +1,16 @@
[Unit]
Description=Synapse Matrix Homeserver
[Service]
Type=simple
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
[Install]
WantedBy=matrix.target


@ -0,0 +1,7 @@
[Unit]
Description=Contains matrix services like synapse, bridges and bots
After=network.target
AllowIsolate=no
[Install]
WantedBy=multi-user.target


@ -0,0 +1,14 @@
worker_app: synapse.app.federation_reader
worker_replication_host: 127.0.0.1
worker_replication_port: 9092
worker_replication_http_port: 9093
worker_listeners:
- type: http
port: 8011
resources:
- names: [federation]
worker_daemonize: false
worker_log_config: /etc/matrix-synapse/federation-reader-log.yaml


@ -15,8 +15,8 @@ machine's public DNS hostname, and provide Synapse with a TLS certificate
which is valid for your ``server_name``.
Once you have completed the steps necessary to federate, you should be able to
join a room via federation. (A good place to start is ``#synapse:matrix.org``
- a room for Synapse admins.)
join a room via federation. (A good place to start is ``#synapse:matrix.org`` - a
room for Synapse admins.)
## Delegation
@ -89,7 +89,6 @@ In our example, we would need to add this SRV record in the
_matrix._tcp.example.com. 3600 IN SRV 10 5 443 synapse.example.com.
Once done and set up, you can check the DNS record with ``dig -t srv
_matrix._tcp.<server_name>``. In our example, we would expect this:
@ -117,7 +116,6 @@ you invite them to. This can be caused by an incorrectly-configured reverse
proxy: see [reverse_proxy.rst](<reverse_proxy.rst>) for instructions on how to correctly
configure a reverse proxy.
## Running a Demo Federation of Synapses
If you want to get up and running quickly with a trio of homeservers in a


@ -18,7 +18,7 @@ servers do not necessarily need to connect to your server via the same server
name or port. Indeed, clients will use port 443 by default, whereas servers
default to port 8448. Where these are different, we refer to the 'client port'
and the 'federation port'. See `Setting up federation
<../README.rst#setting-up-federation>`_ for more details of the algorithm used for
<federate.md>`_ for more details of the algorithm used for
federation connections.
Let's assume that we expect clients to connect to our server at


@ -63,11 +63,11 @@ pid_file: DATADIR/homeserver.pid
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
#
soft_file_limit: 0
#soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
#
use_presence: true
#use_presence: false
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
#
@ -359,7 +359,8 @@ database:
database: "DATADIR/homeserver.db"
# Number of events to cache in memory.
event_cache_size: "10K"
#
#event_cache_size: 10K
## Logging ##
@ -373,46 +374,69 @@ log_config: "CONFDIR/SERVERNAME.log.config"
# Number of messages a client can send per second
#
rc_messages_per_second: 0.2
#rc_messages_per_second: 0.2
# Number of messages a client can send before being throttled
#
rc_message_burst_count: 10.0
#rc_message_burst_count: 10.0
# Ratelimiting settings for registration and login.
#
# Each ratelimiting configuration is made of two parameters:
# - per_second: number of requests a client can send per second.
# - burst_count: number of requests a client can send before being throttled.
#
# Synapse currently uses the following configurations:
# - one for registration that ratelimits registration requests based on the
# client's IP address.
# - one for login that ratelimits login requests based on the client's IP
# address.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into, based on the amount of failed login
# attempts for this account.
#
# The defaults are as shown below.
#
#rc_registration:
# per_second: 0.17
# burst_count: 3
#
#rc_login:
# address:
# per_second: 0.17
# burst_count: 3
# account:
# per_second: 0.17
# burst_count: 3
# failed_attempts:
# per_second: 0.17
# burst_count: 3
# The federation window size in milliseconds
#
federation_rc_window_size: 1000
#federation_rc_window_size: 1000
# The number of federation requests from a single server in a window
# before the server will delay processing the request.
#
federation_rc_sleep_limit: 10
#federation_rc_sleep_limit: 10
# The duration in milliseconds to delay processing events from
# remote servers by if they go over the sleep limit.
#
federation_rc_sleep_delay: 500
#federation_rc_sleep_delay: 500
# The maximum number of concurrent federation requests allowed
# from a single server
#
federation_rc_reject_limit: 50
#federation_rc_reject_limit: 50
# The number of federation requests to concurrently process from a
# single server
#
federation_rc_concurrent: 3
# Number of registration requests a client can send per second.
# Defaults to 1/minute (0.17).
#
#rc_registration_requests_per_second: 0.17
# Number of registration requests a client can send before being
# throttled.
# Defaults to 3.
#
#rc_registration_request_burst_count: 3.0
#federation_rc_concurrent: 3
@ -441,11 +465,11 @@ uploads_path: "DATADIR/uploads"
# The largest allowed upload size in bytes
#
max_upload_size: "10M"
#max_upload_size: 10M
# Maximum number of pixels that will be thumbnailed
#
max_image_pixels: "32M"
#max_image_pixels: 32M
# Whether to generate new thumbnails on the fly to precisely match
# the resolution requested by the client. If true then whenever
@ -453,32 +477,32 @@ max_image_pixels: "32M"
# generate a new thumbnail. If false the server will pick a thumbnail
# from a precalculated list.
#
dynamic_thumbnails: false
#dynamic_thumbnails: false
# List of thumbnails to precalculate when an image is uploaded.
#
thumbnail_sizes:
- width: 32
height: 32
method: crop
- width: 96
height: 96
method: crop
- width: 320
height: 240
method: scale
- width: 640
height: 480
method: scale
- width: 800
height: 600
method: scale
#thumbnail_sizes:
# - width: 32
# height: 32
# method: crop
# - width: 96
# height: 96
# method: crop
# - width: 320
# height: 240
# method: scale
# - width: 640
# height: 480
# method: scale
# - width: 800
# height: 600
# method: scale
# Is the preview URL API enabled? If enabled, you *must* specify
# an explicit url_preview_ip_range_blacklist of IPs that the spider is
# denied from accessing.
#
url_preview_enabled: False
#url_preview_enabled: false
# List of IP address CIDR ranges that the URL preview spider is denied
# from accessing. There are no defaults: you must explicitly
@ -543,8 +567,8 @@ url_preview_enabled: False
# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
# The largest allowed URL preview spidering size in bytes
max_spider_size: "10M"
#
#max_spider_size: 10M
## Captcha ##
@ -552,23 +576,25 @@ max_spider_size: "10M"
# This Home Server's ReCAPTCHA public key.
#
recaptcha_public_key: "YOUR_PUBLIC_KEY"
#recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key.
#
recaptcha_private_key: "YOUR_PRIVATE_KEY"
#recaptcha_private_key: "YOUR_PRIVATE_KEY"
# Enables ReCaptcha checks when registering, preventing signup
# unless a captcha is answered. Requires a valid ReCaptcha
# public/private key.
#
enable_registration_captcha: False
#enable_registration_captcha: false
# A secret key used to bypass the captcha test entirely.
#
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
#
#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
## TURN ##
@ -589,7 +615,7 @@ recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
# How long generated TURN credentials last
#
turn_user_lifetime: "1h"
#turn_user_lifetime: 1h
# Whether guests should be allowed to use the TURN server.
# This defaults to True, otherwise VoIP will be unreliable for guests.
@ -597,15 +623,17 @@ turn_user_lifetime: "1h"
# connect to arbitrary endpoints without having first signed up for a
# valid account (e.g. by passing a CAPTCHA).
#
turn_allow_guests: True
#turn_allow_guests: True
## Registration ##
#
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users.
enable_registration: False
#
#enable_registration: false
# The user must provide all of the below types of 3PID when registering.
#
@ -616,7 +644,7 @@ enable_registration: False
# Explicitly disable asking for MSISDNs from the registration
# flow (overrides registrations_require_3pid if MSISDNs are set as required)
#
#disable_msisdn_registration: True
#disable_msisdn_registration: true
# Mandate that users are only allowed to associate certain formats of
# 3PIDs with accounts on this server.
@ -640,13 +668,13 @@ enable_registration: False
# N.B. that increasing this will exponentially increase the time required
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
#
bcrypt_rounds: 12
#bcrypt_rounds: 12
# Allows users to register as guests without a password/email/etc, and
# participate in rooms hosted on this server which have been made
# accessible to anonymous users.
#
allow_guest_access: False
#allow_guest_access: false
# The identity server which we suggest that clients should use when users log
# in on this server.
@ -662,9 +690,9 @@ allow_guest_access: False
# Also defines the ID server which will be called when an account is
# deactivated (one will be picked arbitrarily).
#
trusted_third_party_id_servers:
- matrix.org
- vector.im
#trusted_third_party_id_servers:
# - matrix.org
# - vector.im
# Users who register on this homeserver will automatically be joined
# to these rooms
@ -678,14 +706,14 @@ trusted_third_party_id_servers:
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
autocreate_auto_join_rooms: true
#autocreate_auto_join_rooms: true
## Metrics ###
# Enable collection and rendering of performance metrics
#
enable_metrics: False
#enable_metrics: False
# Enable sentry integration
# NOTE: While attempts are made to ensure that the logs don't contain
@ -705,22 +733,24 @@ enable_metrics: False
# A list of event types that will be included in the room_invite_state
#
room_invite_state_types:
- "m.room.join_rules"
- "m.room.canonical_alias"
- "m.room.avatar"
- "m.room.encryption"
- "m.room.name"
#room_invite_state_types:
# - "m.room.join_rules"
# - "m.room.canonical_alias"
# - "m.room.avatar"
# - "m.room.encryption"
# - "m.room.name"
# A list of application service config file to use
# A list of application service config files to use
#
app_service_config_files: []
#app_service_config_files:
# - app_service_1.yaml
# - app_service_2.yaml
# Whether or not to track application service IP addresses. Implicitly
# Uncomment to enable tracking of application service IP addresses. Implicitly
# enables MAU tracking for application service users.
#
track_appservice_user_ips: False
#track_appservice_user_ips: True
# a secret which is used to sign access tokens. If none is specified,
@ -731,7 +761,7 @@ track_appservice_user_ips: False
# Used to enable access token expiration.
#
expire_access_token: False
#expire_access_token: False
# a secret which is used to calculate HMACs for form values, to stop
# falsification of values. Must be specified for the User Consent
@ -760,17 +790,16 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
# Determines how quickly servers will query to check which keys
# are still valid.
#
key_refresh_interval: "1d" # 1 Day.
#key_refresh_interval: 1d
# The trusted servers to download signing keys from.
#
perspectives:
servers:
"matrix.org":
verify_keys:
"ed25519:auto":
key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
#perspectives:
# servers:
# "matrix.org":
# verify_keys:
# "ed25519:auto":
# key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
# Enable SAML2 for registration and login. Uses pysaml2.
@ -835,14 +864,15 @@ perspectives:
# algorithm: "HS256"
# Enable password for login.
#
password_config:
enabled: true
# Uncomment to disable password login
#
#enabled: false
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
#pepper: ""
#
#pepper: "EVEN_MORE_SECRET"
@ -911,9 +941,9 @@ password_config:
# example_option: 'things'
# Whether to allow non server admins to create groups on this server
# Uncomment to allow non-server-admin users to create groups on this server
#
enable_group_creation: false
#enable_group_creation: true
# If enabled, non server admins can only create groups with local parts
# starting with this prefix


@ -14,6 +14,8 @@
import collections
from synapse.api.errors import LimitExceededError
class Ratelimiter(object):
"""
@ -82,3 +84,13 @@ class Ratelimiter(object):
break
else:
del self.message_counts[key]
def ratelimit(self, key, time_now_s, rate_hz, burst_count, update=True):
allowed, time_allowed = self.can_do_action(
key, time_now_s, rate_hz, burst_count, update
)
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now_s)),
)
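A minimal usage sketch of the new helper; the `synapse.api.ratelimiting` module
path is assumed from context rather than shown in this diff:
```
# Sketch: rate-limit an action keyed on client IP and surface the
# computed retry interval via LimitExceededError.retry_after_ms.
import time

from synapse.api.errors import LimitExceededError
from synapse.api.ratelimiting import Ratelimiter

limiter = Ratelimiter()
try:
    # e.g. login attempts: 0.17 requests/second, bursts of up to 3
    limiter.ratelimit("198.51.100.7", time.time(), rate_hz=0.17, burst_count=3)
except LimitExceededError as e:
    print("throttled; retry after %d ms" % e.retry_after_ms)
```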


@ -63,12 +63,13 @@ def start_worker_reactor(appname, config):
start_reactor(
appname,
config.soft_file_limit,
config.gc_thresholds,
config.worker_pid_file,
config.worker_daemonize,
config.worker_cpu_affinity,
logger,
soft_file_limit=config.soft_file_limit,
gc_thresholds=config.gc_thresholds,
pid_file=config.worker_pid_file,
daemonize=config.worker_daemonize,
cpu_affinity=config.worker_cpu_affinity,
print_pidfile=config.print_pidfile,
logger=logger,
)
@ -79,6 +80,7 @@ def start_reactor(
pid_file,
daemonize,
cpu_affinity,
print_pidfile,
logger,
):
""" Run the reactor in the main process
@ -93,6 +95,7 @@ def start_reactor(
pid_file (str): name of pid file to write to if daemonize is True
daemonize (bool): true to run the reactor in a background process
cpu_affinity (int|None): cpu affinity mask
print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
"""
@ -124,6 +127,9 @@ def start_reactor(
reactor.run()
if daemonize:
if print_pidfile:
print(pid_file)
daemon = Daemonize(
app=appname,
pid=pid_file,


@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
@ -202,6 +205,7 @@ class FederationSenderHandler(object):
"""
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
run_as_background_process(
"process_receipts_for_federation", self._on_new_receipts, rows,
)
@defer.inlineCallbacks
def _on_new_receipts(self, rows):
"""
Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
receipt.data,
)
yield self.federation_sender.send_read_receipt(receipt_info)
@defer.inlineCallbacks
def update_token(self, token):
try:
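For reference, a sketch of the row shape `_on_new_receipts` consumes above;
this is an illustrative stand-in, not the real stream class, which lives in
`synapse.replication.tcp.streams`:
```
from collections import namedtuple

# Stand-in with the fields _on_new_receipts actually reads; values made up.
ReceiptsStreamRow = namedtuple(
    "ReceiptsStreamRow",
    ["room_id", "receipt_type", "user_id", "event_id", "data"],
)

row = ReceiptsStreamRow(
    room_id="!room:example.com",
    receipt_type="m.read",
    user_id="@alice:example.com",
    event_id="$someevent",
    data={"ts": 1552995862000},
)
```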


@ -637,17 +637,15 @@ def run(hs):
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
print(hs.config.pid_file)
_base.start_reactor(
"synapse-homeserver",
hs.config.soft_file_limit,
hs.config.gc_thresholds,
hs.config.pid_file,
hs.config.daemonize,
hs.config.cpu_affinity,
logger,
soft_file_limit=hs.config.soft_file_limit,
gc_thresholds=hs.config.gc_thresholds,
pid_file=hs.config.pid_file,
daemonize=hs.config.daemonize,
cpu_affinity=hs.config.cpu_affinity,
print_pidfile=hs.config.print_pidfile,
logger=logger,
)


@ -214,14 +214,20 @@ class Config(object):
" Defaults to the directory containing the last config file",
)
obj = cls()
obj.invoke_all("add_arguments", config_parser)
config_args = config_parser.parse_args(argv)
config_files = find_config_files(search_paths=config_args.config_path)
obj = cls()
obj.read_config_files(
config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
obj.invoke_all("read_arguments", config_args)
return obj
@classmethod

View File

@ -34,10 +34,10 @@ class ApiConfig(Config):
# A list of event types that will be included in the room_invite_state
#
room_invite_state_types:
- "{JoinRules}"
- "{CanonicalAlias}"
- "{RoomAvatar}"
- "{RoomEncryption}"
- "{Name}"
#room_invite_state_types:
# - "{JoinRules}"
# - "{CanonicalAlias}"
# - "{RoomAvatar}"
# - "{RoomEncryption}"
# - "{Name}"
""".format(**vars(EventTypes))


@ -37,14 +37,16 @@ class AppServiceConfig(Config):
def default_config(cls, **kwargs):
return """\
# A list of application service config file to use
# A list of application service config files to use
#
app_service_config_files: []
#app_service_config_files:
# - app_service_1.yaml
# - app_service_2.yaml
# Whether or not to track application service IP addresses. Implicitly
# Uncomment to enable tracking of application service IP addresses. Implicitly
# enables MAU tracking for application service users.
#
track_appservice_user_ips: False
#track_appservice_user_ips: True
"""


@ -18,11 +18,16 @@ from ._base import Config
class CaptchaConfig(Config):
def read_config(self, config):
self.recaptcha_private_key = config["recaptcha_private_key"]
self.recaptcha_public_key = config["recaptcha_public_key"]
self.enable_registration_captcha = config["enable_registration_captcha"]
self.recaptcha_private_key = config.get("recaptcha_private_key")
self.recaptcha_public_key = config.get("recaptcha_public_key")
self.enable_registration_captcha = config.get(
"enable_registration_captcha", False
)
self.captcha_bypass_secret = config.get("captcha_bypass_secret")
self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
self.recaptcha_siteverify_api = config.get(
"recaptcha_siteverify_api",
"https://www.recaptcha.net/recaptcha/api/siteverify",
)
def default_config(self, **kwargs):
return """\
@ -31,21 +36,23 @@ class CaptchaConfig(Config):
# This Home Server's ReCAPTCHA public key.
#
recaptcha_public_key: "YOUR_PUBLIC_KEY"
#recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key.
#
recaptcha_private_key: "YOUR_PRIVATE_KEY"
#recaptcha_private_key: "YOUR_PRIVATE_KEY"
# Enables ReCaptcha checks when registering, preventing signup
# unless a captcha is answered. Requires a valid ReCaptcha
# public/private key.
#
enable_registration_captcha: False
#enable_registration_captcha: false
# A secret key used to bypass the captcha test entirely.
#
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
#
#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
"""


@ -60,7 +60,8 @@ class DatabaseConfig(Config):
database: "%(database_path)s"
# Number of events to cache in memory.
event_cache_size: "10K"
#
#event_cache_size: 10K
""" % locals()
def read_arguments(self, args):


@ -23,9 +23,9 @@ class GroupsConfig(Config):
def default_config(self, **kwargs):
return """\
# Whether to allow non server admins to create groups on this server
# Uncomment to allow non-server-admin users to create groups on this server
#
enable_group_creation: false
#enable_group_creation: true
# If enabled, non server admins can only create groups with local parts
# starting with this prefix


@ -43,10 +43,16 @@ class KeyConfig(Config):
config.get("old_signing_keys", {})
)
self.key_refresh_interval = self.parse_duration(
config["key_refresh_interval"]
config.get("key_refresh_interval", "1d"),
)
self.perspectives = self.read_perspectives(
config["perspectives"]
config.get("perspectives", {}).get("servers", {
"matrix.org": {"verify_keys": {
"ed25519:auto": {
"key": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw",
}
}}
})
)
self.macaroon_secret_key = config.get(
@ -88,7 +94,7 @@ class KeyConfig(Config):
# Used to enable access token expiration.
#
expire_access_token: False
#expire_access_token: False
# a secret which is used to calculate HMACs for form values, to stop
# falsification of values. Must be specified for the User Consent
@ -117,21 +123,21 @@ class KeyConfig(Config):
# Determines how quickly servers will query to check which keys
# are still valid.
#
key_refresh_interval: "1d" # 1 Day.
#key_refresh_interval: 1d
# The trusted servers to download signing keys from.
#
perspectives:
servers:
"matrix.org":
verify_keys:
"ed25519:auto":
key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
#perspectives:
# servers:
# "matrix.org":
# verify_keys:
# "ed25519:auto":
# key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
""" % locals()
def read_perspectives(self, perspectives_config):
def read_perspectives(self, perspectives_servers):
servers = {}
for server_name, server_config in perspectives_config["servers"].items():
for server_name, server_config in perspectives_servers.items():
for key_id, key_data in server_config["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]


@ -24,7 +24,7 @@ MISSING_SENTRY = (
class MetricsConfig(Config):
def read_config(self, config):
self.enable_metrics = config["enable_metrics"]
self.enable_metrics = config.get("enable_metrics", False)
self.report_stats = config.get("report_stats", None)
self.metrics_port = config.get("metrics_port")
self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
@ -48,7 +48,7 @@ class MetricsConfig(Config):
# Enable collection and rendering of performance metrics
#
enable_metrics: False
#enable_metrics: False
# Enable sentry integration
# NOTE: While attempts are made to ensure that the logs don't contain


@ -22,16 +22,21 @@ class PasswordConfig(Config):
def read_config(self, config):
password_config = config.get("password_config", {})
if password_config is None:
password_config = {}
self.password_enabled = password_config.get("enabled", True)
self.password_pepper = password_config.get("pepper", "")
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Enable password for login.
#
return """\
password_config:
enabled: true
# Uncomment to disable password login
#
#enabled: false
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
#pepper: ""
#
#pepper: "EVEN_MORE_SECRET"
"""


@ -15,69 +15,100 @@
from ._base import Config
class RateLimitConfig(object):
def __init__(self, config):
self.per_second = config.get("per_second", 0.17)
self.burst_count = config.get("burst_count", 3.0)
class RatelimitConfig(Config):
def read_config(self, config):
self.rc_messages_per_second = config["rc_messages_per_second"]
self.rc_message_burst_count = config["rc_message_burst_count"]
self.rc_messages_per_second = config.get("rc_messages_per_second", 0.2)
self.rc_message_burst_count = config.get("rc_message_burst_count", 10.0)
self.federation_rc_window_size = config["federation_rc_window_size"]
self.federation_rc_sleep_limit = config["federation_rc_sleep_limit"]
self.federation_rc_sleep_delay = config["federation_rc_sleep_delay"]
self.federation_rc_reject_limit = config["federation_rc_reject_limit"]
self.federation_rc_concurrent = config["federation_rc_concurrent"]
self.rc_registration = RateLimitConfig(config.get("rc_registration", {}))
self.rc_registration_requests_per_second = config.get(
"rc_registration_requests_per_second", 0.17,
)
self.rc_registration_request_burst_count = config.get(
"rc_registration_request_burst_count", 3,
)
rc_login_config = config.get("rc_login", {})
self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {}))
self.rc_login_account = RateLimitConfig(rc_login_config.get("account", {}))
self.rc_login_failed_attempts = RateLimitConfig(
rc_login_config.get("failed_attempts", {}),
)
self.federation_rc_window_size = config.get("federation_rc_window_size", 1000)
self.federation_rc_sleep_limit = config.get("federation_rc_sleep_limit", 10)
self.federation_rc_sleep_delay = config.get("federation_rc_sleep_delay", 500)
self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50)
self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3)
def default_config(self, **kwargs):
return """\
## Ratelimiting ##
# Number of messages a client can send per second
#
rc_messages_per_second: 0.2
#rc_messages_per_second: 0.2
# Number of messages a client can send before being throttled
#
rc_message_burst_count: 10.0
#rc_message_burst_count: 10.0
# Ratelimiting settings for registration and login.
#
# Each ratelimiting configuration is made of two parameters:
# - per_second: number of requests a client can send per second.
# - burst_count: number of requests a client can send before being throttled.
#
# Synapse currently uses the following configurations:
# - one for registration that ratelimits registration requests based on the
# client's IP address.
# - one for login that ratelimits login requests based on the client's IP
# address.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into, based on the amount of failed login
# attempts for this account.
#
# The defaults are as shown below.
#
#rc_registration:
# per_second: 0.17
# burst_count: 3
#
#rc_login:
# address:
# per_second: 0.17
# burst_count: 3
# account:
# per_second: 0.17
# burst_count: 3
# failed_attempts:
# per_second: 0.17
# burst_count: 3
# The federation window size in milliseconds
#
federation_rc_window_size: 1000
#federation_rc_window_size: 1000
# The number of federation requests from a single server in a window
# before the server will delay processing the request.
#
federation_rc_sleep_limit: 10
#federation_rc_sleep_limit: 10
# The duration in milliseconds to delay processing events from
# remote servers by if they go over the sleep limit.
#
federation_rc_sleep_delay: 500
#federation_rc_sleep_delay: 500
# The maximum number of concurrent federation requests allowed
# from a single server
#
federation_rc_reject_limit: 50
#federation_rc_reject_limit: 50
# The number of federation requests to concurrently process from a
# single server
#
federation_rc_concurrent: 3
# Number of registration requests a client can send per second.
# Defaults to 1/minute (0.17).
#
#rc_registration_requests_per_second: 0.17
# Number of registration requests a client can send before being
# throttled.
# Defaults to 3.
#
#rc_registration_request_burst_count: 3.0
#federation_rc_concurrent: 3
"""


@ -24,7 +24,7 @@ class RegistrationConfig(Config):
def read_config(self, config):
self.enable_registration = bool(
strtobool(str(config["enable_registration"]))
strtobool(str(config.get("enable_registration", False)))
)
if "disable_registration" in config:
self.enable_registration = not bool(
@ -36,7 +36,10 @@ class RegistrationConfig(Config):
self.registration_shared_secret = config.get("registration_shared_secret")
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
self.trusted_third_party_id_servers = config.get(
"trusted_third_party_id_servers",
["matrix.org", "vector.im"],
)
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
@ -64,11 +67,13 @@ class RegistrationConfig(Config):
return """\
## Registration ##
#
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users.
enable_registration: False
#
#enable_registration: false
# The user must provide all of the below types of 3PID when registering.
#
@ -79,7 +84,7 @@ class RegistrationConfig(Config):
# Explicitly disable asking for MSISDNs from the registration
# flow (overrides registrations_require_3pid if MSISDNs are set as required)
#
#disable_msisdn_registration: True
#disable_msisdn_registration: true
# Mandate that users are only allowed to associate certain formats of
# 3PIDs with accounts on this server.
@ -103,13 +108,13 @@ class RegistrationConfig(Config):
# N.B. that increasing this will exponentially increase the time required
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
#
bcrypt_rounds: 12
#bcrypt_rounds: 12
# Allows users to register as guests without a password/email/etc, and
# participate in rooms hosted on this server which have been made
# accessible to anonymous users.
#
allow_guest_access: False
#allow_guest_access: false
# The identity server which we suggest that clients should use when users log
# in on this server.
@ -125,9 +130,9 @@ class RegistrationConfig(Config):
# Also defines the ID server which will be called when an account is
# deactivated (one will be picked arbitrarily).
#
trusted_third_party_id_servers:
- matrix.org
- vector.im
#trusted_third_party_id_servers:
# - matrix.org
# - vector.im
# Users who register on this homeserver will automatically be joined
# to these rooms
@ -141,7 +146,7 @@ class RegistrationConfig(Config):
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
autocreate_auto_join_rooms: true
#autocreate_auto_join_rooms: true
""" % locals()
def add_arguments(self, parser):


@ -19,6 +19,36 @@ from synapse.util.module_loader import load_module
from ._base import Config, ConfigError
DEFAULT_THUMBNAIL_SIZES = [
{
"width": 32,
"height": 32,
"method": "crop",
}, {
"width": 96,
"height": 96,
"method": "crop",
}, {
"width": 320,
"height": 240,
"method": "scale",
}, {
"width": 640,
"height": 480,
"method": "scale",
}, {
"width": 800,
"height": 600,
"method": "scale"
},
]
THUMBNAIL_SIZE_YAML = """\
# - width: %(width)i
# height: %(height)i
# method: %(method)s
"""
MISSING_NETADDR = (
"Missing netaddr library. This is required for URL preview API."
)
@ -77,9 +107,9 @@ def parse_thumbnail_requirements(thumbnail_sizes):
class ContentRepositoryConfig(Config):
def read_config(self, config):
self.max_upload_size = self.parse_size(config["max_upload_size"])
self.max_image_pixels = self.parse_size(config["max_image_pixels"])
self.max_spider_size = self.parse_size(config["max_spider_size"])
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
self.media_store_path = self.ensure_directory(config["media_store_path"])
@ -139,9 +169,9 @@ class ContentRepositoryConfig(Config):
)
self.uploads_path = self.ensure_directory(config["uploads_path"])
self.dynamic_thumbnails = config["dynamic_thumbnails"]
self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements(
config["thumbnail_sizes"]
config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES),
)
self.url_preview_enabled = config.get("url_preview_enabled", False)
if self.url_preview_enabled:
@ -178,6 +208,13 @@ class ContentRepositoryConfig(Config):
def default_config(self, data_dir_path, **kwargs):
media_store = os.path.join(data_dir_path, "media_store")
uploads_path = os.path.join(data_dir_path, "uploads")
formatted_thumbnail_sizes = "".join(
THUMBNAIL_SIZE_YAML % s for s in DEFAULT_THUMBNAIL_SIZES
)
# strip final NL
formatted_thumbnail_sizes = formatted_thumbnail_sizes[:-1]
return r"""
# Directory where uploaded images and attachments are stored.
#
@ -204,11 +241,11 @@ class ContentRepositoryConfig(Config):
# The largest allowed upload size in bytes
#
max_upload_size: "10M"
#max_upload_size: 10M
# Maximum number of pixels that will be thumbnailed
#
max_image_pixels: "32M"
#max_image_pixels: 32M
# Whether to generate new thumbnails on the fly to precisely match
# the resolution requested by the client. If true then whenever
@ -216,32 +253,18 @@ class ContentRepositoryConfig(Config):
# generate a new thumbnail. If false the server will pick a thumbnail
# from a precalculated list.
#
dynamic_thumbnails: false
#dynamic_thumbnails: false
# List of thumbnails to precalculate when an image is uploaded.
#
thumbnail_sizes:
- width: 32
height: 32
method: crop
- width: 96
height: 96
method: crop
- width: 320
height: 240
method: scale
- width: 640
height: 480
method: scale
- width: 800
height: 600
method: scale
#thumbnail_sizes:
%(formatted_thumbnail_sizes)s
# Is the preview URL API enabled? If enabled, you *must* specify
# an explicit url_preview_ip_range_blacklist of IPs that the spider is
# denied from accessing.
#
url_preview_enabled: False
#url_preview_enabled: false
# List of IP address CIDR ranges that the URL preview spider is denied
# from accessing. There are no defaults: you must explicitly
@ -306,6 +329,6 @@ class ContentRepositoryConfig(Config):
# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
# The largest allowed URL preview spidering size in bytes
max_spider_size: "10M"
#
#max_spider_size: 10M
""" % locals()


@ -64,7 +64,7 @@ class SAML2Config(Config):
}
def default_config(self, config_dir_path, server_name, **kwargs):
return """
return """\
# Enable SAML2 for registration and login. Uses pysaml2.
#
# `sp_config` is the configuration for the pysaml2 Service Provider.


@ -45,7 +45,7 @@ class ServerConfig(Config):
self.pid_file = self.abspath(config.get("pid_file"))
self.web_client_location = config.get("web_client_location", None)
self.soft_file_limit = config["soft_file_limit"]
self.soft_file_limit = config.get("soft_file_limit", 0)
self.daemonize = config.get("daemonize")
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
@ -307,11 +307,11 @@ class ServerConfig(Config):
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
#
soft_file_limit: 0
#soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
#
use_presence: true
#use_presence: false
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
#


@ -22,7 +22,9 @@ class VoipConfig(Config):
self.turn_shared_secret = config.get("turn_shared_secret")
self.turn_username = config.get("turn_username")
self.turn_password = config.get("turn_password")
self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
self.turn_user_lifetime = self.parse_duration(
config.get("turn_user_lifetime", "1h"),
)
self.turn_allow_guests = config.get("turn_allow_guests", True)
def default_config(self, **kwargs):
@ -45,7 +47,7 @@ class VoipConfig(Config):
# How long generated TURN credentials last
#
turn_user_lifetime: "1h"
#turn_user_lifetime: 1h
# Whether guests should be allowed to use the TURN server.
# This defaults to True, otherwise VoIP will be unreliable for guests.
@ -53,5 +55,5 @@ class VoipConfig(Config):
# connect to arbitrary endpoints without having first signed up for a
# valid account (e.g. by passing a CAPTCHA).
#
turn_allow_guests: True
#turn_allow_guests: True
"""


@ -28,7 +28,7 @@ class WorkerConfig(Config):
if self.worker_app == "synapse.app.homeserver":
self.worker_app = None
self.worker_listeners = config.get("worker_listeners")
self.worker_listeners = config.get("worker_listeners", [])
self.worker_daemonize = config.get("worker_daemonize")
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
@ -48,6 +48,17 @@ class WorkerConfig(Config):
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
self.worker_cpu_affinity = config.get("worker_cpu_affinity")
# This option is really only here to support `--manhole` command line
# argument.
manhole = config.get("worker_manhole")
if manhole:
self.worker_listeners.append({
"port": manhole,
"bind_addresses": ["127.0.0.1"],
"type": "manhole",
"tls": False,
})
if self.worker_listeners:
for listener in self.worker_listeners:
bind_address = listener.pop("bind_address", None)
@ -57,3 +68,18 @@ class WorkerConfig(Config):
bind_addresses.append(bind_address)
elif not bind_addresses:
bind_addresses.append('')
def read_arguments(self, args):
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
# on workers so we also have to override them when command line options
# are specified.
if args.daemonize is not None:
self.worker_daemonize = args.daemonize
if args.log_config is not None:
self.worker_log_config = args.log_config
if args.log_file is not None:
self.worker_log_file = args.log_file
if args.manhole is not None:
self.worker_manhole = args.manhole
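Together with the `--daemonize` plumbing above, a worker can now be launched
like the master process. A sketch, with paths borrowed from the systemd
examples earlier in this commit and `worker_pid_file` assumed to be set in the
worker config:
```
/opt/venvs/matrix-synapse/bin/python -m synapse.app.federation_reader \
    --config-path=/etc/matrix-synapse/homeserver.yaml \
    --config-path=/etc/matrix-synapse/workers/federation_reader.yaml \
    --daemonize
```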


@ -46,7 +46,7 @@ logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(object):
"""A drop in replacement for TransactionQueue"""
"""A drop in replacement for FederationSender"""
def __init__(self, hs):
self.server_name = hs.hostname
@ -154,13 +154,13 @@ class FederationRemoteSendQueue(object):
del self.device_messages[key]
def notify_new_events(self, current_id):
"""As per TransactionQueue"""
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""As per TransactionQueue"""
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
@ -183,8 +183,17 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_read_receipt(self, receipt):
"""As per FederationSender
Args:
receipt (synapse.types.ReadReceipt):
"""
# nothing to do here: the replication listener will handle it.
pass
def send_presence(self, states):
"""As per TransactionQueue
"""As per FederationSender
Args:
states (list(UserPresenceState))
@ -201,7 +210,7 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
"""As per TransactionQueue"""
"""As per FederationSender"""
pos = self._next_pos()
self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
@ -439,7 +448,7 @@ def process_rows_for_federation(transaction_queue, rows):
transaction queue ready for sending to the relevant homeservers.
Args:
transaction_queue (TransactionQueue)
transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
"""


@ -0,0 +1,388 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from six import itervalues
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import logcontext
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
"Total number of PDUs queued for sending across all destinations",
)
class FederationSender(object):
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
self._transaction_manager = TransactionManager(hs)
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: dict[str, PerDestinationQueue]
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: sum(
1 for d in self._per_destination_queues.values()
if d.transmission_loop_running
),
)
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
),
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
),
)
self._order = 1
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
def _get_per_destination_queue(self, destination):
queue = self._per_destination_queues.get(destination)
if not queue:
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
self._per_destination_queues[destination] = queue
return queue
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
"""
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
return
# fire off a processing loop in the background
run_as_background_process(
"process_event_queue_for_federation",
self._process_event_queue_loop,
)
@defer.inlineCallbacks
def _process_event_queue_loop(self):
try:
self._is_processing = True
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
if not events and next_token >= self._last_poked_id:
break
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=event.prev_event_ids(),
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
destinations = set(destinations)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in itervalues(events_by_room)
],
consumeErrors=True
))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
order = self._order
self._order += 1
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)
@defer.inlineCallbacks
def send_read_receipt(self, receipt):
"""Send a RR to any other servers in the room
Args:
receipt (synapse.types.ReadReceipt): receipt to be sent
"""
# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
domains = [d for d in domains if d != self.server_name]
if not domains:
return
logger.debug("Sending receipt to: %r", domains)
content = {
receipt.room_id: {
receipt.receipt_type: {
receipt.user_id: {
"event_ids": receipt.event_ids,
"data": receipt.data,
},
},
},
}
key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
for domain in domains:
self.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content=content,
key=key,
)
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled
# We only want to send presence for our own users, so let's always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""Construct an Edu object, and queue it for sending
Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
self.send_edu(edu, key)
def send_edu(self, edu, key):
"""Queue an EDU for sending
Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
"""
queue = self._get_per_destination_queue(edu.destination)
if key:
queue.send_keyed_edu(edu, key)
else:
queue.send_edu(edu)
def send_device_messages(self, destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
def get_current_token(self):
return 0

View File

@ -0,0 +1,318 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
)
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
logger = logging.getLogger(__name__)
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
)
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
)
class PerDestinationQueue(object):
"""
Manages the per-destination transmission queues.
Args:
hs (synapse.HomeServer):
transaction_manager (TransactionManager):
destination (str): the server_name of the destination that we are managing
transmission for.
"""
def __init__(self, hs, transaction_manager, destination):
self._server_name = hs.hostname
self._clock = hs.get_clock()
self._store = hs.get_datastore()
self._transaction_manager = transaction_manager
self._destination = destination
self.transmission_loop_running = False
# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: list[tuple[EventBase, int]]
self._pending_edus = [] # type: list[Edu]
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
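# For example (hypothetical values): a second typing notification for the
# same room replaces the first rather than queueing behind it:
#   _pending_edus_keyed[("m.typing", "!room:a")] = edu1
#   _pending_edus_keyed[("m.typing", "!room:a")] = edu2  # clobbers edu1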
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self._last_device_stream_id = 0
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
def pending_pdu_count(self):
return len(self._pending_pdus)
def pending_edu_count(self):
return (
len(self._pending_edus)
+ len(self._pending_presence)
+ len(self._pending_edus_keyed)
)
def send_pdu(self, pdu, order):
"""Add a PDU to the queue, and start the transmission loop if neccessary
Args:
pdu (EventBase): pdu to send
order (int):
"""
self._pending_pdus.append((pdu, order))
self.attempt_new_transaction()
def send_presence(self, states):
"""Add presence updates to the queue. Start the transmission loop if neccessary.
Args:
states (iterable[UserPresenceState]): presence to send
"""
self._pending_presence.update({
state.user_id: state for state in states
})
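# (Keyed by user_id, so a newer presence state for the same user simply
# overwrites any older one still waiting to be sent.)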
self.attempt_new_transaction()
def send_keyed_edu(self, edu, key):
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
def send_edu(self, edu):
self._pending_edus.append(edu)
self.attempt_new_transaction()
def attempt_new_transaction(self):
"""Try to start a new transaction to this destination
If there is already a transaction in progress to this destination,
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
# list of (pending_pdu, deferred, order)
if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
self._destination
)
return
logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
)
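# (So callers such as send_pdu()/send_edu() can call this unconditionally
# after queueing: at most one transmission loop runs per destination, and
# it is intended to drain everything queued, including items added while
# it was running.)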
@defer.inlineCallbacks
def _transaction_transmission_loop(self):
pending_pdus = []
try:
self.transmission_loop_running = True
# This will throw if we wouldn't retry. We do this here so we fail
# quickly, but we will later check this again in the http client,
# hence why we throw the result away.
yield get_retry_limiter(self._destination, self._clock, self._store)
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages()
)
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.
pending_pdus = self._pending_pdus
# We can only include at most 50 PDUs per transaction
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
pending_edus = self._pending_edus
# We can only include at most 100 EDUs per transaction
pending_edus, self._pending_edus = pending_edus[:100], pending_edus[100:]
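# (The slice-and-reassign above keeps any overflow queued: the leftover
# PDUs/EDUs stay in self._pending_pdus / self._pending_edus and are picked
# up on the next iteration of this while loop.)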
pending_edus.extend(
self._pending_edus_keyed.values()
)
self._pending_edus_keyed = {}
pending_edus.extend(device_message_edus)
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination, len(pending_pdus))
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now",
self._destination,
datetime.datetime.fromtimestamp(
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
logger.warning(
"TX [%s] Received %d response to transaction: %s",
self._destination, e.code, e,
)
except RequestSendFailed as e:
logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
self._destination)
except Exception:
logger.exception(
"TX [%s] Failed to send transaction",
self._destination,
)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
self._destination)
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
@defer.inlineCallbacks
def _get_new_device_messages(self):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.direct_to_device",
content=content,
)
for content in contents
]
last_device_list = self._last_device_list_stream_id
now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_list
)
edus.extend(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.device_list_update",
content=content,
)
for content in results
)
defer.returnValue((edus, stream_id, now_stream_id))

View File

@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
class TransactionManager(object):
"""Helper class which handles building and sending transactions
shared between PerDestinationQueue objects
"""
def __init__(self, hs):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._store = hs.get_datastore()
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
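# e.g. (illustrative): if the process starts at clock time 1552999000123 ms,
# transactions get ids "1552999000123", "1552999000124", ... which are
# (almost certainly) unique across restarts without any persistence.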
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d)",
destination, txn_id,
len(pdus),
len(edus),
)
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
self._next_txn_id += 1
yield self._transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
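# Worked example (hypothetical values): a PDU stored with age_ts=1000 that
# is serialised at now=4000 goes out with unsigned["age"] = 3000; the
# absolute age_ts is dropped so the receiver can reconstruct it against
# its own clock.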
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
raise e
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
yield self._transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
)
success = False
defer.returnValue(success)

View File

@ -1,716 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from six import itervalues
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
)
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
sent_transactions_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from .persistence import TransactionActions
from .units import Edu, Transaction
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
"Total number of PDUs queued for sending across all destinations",
)
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
)
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
)
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
a time for a given destination.
It batches pending PDUs into single transactions.
"""
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.transaction_actions = TransactionActions(self.store)
self.transport_layer = hs.get_federation_transport_client()
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
# Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are
# done
self.pending_transactions = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: len(self.pending_transactions),
)
# Is a mapping from destination -> list of
# tuple(pending pdus, deferred, order)
self.pending_pdus_by_dest = pdus = {}
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
# Map of destination -> user_id -> UserPresenceState of pending presence
# to be sent to each destinations
self.pending_presence_by_dest = presence = {}
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(map(len, pdus.values())),
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
+ sum(map(len, edus_keyed.values()))
),
)
# destination -> stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self.last_device_stream_id_by_dest = {}
# destination -> stream_id of last successfully sent device list
# update.
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
self._order = 1
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
"""
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
return
# fire off a processing loop in the background
run_as_background_process(
"process_event_queue_for_federation",
self._process_event_queue_loop,
)
@defer.inlineCallbacks
def _process_event_queue_loop(self):
try:
self._is_processing = True
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
if not events and next_token >= self._last_poked_id:
break
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=event.prev_event_ids(),
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
destinations = set(destinations)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in itervalues(events_by_room)
],
consumeErrors=True
))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
order = self._order
self._order += 1
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, order)
)
self._attempt_new_transaction(destination)
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled
# We only want to send presence for our own users, so let's always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
self.pending_presence_by_dest.setdefault(
destination, {}
).update({
state.user_id: state for state in states
})
self._attempt_new_transaction(destination)
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""Construct an Edu object, and queue it for sending
Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
self.send_edu(edu, key)
def send_edu(self, edu, key):
"""Queue an EDU for sending
Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
"""
if key:
self.pending_edus_keyed_by_dest.setdefault(
edu.destination, {}
)[(edu.edu_type, key)] = edu
else:
self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)
self._attempt_new_transaction(edu.destination)
def send_device_messages(self, destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
return
self._attempt_new_transaction(destination)
def get_current_token(self):
return 0
def _attempt_new_transaction(self, destination):
"""Try to start a new transaction to this destination
If there is already a transaction in progress to this destination,
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
Args:
destination (str):
Returns:
None
"""
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
destination
)
return
logger.debug("TX [%s] Starting transaction loop", destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
destination,
)
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
pending_pdus = []
try:
self.pending_transactions[destination] = 1
# This will throw if we wouldn't retry. We do this here so we fail
# quickly, but we will later check this again in the http client,
# hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store)
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# pending_transactions flag.
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transaction
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
if leftover_pdus:
self.pending_pdus_by_dest[destination] = leftover_pdus
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transaction
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
if leftover_edus:
self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = (
device_stream_id
)
return
# END CRITICAL SECTION
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus,
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now",
destination,
datetime.datetime.fromtimestamp(
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
logger.warning(
"TX [%s] Received %d response to transaction: %s",
destination, e.code, e,
)
except RequestSendFailed as e:
logger.warning("TX [%s] Failed to send transaction: %s", destination, e)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
except Exception:
logger.exception(
"TX [%s] Failed to send transaction",
destination,
)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
to_device_stream_id = self.store.get_to_device_stream_token()
contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
destination, last_device_stream_id, to_device_stream_id
)
edus = [
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.direct_to_device",
content=content,
)
for content in contents
]
last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
now_stream_id, results = yield self.store.get_devices_by_remote(
destination, last_device_list
)
edus.extend(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.device_list_update",
content=content,
)
for content in results
)
defer.returnValue((edus, stream_id, now_stream_id))
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d)",
destination, txn_id,
len(pdus),
len(edus),
)
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
raise e
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
yield self.transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
)
success = False
defer.returnValue(success)

View File

@ -35,6 +35,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
login_types.append(t)
self._supported_login_types = login_types
self._account_ratelimiter = Ratelimiter()
self._failed_attempts_ratelimiter = Ratelimiter()
self._clock = self.hs.get_clock()
@defer.inlineCallbacks
def validate_user_via_ui_auth(self, requester, request_body, clientip):
"""
@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
Returns:
defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
Raises:
LimitExceededError if the ratelimiter's login requests count for this
user is too high to proceed.
"""
self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None:
defer.returnValue(res[0])
@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request
LoginError if there was an authentication problem.
LimitExceededError if the ratelimiter's login requests count for this
user is too high to proceed.
"""
if username.startswith('@'):
@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
username, self.hs.hostname
).to_string()
self.ratelimit_login_per_account(qualified_user_id)
login_type = login_submission.get("type")
known_login_type = False
@ -715,9 +730,16 @@ class AuthHandler(BaseHandler):
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
# unknown username or invalid password. We raise a 403 here, but note
# that if we're doing user-interactive login, it turns all LoginErrors
# into a 401 anyway.
# unknown username or invalid password.
self._failed_attempts_ratelimiter.ratelimit(
qualified_user_id.lower(), time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=True,
)
# We raise a 403 here, but note that if we're doing user-interactive
# login, it turns all LoginErrors into a 401 anyway.
raise LoginError(
403, "Invalid password",
errcode=Codes.FORBIDDEN
@ -735,6 +757,10 @@ class AuthHandler(BaseHandler):
password (unicode): the provided password
Returns:
(unicode) the canonical_user_id, or None if unknown user / bad password
Raises:
LimitExceededError if the ratelimiter's login requests count for this
user is too high to proceed.
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@ -763,6 +789,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
self.ratelimit_login_per_account(user_id)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@ -934,6 +961,33 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
def ratelimit_login_per_account(self, user_id):
"""Checks whether the process must be stopped because of ratelimiting.
Checks against two ratelimiters: the generic one for login attempts per
account and the one specific to failed attempts.
Args:
user_id (unicode): complete @user:id
Raises:
LimitExceededError if one of the ratelimiters' login requests count
for this user is too high to proceed.
"""
self._failed_attempts_ratelimiter.ratelimit(
user_id.lower(), time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=False,
)
self._account_ratelimiter.ratelimit(
user_id.lower(), time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_account.per_second,
burst_count=self.hs.config.rc_login_account.burst_count,
update=True,
)
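# Sketch of the intended behaviour (the numbers are hypothetical, not
# defaults): with rc_login_account.per_second = 0.17 and burst_count = 3,
# a user gets three rapid login attempts, then LimitExceededError until the
# token bucket refills at roughly one attempt per six seconds. The
# failed-attempts check passes update=False, so merely checking here does
# not itself consume budget; only actual failures (recorded with
# update=True above) do.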
@attr.s
class MacaroonGenerator(object):

View File

@ -243,7 +243,14 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
if self.config.block_events_without_consent_error is not None:
self._block_events_without_consent_error = (
self.config.block_events_without_consent_error
)
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options are set, but *only* if we have a configuration for which we
# are going to need it.
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
@defer.inlineCallbacks
@ -378,7 +385,7 @@ class EventCreationHandler(object):
Raises:
ConsentNotGivenError: if the user has not given consent yet
"""
if self.config.block_events_without_consent_error is None:
if self._block_events_without_consent_error is None:
return
# exempt AS users from needing consent
@ -405,7 +412,7 @@ class EventCreationHandler(object):
consent_uri = self._consent_uri_builder.build_user_consent_uri(
requester.user.localpart,
)
msg = self.config.block_events_without_consent_error % {
msg = self._block_events_without_consent_error % {
'consent_uri': consent_uri,
}
raise ConsentNotGivenError(

View File

@ -16,9 +16,8 @@ import logging
from twisted.internet import defer
from synapse.types import get_domain_from_id
from ._base import BaseHandler
from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = [
{
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": user_values["event_ids"],
"data": user_values.get("data", {}),
}
ReadReceipt(
room_id=room_id,
receipt_type=receipt_type,
user_id=user_id,
event_ids=user_values["event_ids"],
data=user_values.get("data", {}),
)
for room_id, room_values in content.items()
for receipt_type, users in room_values.items()
for user_id, user_values in users.items()
@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
max_batch_id = None
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
res = yield self.store.insert_receipt(
room_id, receipt_type, user_id, event_ids, data
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.data,
)
if not res:
@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
affected_room_ids = list(set([r["room_id"] for r in receipts]))
affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
receipt = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": [event_id],
"data": {
receipt = ReadReceipt(
room_id=room_id,
receipt_type=receipt_type,
user_id=user_id,
event_ids=[event_id],
data={
"ts": int(self.clock.time_msec()),
}
}
},
)
is_new = yield self._handle_new_receipts([receipt])
if not is_new:
return
# Work out which remote servers should be poked and poke them.
# TODO: optimise this to move some of the work to the workers.
data = receipt["data"]
# XXX why does this not use state.get_current_hosts_in_room() ?
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": [event_id],
"data": data,
}
}
},
},
key=(room_id, receipt_type, user_id),
)
self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):

View File

@ -23,6 +23,7 @@ from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
InvalidCaptchaError,
LimitExceededError,
RegistrationError,
@ -311,6 +312,10 @@ class RegistrationHandler(BaseHandler):
)
else:
yield self._join_user_to_room(fake_requester, r)
except ConsentNotGivenError as e:
# Technically it isn't necessary to pull this error out separately,
# but moving away from bare excepts is a good thing to do.
logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
@ -629,8 +634,8 @@ class RegistrationHandler(BaseHandler):
allowed, time_allowed = self.ratelimiter.can_do_action(
address, time_now_s=time_now,
rate_hz=self.hs.config.rc_registration_requests_per_second,
burst_count=self.hs.config.rc_registration_request_burst_count,
rate_hz=self.hs.config.rc_registration.per_second,
burst_count=self.hs.config.rc_registration.burst_count,
)
if not allowed:

View File

@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
world_readable or publicly joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
For each user in the directory we also store a room_id which is public and that the
user is joined to. This allows us to ignore history_visibility and join_rules changes
for that user in all other public rooms, as we know they'll still be in at least
one public room.
"""
INITIAL_ROOM_SLEEP_MS = 50
INITIAL_ROOM_SLEEP_COUNT = 100
INITIAL_ROOM_BATCH_SIZE = 100
INITIAL_USER_SLEEP_MS = 10
def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@ -59,17 +49,6 @@ class UserDirectoryHandler(object):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
# If we're a worker, don't sleep when doing the initial room work, as it
# won't monopolise the master's CPU.
if hs.config.worker_app:
self.INITIAL_ROOM_SLEEP_MS = 0
self.INITIAL_USER_SLEEP_MS = 0
# When starting up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
self.initially_handled_users = set()
# The current position in the current_state_delta stream
self.pos = None
@ -132,7 +111,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None
user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
@ -149,10 +128,9 @@ class UserDirectoryHandler(object):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
# If still None then we need to do the initial fill of directory
# If still None then the initial background update hasn't happened yet
if self.pos is None:
yield self._do_initial_spam()
self.pos = yield self.store.get_user_directory_stream_pos()
defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
@ -173,133 +151,6 @@ class UserDirectoryHandler(object):
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
def _do_initial_spam(self):
"""Populates the user_directory from the current state of the DB, used
when synapse first starts with user_directory support
"""
new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
# Delete any existing entries just in case there are any
yield self.store.delete_all_from_user_dir()
# We process by going through each existing room at a time.
room_ids = yield self.store.get_all_rooms()
logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
num_processed_rooms = 0
for room_id in room_ids:
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
logger.info("Processed all rooms.")
if self.search_all_users:
num_processed_users = 0
user_ids = yield self.store.get_all_local_users()
logger.info(
"Doing initial update of user directory. %d users", len(user_ids)
)
for user_id in user_ids:
# We add profiles for all users even if they don't match the
# include pattern, just in case we want to change it in future
logger.info(
"Handling user %d/%d", num_processed_users + 1, len(user_ids)
)
yield self._handle_local_user(user_id)
num_processed_users += 1
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
logger.info("Processed all users")
self.initially_handled_users = None
yield self.store.update_user_directory_stream_pos(new_pos)
@defer.inlineCallbacks
def _handle_initial_room(self, room_id):
"""
Called when we initially fill out user_directory one room at a time
"""
is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
if not is_in_room:
return
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
user_ids = set(users_with_profile)
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
{user_id: users_with_profile[user_id] for user_id in unhandled_users}
)
self.initially_handled_users |= unhandled_users
# We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources.
# We also batch up inserts/updates, but try to avoid too many at once.
to_insert = set()
count = 0
if is_public:
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
if self.store.get_if_app_services_interested_in_user(user_id):
count += 1
continue
to_insert.add(user_id)
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_in_public_rooms(room_id, to_insert)
to_insert.clear()
if to_insert:
yield self.store.add_users_in_public_rooms(room_id, to_insert)
to_insert.clear()
else:
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
if not self.is_mine_id(user_id):
count += 1
continue
if self.store.get_if_app_services_interested_in_user(user_id):
count += 1
continue
for other_user_id in user_ids:
if user_id == other_user_id:
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
count += 1
user_set = (user_id, other_user_id)
to_insert.add(user_set)
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_private_room(
room_id, not is_public, to_insert
)
to_insert.clear()
if to_insert:
yield self.store.add_users_who_share_private_room(room_id, to_insert)
to_insert.clear()
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
@ -449,7 +300,9 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.add_profiles_to_user_dir({user_id: profile})
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@ -461,9 +314,9 @@ class UserDirectoryHandler(object):
"""
logger.debug("Adding new user to dir, %r", user_id)
row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.add_profiles_to_user_dir({user_id: profile})
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
@ -479,7 +332,9 @@ class UserDirectoryHandler(object):
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id):
is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
is_appservice = self.store.get_if_app_services_interested_in_user(
user_id
)
# We don't care about appservice users.
if not is_appservice:
@ -546,9 +401,7 @@ class UserDirectoryHandler(object):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(
user_id, new_name, new_avatar, room_id
)
yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):

View File

@ -22,6 +22,7 @@ from twisted.internet import defer
from twisted.web.client import PartialDownloadError
from synapse.api.errors import Codes, LoginError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.http.server import finish_request
from synapse.http.servlet import (
RestServlet,
@ -97,6 +98,7 @@ class LoginRestServlet(ClientV1RestServlet):
self.registration_handler = hs.get_registration_handler()
self.handlers = hs.get_handlers()
self._well_known_builder = WellKnownBuilder(hs)
self._address_ratelimiter = Ratelimiter()
def on_GET(self, request):
flows = []
@ -129,6 +131,13 @@ class LoginRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
self._address_ratelimiter.ratelimit(
request.getClientIP(), time_now_s=self.hs.clock.time(),
rate_hz=self.hs.config.rc_login_address.per_second,
burst_count=self.hs.config.rc_login_address.burst_count,
update=True,
)
login_submission = parse_json_object_from_request(request)
try:
if self.jwt_enabled and (login_submission["type"] ==
@ -285,6 +294,7 @@ class LoginRestServlet(ClientV1RestServlet):
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user_id = UserID(user, self.hs.hostname).to_string()
auth_handler = self.auth_handler
registered_user_id = yield auth_handler.check_user_exists(user_id)
if registered_user_id:

View File

@ -210,8 +210,8 @@ class RegisterRestServlet(RestServlet):
allowed, time_allowed = self.ratelimiter.can_do_action(
client_addr, time_now_s=time_now,
rate_hz=self.hs.config.rc_registration_requests_per_second,
burst_count=self.hs.config.rc_registration_request_burst_count,
rate_hz=self.hs.config.rc_registration.per_second,
burst_count=self.hs.config.rc_registration.burst_count,
update=False,
)

View File

@ -42,7 +42,7 @@ from synapse.federation.federation_server import (
ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.transaction_queue import TransactionQueue
from synapse.federation.sender import FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
from synapse.groups.groups_server import GroupsServerHandler
@ -434,7 +434,7 @@ class HomeServer(object):
def build_federation_sender(self):
if self.should_send_federation():
return TransactionQueue(self)
return FederationSender(self)
elif not self.config.worker_app:
return FederationRemoteSendQueue(self)
else:

View File

@ -1,5 +1,6 @@
import synapse.api.auth
import synapse.config.homeserver
import synapse.federation.sender
import synapse.federation.transaction_queue
import synapse.federation.transport.client
import synapse.handlers
@ -62,7 +63,7 @@ class HomeServer(object):
def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
pass
def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
def get_federation_sender(self) -> synapse.federation.sender.FederationSender:
pass
def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:

View File

@ -52,7 +52,9 @@ class BackgroundUpdatePerformance(object):
Returns:
A duration in ms as a float
"""
if self.total_item_count == 0:
if self.avg_duration_ms == 0:
return 0
elif self.total_item_count == 0:
return None
else:
# Use the exponential moving average so that we can adapt to
@ -64,7 +66,9 @@ class BackgroundUpdatePerformance(object):
Returns:
A duration in ms as a float
"""
if self.total_item_count == 0:
if self.total_duration_ms == 0:
return 0
elif self.total_item_count == 0:
return None
else:
return float(self.total_item_count) / float(self.total_duration_ms)
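# For instance (hypothetical numbers): 500 items processed in a total of
# 2000 ms gives 0.25 items/ms, which the background updater can use to
# size the next batch.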

View File

@ -0,0 +1,30 @@
/* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Set up staging tables
INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_user_directory_createtables', '{}');
-- Run through each room and update the user directory according to who is in it
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables');
-- Insert all users, if search_all_users is on
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms');
-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users');
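-- (Taken together, depends_on forms a linear chain:
--  createtables -> process_rooms -> process_users -> cleanup,
--  so each phase only starts once the previous one has completed.)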

View File

@ -16,12 +16,10 @@
import logging
import re
from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id
@ -30,7 +28,276 @@ from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
class UserDirectoryStore(SQLBaseStore):
TEMP_TABLE = "_temp_populate_user_directory"
class UserDirectoryStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(UserDirectoryStore, self).__init__(db_conn, hs)
self.server_name = hs.hostname
self.register_background_update_handler(
"populate_user_directory_createtables",
self._populate_user_directory_createtables,
)
self.register_background_update_handler(
"populate_user_directory_process_rooms",
self._populate_user_directory_process_rooms,
)
self.register_background_update_handler(
"populate_user_directory_process_users",
self._populate_user_directory_process_users,
)
self.register_background_update_handler(
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
)
@defer.inlineCallbacks
def _populate_user_directory_createtables(self, progress, batch_size):
# Get all the rooms that we want to process.
def _make_staging_area(txn):
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
# Get rooms we want to process from the database
sql = """
SELECT room_id, count(*) FROM current_state_events
GROUP BY room_id
"""
txn.execute(sql)
rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
del rooms
# If search all users is on, get all the users we want to add.
if self.hs.config.user_directory_search_all_users:
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_users(user_id TEXT NOT NULL)"
)
txn.execute(sql)
txn.execute("SELECT name FROM users")
users = [{"user_id": x[0]} for x in txn.fetchall()]
self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction(
"populate_user_directory_temp_build", _make_staging_area
)
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
yield self._end_background_update("populate_user_directory_createtables")
defer.returnValue(1)
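Each handler registered above follows the same contract: it receives the stored progress dict and a batch_size, returns the number of items processed this round via defer.returnValue, and calls _end_background_update once there is nothing left to do. A hedged skeleton of that shape (the update name and batch helper are hypothetical, not actual Synapse code):

@defer.inlineCallbacks
def _example_update_handler(self, progress, batch_size):
    batch = yield self._get_next_batch_somehow(batch_size)  # hypothetical helper
    if not batch:
        yield self._end_background_update("example_update")  # hypothetical name
        defer.returnValue(1)
    # ... process the batch, then persist the updated progress dict ...
    defer.returnValue(len(batch))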
@defer.inlineCallbacks
def _populate_user_directory_cleanup(self, progress, batch_size):
"""
Update the user directory stream position, then clean up the old tables.
"""
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
yield self.update_user_directory_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
yield self.runInteraction(
"populate_user_directory_cleanup", _delete_staging_area
)
yield self._end_background_update("populate_user_directory_cleanup")
defer.returnValue(1)
@defer.inlineCallbacks
def _populate_user_directory_process_rooms(self, progress, batch_size):
state = self.hs.get_state_handler()
# If we don't have any progress recorded, delete everything.
if not progress:
yield self.delete_all_from_user_dir()
def _get_next_batch(txn):
sql = """
SELECT room_id FROM %s
ORDER BY events DESC
LIMIT %s
""" % (
TEMP_TABLE + "_rooms",
str(batch_size),
)
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
if not rooms_to_work_on:
return None
rooms_to_work_on = [x[0] for x in rooms_to_work_on]
# Get how many rooms are left to process, so we can report progress.
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
# No more rooms -- the background update is complete.
if not rooms_to_work_on:
yield self._end_background_update("populate_user_directory_process_rooms")
defer.returnValue(1)
logger.info(
"Processing the next %d rooms of %d remaining",
len(rooms_to_work_on),
progress["remaining"],
)
for room_id in rooms_to_work_on:
is_in_room = yield self.is_host_joined(room_id, self.server_name)
if is_in_room:
is_public = yield self.is_room_world_readable_or_publicly_joinable(
room_id
)
users_with_profile = yield state.get_current_user_in_room(room_id)
user_ids = set(users_with_profile)
# Update each user in the user directory.
for user_id, profile in users_with_profile.items():
yield self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
to_insert = set()
if is_public:
for user_id in user_ids:
if self.get_if_app_services_interested_in_user(user_id):
continue
to_insert.add(user_id)
if to_insert:
yield self.add_users_in_public_rooms(room_id, to_insert)
to_insert.clear()
else:
for user_id in user_ids:
if not self.hs.is_mine_id(user_id):
continue
if self.get_if_app_services_interested_in_user(user_id):
continue
for other_user_id in user_ids:
if user_id == other_user_id:
continue
user_set = (user_id, other_user_id)
to_insert.add(user_set)
if to_insert:
yield self.add_users_who_share_private_room(room_id, to_insert)
to_insert.clear()
# We've finished a room. Delete it from the table.
yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_user_directory",
self._background_update_progress_txn,
"populate_user_directory_process_rooms",
progress,
)
defer.returnValue(len(rooms_to_work_on))
@defer.inlineCallbacks
def _populate_user_directory_process_users(self, progress, batch_size):
"""
If search_all_users is enabled, add all of the users to the user directory.
"""
if not self.hs.config.user_directory_search_all_users:
yield self._end_background_update("populate_user_directory_process_users")
defer.returnValue(1)
def _get_next_batch(txn):
sql = "SELECT user_id FROM %s LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
users_to_work_on = txn.fetchall()
if not users_to_work_on:
return None
users_to_work_on = [x[0] for x in users_to_work_on]
# Get how many users are left to process, so we can report progress.
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
txn.execute(sql)
progress["remaining"] = txn.fetchone()[0]
return users_to_work_on
users_to_work_on = yield self.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
# No more users -- the background update is complete.
if not users_to_work_on:
yield self._end_background_update("populate_user_directory_process_users")
defer.returnValue(1)
logger.info(
"Processing the next %d users of %d remaining",
len(users_to_work_on),
progress["remaining"],
)
for user_id in users_to_work_on:
profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
yield self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
# We've finished processing a user. Delete it from the table.
yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_user_directory",
self._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)
defer.returnValue(len(users_to_work_on))
@defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
@ -62,89 +329,16 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(False)
def add_profiles_to_user_dir(self, users_with_profile):
"""Add profiles to the user directory
Args:
users_with_profile (dict): Users to add to directory in the form of
mapping of user_id -> ProfileInfo
def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
"""
Update or add a user's profile in the user directory.
"""
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then the display name, and
# finally the server name
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
)
"""
args = (
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
profile.display_name,
)
for user_id, profile in iteritems(users_with_profile)
)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = """
INSERT INTO user_directory_search(user_id, value)
VALUES (?,?)
"""
args = tuple(
(
user_id,
"%s %s" % (user_id, p.display_name) if p.display_name else user_id,
)
for user_id, p in iteritems(users_with_profile)
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
def _add_profiles_to_user_dir_txn(txn):
txn.executemany(sql, args)
self._simple_insert_many_txn(
txn,
table="user_directory",
values=[
{
"user_id": user_id,
"room_id": None,
"display_name": profile.display_name,
"avatar_url": profile.avatar_url,
}
for user_id, profile in iteritems(users_with_profile)
],
)
for user_id in users_with_profile:
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
return self.runInteraction(
"add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
)
@defer.inlineCallbacks
def update_user_in_user_dir(self, user_id, room_id):
yield self._simple_update_one(
table="user_directory",
keyvalues={"user_id": user_id},
updatevalues={"room_id": room_id},
desc="update_user_in_user_dir",
)
self.get_user_in_directory.invalidate((user_id,))
def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
def _update_profile_in_user_dir_txn(txn):
new_entry = self._simple_upsert_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
insertion_values={"room_id": room_id},
values={"display_name": display_name, "avatar_url": avatar_url},
lock=False,  # We're the only inserter
)
@ -281,18 +475,6 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(user_ids)
@defer.inlineCallbacks
def get_all_rooms(self):
"""Get all room_ids we've ever known about, in ascending order of "size"
"""
sql = """
SELECT room_id FROM current_state_events
GROUP BY room_id
ORDER BY count(*) ASC
"""
rows = yield self._execute("get_all_rooms", None, sql)
defer.returnValue([room_id for room_id, in rows])
@defer.inlineCallbacks
def get_all_local_users(self):
"""Get all local users
@ -553,8 +735,8 @@ class UserDirectoryStore(SQLBaseStore):
"""
if self.hs.config.user_directory_search_all_users:
join_args = ()
where_clause = "1=1"
join_args = (user_id,)
where_clause = "user_id != ?"
else:
join_args = (user_id,)
where_clause = """

View File

@ -16,6 +16,8 @@ import re
import string
from collections import namedtuple
import attr
from synapse.api.errors import SynapseError
@ -455,3 +457,13 @@ class ThirdPartyInstanceID(
@classmethod
def create(cls, appservice_id, network_id,):
return cls(appservice_id=appservice_id, network_id=network_id)
@attr.s(slots=True)
class ReadReceipt(object):
"""Information about a read-receipt"""
room_id = attr.ib()
receipt_type = attr.ib()
user_id = attr.ib()
event_ids = attr.ib()
data = attr.ib()
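A hedged construction example for the new attrs class, assuming it lives in synapse.types as the surrounding hunk suggests; every field value below is made up:

from synapse.types import ReadReceipt

receipt = ReadReceipt(
    room_id="!somewhere:example.com",      # hypothetical room
    receipt_type="m.read",
    user_id="@alice:example.com",          # hypothetical user
    event_ids=["$someevent:example.com"],  # hypothetical event
    data={"ts": 1552996000000},
)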

View File

@ -187,12 +187,32 @@ class RegistrationTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_auto_create_auto_join_where_no_consent(self):
self.hs.config.user_consent_at_registration = True
self.hs.config.block_events_without_consent_error = "Error"
"""Test to ensure that the first user is not auto-joined to a room if
they have not given general consent.
"""
# Given:-
# * a user must give consent,
# * they have not given that consent
# * The server is configured to auto-join to a room
# (and autocreate if necessary)
event_creation_handler = self.hs.get_event_creation_handler()
# (Messing with the internals of event_creation_handler is fragile,
# but we can't see a better way to do this. One option would be to
# subclass the test with custom config.)
event_creation_handler._block_events_without_consent_error = "Error"
event_creation_handler._consent_uri_builder = Mock()
room_alias_str = "#room:test"
self.hs.config.auto_join_rooms = [room_alias_str]
# When:-
# * the user is registered and post consent actions are called
res = yield self.handler.register(localpart='jeff')
yield self.handler.post_consent_actions(res[0])
# Then:-
# * Ensure that they have not been joined to the room
rooms = yield self.store.get_rooms_for_user(res[0])
self.assertEqual(len(rooms), 0)

View File

@ -163,9 +163,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
def get_users_in_public_rooms(self):
r = self.get_success(
self.store._simple_select_list(
"users_in_public_rooms",
None,
("user_id", "room_id"),
"users_in_public_rooms", None, ("user_id", "room_id")
)
)
retval = []
@ -182,6 +180,53 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
)
)
def _add_background_updates(self):
"""
Add the background updates we need to run.
"""
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_user_directory_createtables",
"progress_json": "{}",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_user_directory_process_rooms",
"progress_json": "{}",
"depends_on": "populate_user_directory_createtables",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_user_directory_process_users",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_rooms",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_user_directory_cleanup",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_users",
},
)
)
def test_initial(self):
"""
The user directory's initial handler correctly updates the search tables.
@ -211,26 +256,17 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.assertEqual(shares_private, [])
self.assertEqual(public_users, [])
# Reset the handled users caches
self.handler.initially_handled_users = set()
# Do the initial population of the user directory via the background update
self._add_background_updates()
# Do the initial population
d = self.handler._do_initial_spam()
# This takes a while, so pump it a bunch of times to get through the
# sleep delays
for i in range(10):
self.pump(1)
self.get_success(d)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()
# User 1 and User 2 are in the same public room
self.assertEqual(
set(public_users), set([(u1, room), (u2, room)])
)
self.assertEqual(set(public_users), set([(u1, room), (u2, room)]))
# User 1 and User 3 share private rooms
self.assertEqual(
@ -238,7 +274,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
set([(u1, u3, private_room), (u3, u1, private_room)]),
)
def test_search_all_users(self):
def test_initial_share_all_users(self):
"""
Search all users = True means that a user does not have to share a
private room with the searching user or be in a public room to be search
@ -248,33 +284,36 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.hs.config.user_directory_search_all_users = True
u1 = self.register_user("user1", "pass")
u1_token = self.login(u1, "pass")
u2 = self.register_user("user2", "pass")
u2_token = self.login(u2, "pass")
self.register_user("user2", "pass")
u3 = self.register_user("user3", "pass")
# User 1 and User 2 join a room. User 3 never does.
room = self.helper.create_room_as(u1, is_public=True, tok=u1_token)
self.helper.invite(room, src=u1, targ=u2, tok=u1_token)
self.helper.join(room, user=u2, tok=u2_token)
# Wipe the user dir
self.get_success(self.store.update_user_directory_stream_pos(None))
self.get_success(self.store.delete_all_from_user_dir())
# Reset the handled users caches
self.handler.initially_handled_users = set()
# Do the initial population of the user directory via the background update
self._add_background_updates()
# Do the initial population
d = self.handler._do_initial_spam()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
# This takes a while, so pump it a bunch of times to get through the
# sleep delays
for i in range(10):
self.pump(1)
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()
self.get_success(d)
# No users share rooms
self.assertEqual(public_users, [])
self.assertEqual(self._compress_shared(shares_private), set([]))
# Despite not sharing a room, search_all_users means we get a search
# result.
s = self.get_success(self.handler.search_users(u1, u3, 10))
self.assertEqual(len(s["results"]), 1)
# We can find the other two users
s = self.get_success(self.handler.search_users(u1, "user", 10))
self.assertEqual(len(s["results"]), 2)
# Registering a user and then searching for them works.
u4 = self.register_user("user4", "pass")
s = self.get_success(self.handler.search_users(u1, u4, 10))
self.assertEqual(len(s["results"]), 1)

View File

@ -0,0 +1,163 @@
import json
from synapse.rest.client.v1 import admin, login
from tests import unittest
LOGIN_URL = b"/_matrix/client/r0/login"
class LoginRestServletTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
]
def make_homeserver(self, reactor, clock):
self.hs = self.setup_test_homeserver()
self.hs.config.enable_registration = True
self.hs.config.registrations_require_3pid = []
self.hs.config.auto_join_rooms = []
self.hs.config.enable_registration_captcha = False
return self.hs
def test_POST_ratelimiting_per_address(self):
self.hs.config.rc_login_address.burst_count = 5
self.hs.config.rc_login_address.per_second = 0.17
# Create different users so we're sure not to be bothered by the per-user
# ratelimiter.
for i in range(0, 6):
self.register_user("kermit" + str(i), "monkey")
for i in range(0, 6):
params = {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "kermit" + str(i),
},
"password": "monkey",
}
request_data = json.dumps(params)
request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
self.render(request)
if i == 5:
self.assertEquals(channel.result["code"], b"429", channel.result)
retry_after_ms = int(channel.json_body["retry_after_ms"])
else:
self.assertEquals(channel.result["code"], b"200", channel.result)
# Since we're ratelimiting at 0.17 requests/second (roughly one every
# six seconds), retry_after_ms should be below 6000ms.
self.assertTrue(retry_after_ms < 6000)
self.reactor.advance(retry_after_ms / 1000.)
params = {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "kermit" + str(i),
},
"password": "monkey",
}
request_data = json.dumps(params)
request, channel = self.make_request(b"POST", LOGIN_URL, params)
self.render(request)
self.assertEquals(channel.result["code"], b"200", channel.result)
def test_POST_ratelimiting_per_account(self):
self.hs.config.rc_login_account.burst_count = 5
self.hs.config.rc_login_account.per_second = 0.17
self.register_user("kermit", "monkey")
for i in range(0, 6):
params = {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "kermit",
},
"password": "monkey",
}
request_data = json.dumps(params)
request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
self.render(request)
if i == 5:
self.assertEquals(channel.result["code"], b"429", channel.result)
retry_after_ms = int(channel.json_body["retry_after_ms"])
else:
self.assertEquals(channel.result["code"], b"200", channel.result)
# Since we're ratelimiting at 0.17 requests/second (roughly one every
# six seconds), retry_after_ms should be below 6000ms.
self.assertTrue(retry_after_ms < 6000)
self.reactor.advance(retry_after_ms / 1000.)
params = {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "kermit",
},
"password": "monkey",
}
request_data = json.dumps(params)
request, channel = self.make_request(b"POST", LOGIN_URL, params)
self.render(request)
self.assertEquals(channel.result["code"], b"200", channel.result)
def test_POST_ratelimiting_per_account_failed_attempts(self):
self.hs.config.rc_login_failed_attempts.burst_count = 5
self.hs.config.rc_login_failed_attempts.per_second = 0.17
self.register_user("kermit", "monkey")
for i in range(0, 6):
params = {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "kermit",
},
"password": "notamonkey",
}
request_data = json.dumps(params)
request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
self.render(request)
if i == 5:
self.assertEquals(channel.result["code"], b"429", channel.result)
retry_after_ms = int(channel.json_body["retry_after_ms"])
else:
self.assertEquals(channel.result["code"], b"403", channel.result)
# Since we're ratelimiting at 0.17 requests/second (roughly one every
# six seconds), retry_after_ms should be below 6000ms.
self.assertTrue(retry_after_ms < 6000)
self.reactor.advance(retry_after_ms / 1000.)
params = {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "kermit",
},
"password": "notamonkey",
}
request_data = json.dumps(params)
request, channel = self.make_request(b"POST", LOGIN_URL, params)
self.render(request)
self.assertEquals(channel.result["code"], b"403", channel.result)

View File

@ -132,7 +132,8 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
self.assertEquals(channel.json_body["error"], "Guest access is disabled")
def test_POST_ratelimiting_guest(self):
self.hs.config.rc_registration_request_burst_count = 5
self.hs.config.rc_registration.burst_count = 5
self.hs.config.rc_registration.per_second = 0.17
for i in range(0, 6):
url = self.url + b"?kind=guest"
@ -153,7 +154,8 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
self.assertEquals(channel.result["code"], b"200", channel.result)
def test_POST_ratelimiting(self):
self.hs.config.rc_registration_request_burst_count = 5
self.hs.config.rc_registration.burst_count = 5
self.hs.config.rc_registration.per_second = 0.17
for i in range(0, 6):
params = {

View File

@ -16,7 +16,6 @@
from twisted.internet import defer
from synapse.storage import UserDirectoryStore
from synapse.storage.roommember import ProfileInfo
from tests import unittest
from tests.utils import setup_test_homeserver
@ -34,13 +33,9 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
# alice and bob are both in !room_id. bobby is not but shares
# a homeserver with alice.
yield self.store.add_profiles_to_user_dir(
{
ALICE: ProfileInfo(None, "alice"),
BOB: ProfileInfo(None, "bob"),
BOBBY: ProfileInfo(None, "bobby"),
},
)
yield self.store.update_profile_in_user_dir(ALICE, "alice", None)
yield self.store.update_profile_in_user_dir(BOB, "bob", None)
yield self.store.update_profile_in_user_dir(BOBBY, "bobby", None)
yield self.store.add_users_in_public_rooms(
"!room:id", (ALICE, BOB)
)

View File

@ -330,10 +330,10 @@ class HomeserverTestCase(TestCase):
"""
self.reactor.pump([by] * 100)
def get_success(self, d):
def get_success(self, d, by=0.0):
if not isinstance(d, Deferred):
return d
self.pump()
self.pump(by=by)
return self.successResultOf(d)
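The new by argument is forwarded to pump(), which advances the fake reactor in 100 steps of by seconds, so by=0.1 lets about ten seconds of virtual time pass while the deferred runs. A hedged usage line (some_deferred is hypothetical):

result = self.get_success(some_deferred, by=0.1)  # ~10s of virtual time, in 0.1s steps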
def register_user(self, username, password, admin=False):

View File

@ -151,8 +151,14 @@ def default_config(name):
config.admin_contact = None
config.rc_messages_per_second = 10000
config.rc_message_burst_count = 10000
config.rc_registration_request_burst_count = 3.0
config.rc_registration_requests_per_second = 0.17
config.rc_registration.per_second = 10000
config.rc_registration.burst_count = 10000
config.rc_login_address.per_second = 10000
config.rc_login_address.burst_count = 10000
config.rc_login_account.per_second = 10000
config.rc_login_account.burst_count = 10000
config.rc_login_failed_attempts.per_second = 10000
config.rc_login_failed_attempts.burst_count = 10000
config.saml2_enabled = False
config.public_baseurl = None
config.default_identity_server = None