Merge branch 'develop' of github.com:matrix-org/synapse into erikj/catchup_on_worker

pull/7024/head
Erik Johnston 2020-03-23 14:49:07 +00:00
commit 4f2a803c66
22 changed files with 410 additions and 248 deletions

View File

@ -1,3 +1,63 @@
Synapse 1.12.0 (2020-03-23)
===========================
No significant changes since 1.12.0rc1.
Debian packages and Docker images are rebuilt using the latest versions of
dependency libraries, including Twisted 20.3.0. **Please see security advisory
below**.
Security advisory
-----------------
Synapse may be vulnerable to request-smuggling attacks when it is used with a
reverse-proxy. The vulnerabilties are fixed in Twisted 20.3.0, and are
described in
[CVE-2020-10108](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-10108)
and
[CVE-2020-10109](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-10109).
For a good introduction to this class of request-smuggling attacks, see
https://portswigger.net/research/http-desync-attacks-request-smuggling-reborn.
We are not aware of these vulnerabilities being exploited in the wild, and
do not believe that they are exploitable with current versions of any reverse
proxies. Nevertheless, we recommend that all Synapse administrators ensure that
they have the latest versions of the Twisted library to ensure that their
installation remains secure.
* Administrators using the [`matrix.org` Docker
image](https://hub.docker.com/r/matrixdotorg/synapse/) or the [Debian/Ubuntu
packages from
`matrix.org`](https://github.com/matrix-org/synapse/blob/master/INSTALL.md#matrixorg-packages)
should ensure that they have version 1.12.0 installed: these images include
Twisted 20.3.0.
* Administrators who have [installed Synapse from
source](https://github.com/matrix-org/synapse/blob/master/INSTALL.md#installing-from-source)
should upgrade Twisted within their virtualenv by running:
```sh
<path_to_virtualenv>/bin/pip install 'Twisted>=20.3.0'
```
* Administrators who have installed Synapse from distribution packages should
consult the information from their distributions.
The `matrix.org` Synapse instance was not vulnerable to these vulnerabilities.
Advance notice of change to the default `git` branch for Synapse
----------------------------------------------------------------
Currently, the default `git` branch for Synapse is `master`, which tracks the
latest release.
After the release of Synapse 1.13.0, we intend to change this default to
`develop`, which is the development tip. This is more consistent with common
practice and modern `git` usage.
Although we try to keep `develop` in a stable state, there may be occasions
where regressions creep in. Developers and distributors who have scripts which
run builds using the default branch of `Synapse` should therefore consider
pinning their scripts to `master`.
Synapse 1.12.0rc1 (2020-03-19) Synapse 1.12.0rc1 (2020-03-19)
============================== ==============================

1
changelog.d/6988.doc Normal file
View File

@ -0,0 +1 @@
Improve the documentation for database configuration.

1
changelog.d/7009.feature Normal file
View File

@ -0,0 +1 @@
Set `Referrer-Policy` header to `no-referrer` on media downloads.

1
changelog.d/7115.misc Normal file
View File

@ -0,0 +1 @@
De-duplicate / remove unused REST code for login and auth.

1
changelog.d/7116.misc Normal file
View File

@ -0,0 +1 @@
Convert `*StreamRow` classes to inner classes.

1
changelog.d/7117.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug which meant that groups updates were not correctly replicated between workers.

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.12.0) stable; urgency=medium
* New synapse release 1.12.0.
-- Synapse Packaging team <packages@matrix.org> Mon, 23 Mar 2020 12:13:03 +0000
matrix-synapse-py3 (1.11.1) stable; urgency=medium matrix-synapse-py3 (1.11.1) stable; urgency=medium
* New synapse release 1.11.1. * New synapse release 1.11.1.

View File

@ -72,8 +72,7 @@ underneath the database, or if a different version of the locale is used on any
replicas. replicas.
The safest way to fix the issue is to take a dump and recreate the database with The safest way to fix the issue is to take a dump and recreate the database with
the correct `COLLATE` and `CTYPE` parameters (as per the correct `COLLATE` and `CTYPE` parameters (as shown above). It is also possible to change the
[docs/postgres.md](docs/postgres.md)). It is also possible to change the
parameters on a live database and run a `REINDEX` on the entire database, parameters on a live database and run a `REINDEX` on the entire database,
however extreme care must be taken to avoid database corruption. however extreme care must be taken to avoid database corruption.
@ -105,19 +104,41 @@ of free memory the database host has available.
When you are ready to start using PostgreSQL, edit the `database` When you are ready to start using PostgreSQL, edit the `database`
section in your config file to match the following lines: section in your config file to match the following lines:
database: ```yaml
name: psycopg2 database:
args: name: psycopg2
user: <user> args:
password: <pass> user: <user>
database: <db> password: <pass>
host: <host> database: <db>
cp_min: 5 host: <host>
cp_max: 10 cp_min: 5
cp_max: 10
```
All key, values in `args` are passed to the `psycopg2.connect(..)` All key, values in `args` are passed to the `psycopg2.connect(..)`
function, except keys beginning with `cp_`, which are consumed by the function, except keys beginning with `cp_`, which are consumed by the
twisted adbapi connection pool. twisted adbapi connection pool. See the [libpq
documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS)
for a list of options which can be passed.
You should consider tuning the `args.keepalives_*` options if there is any danger of
the connection between your homeserver and database dropping, otherwise Synapse
may block for an extended period while it waits for a response from the
database server. Example values might be:
```yaml
# seconds of inactivity after which TCP should send a keepalive message to the server
keepalives_idle: 10
# the number of seconds after which a TCP keepalive message that is not
# acknowledged by the server should be retransmitted
keepalives_interval: 10
# the number of TCP keepalives that can be lost before the client's connection
# to the server is considered dead
keepalives_count: 3
```
## Porting from SQLite ## Porting from SQLite

View File

@ -578,13 +578,46 @@ acme:
## Database ## ## Database ##
# The 'database' setting defines the database that synapse uses to store all of
# its data.
#
# 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or
# 'psycopg2' (for PostgreSQL).
#
# 'args' gives options which are passed through to the database engine,
# except for options starting 'cp_', which are used to configure the Twisted
# connection pool. For a reference to valid arguments, see:
# * for sqlite: https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
# * for postgres: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
# * for the connection pool: https://twistedmatrix.com/documents/current/api/twisted.enterprise.adbapi.ConnectionPool.html#__init__
#
#
# Example SQLite configuration:
#
#database:
# name: sqlite3
# args:
# database: /path/to/homeserver.db
#
#
# Example Postgres configuration:
#
#database:
# name: psycopg2
# args:
# user: synapse
# password: secretpassword
# database: synapse
# host: localhost
# cp_min: 5
# cp_max: 10
#
# For more information on using Synapse with Postgres, see `docs/postgres.md`.
#
database: database:
# The database engine name name: sqlite3
name: "sqlite3"
# Arguments to pass to the engine
args: args:
# Path to the database database: DATADIR/homeserver.db
database: "DATADIR/homeserver.db"
# Number of events to cache in memory. # Number of events to cache in memory.
# #

View File

@ -36,7 +36,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.12.0rc1" __version__ = "1.12.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View File

@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ( from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream, DeviceListsStream,
GroupServerStream,
PresenceStream,
PushersStream,
PushRulesStream,
ReceiptsStream, ReceiptsStream,
TagAccountDataStream,
ToDeviceStream, ToDeviceStream,
TypingStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
) )
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@ -629,7 +640,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
if self.send_handler: if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows) self.send_handler.process_replication_rows(stream_name, token, rows)
if stream_name == "events": if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so # We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows. # we don't need to optimise this for multiple rows.
for row in rows: for row in rows:
@ -652,44 +663,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
) )
await self.pusher_pool.on_new_notifications(token, token) await self.pusher_pool.on_new_notifications(token, token)
elif stream_name == "push_rules": elif stream_name == PushRulesStream.NAME:
self.notifier.on_new_event( self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows] "push_rules_key", token, users=[row.user_id for row in rows]
) )
elif stream_name in ("account_data", "tag_account_data"): elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
self.notifier.on_new_event( self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows] "account_data_key", token, users=[row.user_id for row in rows]
) )
elif stream_name == "receipts": elif stream_name == ReceiptsStream.NAME:
self.notifier.on_new_event( self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows] "receipt_key", token, rooms=[row.room_id for row in rows]
) )
await self.pusher_pool.on_new_receipts( await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows} token, token, {row.room_id for row in rows}
) )
elif stream_name == "typing": elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows) self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event( self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows] "typing_key", token, rooms=[row.room_id for row in rows]
) )
elif stream_name == "to_device": elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")] entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities: if entities:
self.notifier.on_new_event("to_device_key", token, users=entities) self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == "device_lists": elif stream_name == DeviceListsStream.NAME:
all_room_ids = set() all_room_ids = set()
for row in rows: for row in rows:
if row.entity.startswith("@"): if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity) room_ids = await self.store.get_rooms_for_user(row.entity)
all_room_ids.update(room_ids) all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence": elif stream_name == PresenceStream.NAME:
await self.presence_handler.process_replication_rows(token, rows) await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts": elif stream_name == GroupServerStream.NAME:
self.notifier.on_new_event( self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows] "groups_key", token, users=[row.user_id for row in rows]
) )
elif stream_name == "pushers": elif stream_name == PushersStream.NAME:
for row in rows: for row in rows:
if row.deleted: if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey) self.stop_pusher(row.user_id, row.app_id, row.pushkey)
@ -796,7 +807,7 @@ class FederationSenderHandler(object):
async def _on_new_receipts(self, rows): async def _on_new_receipts(self, rows):
""" """
Args: Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
new receipts to be processed new receipts to be processed
""" """
for receipt in rows: for receipt in rows:

View File

@ -294,7 +294,6 @@ class RootConfig(object):
report_stats=None, report_stats=None,
open_private_ports=False, open_private_ports=False,
listeners=None, listeners=None,
database_conf=None,
tls_certificate_path=None, tls_certificate_path=None,
tls_private_key_path=None, tls_private_key_path=None,
acme_domain=None, acme_domain=None,
@ -367,7 +366,6 @@ class RootConfig(object):
report_stats=report_stats, report_stats=report_stats,
open_private_ports=open_private_ports, open_private_ports=open_private_ports,
listeners=listeners, listeners=listeners,
database_conf=database_conf,
tls_certificate_path=tls_certificate_path, tls_certificate_path=tls_certificate_path,
tls_private_key_path=tls_private_key_path, tls_private_key_path=tls_private_key_path,
acme_domain=acme_domain, acme_domain=acme_domain,

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -14,14 +15,60 @@
# limitations under the License. # limitations under the License.
import logging import logging
import os import os
from textwrap import indent
import yaml
from synapse.config._base import Config, ConfigError from synapse.config._base import Config, ConfigError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_CONFIG = """\
## Database ##
# The 'database' setting defines the database that synapse uses to store all of
# its data.
#
# 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or
# 'psycopg2' (for PostgreSQL).
#
# 'args' gives options which are passed through to the database engine,
# except for options starting 'cp_', which are used to configure the Twisted
# connection pool. For a reference to valid arguments, see:
# * for sqlite: https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
# * for postgres: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
# * for the connection pool: https://twistedmatrix.com/documents/current/api/twisted.enterprise.adbapi.ConnectionPool.html#__init__
#
#
# Example SQLite configuration:
#
#database:
# name: sqlite3
# args:
# database: /path/to/homeserver.db
#
#
# Example Postgres configuration:
#
#database:
# name: psycopg2
# args:
# user: synapse
# password: secretpassword
# database: synapse
# host: localhost
# cp_min: 5
# cp_max: 10
#
# For more information on using Synapse with Postgres, see `docs/postgres.md`.
#
database:
name: sqlite3
args:
database: %(database_path)s
# Number of events to cache in memory.
#
#event_cache_size: 10K
"""
class DatabaseConnectionConfig: class DatabaseConnectionConfig:
"""Contains the connection config for a particular database. """Contains the connection config for a particular database.
@ -36,10 +83,12 @@ class DatabaseConnectionConfig:
""" """
def __init__(self, name: str, db_config: dict): def __init__(self, name: str, db_config: dict):
if db_config["name"] not in ("sqlite3", "psycopg2"): db_engine = db_config.get("name", "sqlite3")
raise ConfigError("Unsupported database type %r" % (db_config["name"],))
if db_config["name"] == "sqlite3": if db_engine not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_engine,))
if db_engine == "sqlite3":
db_config.setdefault("args", {}).update( db_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False} {"cp_min": 1, "cp_max": 1, "check_same_thread": False}
) )
@ -97,34 +146,10 @@ class DatabaseConfig(Config):
self.set_databasepath(config.get("database_path")) self.set_databasepath(config.get("database_path"))
def generate_config_section(self, data_dir_path, database_conf, **kwargs): def generate_config_section(self, data_dir_path, **kwargs):
if not database_conf: return DEFAULT_CONFIG % {
database_path = os.path.join(data_dir_path, "homeserver.db") "database_path": os.path.join(data_dir_path, "homeserver.db")
database_conf = ( }
"""# The database engine name
name: "sqlite3"
# Arguments to pass to the engine
args:
# Path to the database
database: "%(database_path)s"
"""
% locals()
)
else:
database_conf = indent(yaml.dump(database_conf), " " * 10).lstrip()
return (
"""\
## Database ##
database:
%(database_conf)s
# Number of events to cache in memory.
#
#event_cache_size: 10K
"""
% locals()
)
def read_arguments(self, args): def read_arguments(self, args):
self.set_databasepath(args.database_path) self.set_databasepath(args.database_path)

View File

@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
Args: Args:
transaction_queue (FederationSender) transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow)) rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
""" """
# The federation stream contains a bunch of different types of # The federation stream contains a bunch of different types of

View File

@ -27,30 +27,64 @@ Each stream is defined by the following information:
from typing import Dict, Type from typing import Dict, Type
from synapse.replication.tcp.streams import _base, events, federation from synapse.replication.tcp.streams._base import (
from synapse.replication.tcp.streams._base import Stream AccountDataStream,
BackfillStream,
CachesStream,
DeviceListsStream,
GroupServerStream,
PresenceStream,
PublicRoomsStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
Stream,
TagAccountDataStream,
ToDeviceStream,
TypingStream,
UserSignatureStream,
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
STREAMS_MAP = { STREAMS_MAP = {
stream.NAME: stream stream.NAME: stream
for stream in ( for stream in (
events.EventsStream, EventsStream,
_base.BackfillStream, BackfillStream,
_base.PresenceStream, PresenceStream,
_base.TypingStream, TypingStream,
_base.ReceiptsStream, ReceiptsStream,
_base.PushRulesStream, PushRulesStream,
_base.PushersStream, PushersStream,
_base.CachesStream, CachesStream,
_base.PublicRoomsStream, PublicRoomsStream,
_base.DeviceListsStream, DeviceListsStream,
_base.ToDeviceStream, ToDeviceStream,
federation.FederationStream, FederationStream,
_base.TagAccountDataStream, TagAccountDataStream,
_base.AccountDataStream, AccountDataStream,
_base.GroupServerStream, GroupServerStream,
_base.UserSignatureStream, UserSignatureStream,
) )
} # type: Dict[str, Type[_base.Stream]] } # type: Dict[str, Type[Stream]]
__all__ = ["Stream", "STREAMS_MAP"] __all__ = [
"STREAMS_MAP",
"Stream",
"BackfillStream",
"PresenceStream",
"TypingStream",
"ReceiptsStream",
"PushRulesStream",
"PushersStream",
"CachesStream",
"PublicRoomsStream",
"DeviceListsStream",
"ToDeviceStream",
"TagAccountDataStream",
"AccountDataStream",
"GroupServerStream",
"UserSignatureStream",
]

View File

@ -28,94 +28,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 500000 MAX_EVENTS_BEHIND = 500000
BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""
cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)
@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
class Stream(object): class Stream(object):
"""Base class for the streams. """Base class for the streams.
@ -244,6 +156,18 @@ class BackfillStream(Stream):
or it went from being an outlier to not. or it went from being an outlier to not.
""" """
BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)
NAME = "backfill" NAME = "backfill"
ROW_TYPE = BackfillStreamRow ROW_TYPE = BackfillStreamRow
@ -256,6 +180,19 @@ class BackfillStream(Stream):
class PresenceStream(Stream): class PresenceStream(Stream):
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)
NAME = "presence" NAME = "presence"
ROW_TYPE = PresenceStreamRow ROW_TYPE = PresenceStreamRow
_QUERY_MASTER = True _QUERY_MASTER = True
@ -273,6 +210,10 @@ class PresenceStream(Stream):
class TypingStream(Stream): class TypingStream(Stream):
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)
NAME = "typing" NAME = "typing"
ROW_TYPE = TypingStreamRow ROW_TYPE = TypingStreamRow
_QUERY_MASTER = True _QUERY_MASTER = True
@ -289,6 +230,17 @@ class TypingStream(Stream):
class ReceiptsStream(Stream): class ReceiptsStream(Stream):
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)
NAME = "receipts" NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow ROW_TYPE = ReceiptsStreamRow
@ -305,6 +257,8 @@ class PushRulesStream(Stream):
"""A user has changed their push rules """A user has changed their push rules
""" """
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
NAME = "push_rules" NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow ROW_TYPE = PushRulesStreamRow
@ -325,6 +279,11 @@ class PushersStream(Stream):
"""A user has added/changed/removed a pusher """A user has added/changed/removed a pusher
""" """
PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
NAME = "pushers" NAME = "pushers"
ROW_TYPE = PushersStreamRow ROW_TYPE = PushersStreamRow
@ -342,6 +301,21 @@ class CachesStream(Stream):
the cache on the workers the cache on the workers
""" """
@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""
cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)
NAME = "caches" NAME = "caches"
ROW_TYPE = CachesStreamRow ROW_TYPE = CachesStreamRow
@ -358,6 +332,16 @@ class PublicRoomsStream(Stream):
"""The public rooms list changed """The public rooms list changed
""" """
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)
NAME = "public_rooms" NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow ROW_TYPE = PublicRoomsStreamRow
@ -375,6 +359,10 @@ class DeviceListsStream(Stream):
told about a device update. told about a device update.
""" """
@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)
NAME = "device_lists" NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow ROW_TYPE = DeviceListsStreamRow
@ -391,6 +379,8 @@ class ToDeviceStream(Stream):
"""New to_device messages for a client """New to_device messages for a client
""" """
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
NAME = "to_device" NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow ROW_TYPE = ToDeviceStreamRow
@ -407,6 +397,10 @@ class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room """Someone added/removed a tag for a room
""" """
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
NAME = "tag_account_data" NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow ROW_TYPE = TagAccountDataStreamRow
@ -423,6 +417,10 @@ class AccountDataStream(Stream):
"""Global or per room account data was changed """Global or per room account data was changed
""" """
AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
NAME = "account_data" NAME = "account_data"
ROW_TYPE = AccountDataStreamRow ROW_TYPE = AccountDataStreamRow
@ -448,6 +446,11 @@ class AccountDataStream(Stream):
class GroupServerStream(Stream): class GroupServerStream(Stream):
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
NAME = "groups" NAME = "groups"
ROW_TYPE = GroupsStreamRow ROW_TYPE = GroupsStreamRow
@ -464,6 +467,8 @@ class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key """A user has signed their own device with their user-signing key
""" """
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
NAME = "user_signature" NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow ROW_TYPE = UserSignatureStreamRow

View File

@ -19,20 +19,20 @@ from twisted.internet import defer
from synapse.replication.tcp.streams._base import Stream from synapse.replication.tcp.streams._base import Stream
FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)
class FederationStream(Stream): class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation """Data to be sent over federation. Only available when master has federation
sending disabled. sending disabled.
""" """
FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)
NAME = "federation" NAME = "federation"
ROW_TYPE = FederationStreamRow ROW_TYPE = FederationStreamRow
_QUERY_MASTER = True _QUERY_MASTER = True

View File

@ -28,7 +28,6 @@ from synapse.http.servlet import (
parse_json_object_from_request, parse_json_object_from_request,
parse_string, parse_string,
) )
from synapse.push.mailer import load_jinja2_templates
from synapse.rest.client.v2_alpha._base import client_patterns from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.well_known import WellKnownBuilder from synapse.rest.well_known import WellKnownBuilder
from synapse.types import UserID, map_username_to_mxid_localpart from synapse.types import UserID, map_username_to_mxid_localpart
@ -548,13 +547,6 @@ class SSOAuthHandler(object):
self._registration_handler = hs.get_registration_handler() self._registration_handler = hs.get_registration_handler()
self._macaroon_gen = hs.get_macaroon_generator() self._macaroon_gen = hs.get_macaroon_generator()
# Load the redirect page HTML template
self._template = load_jinja2_templates(
hs.config.sso_redirect_confirm_template_dir, ["sso_redirect_confirm.html"],
)[0]
self._server_name = hs.config.server_name
# cast to tuple for use with str.startswith # cast to tuple for use with str.startswith
self._whitelisted_sso_clients = tuple(hs.config.sso_client_whitelist) self._whitelisted_sso_clients = tuple(hs.config.sso_client_whitelist)

View File

@ -142,14 +142,6 @@ class AuthRestServlet(RestServlet):
% (CLIENT_API_PREFIX, LoginType.RECAPTCHA), % (CLIENT_API_PREFIX, LoginType.RECAPTCHA),
"sitekey": self.hs.config.recaptcha_public_key, "sitekey": self.hs.config.recaptcha_public_key,
} }
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return None
elif stagetype == LoginType.TERMS: elif stagetype == LoginType.TERMS:
html = TERMS_TEMPLATE % { html = TERMS_TEMPLATE % {
"session": session, "session": session,
@ -158,17 +150,19 @@ class AuthRestServlet(RestServlet):
"myurl": "%s/r0/auth/%s/fallback/web" "myurl": "%s/r0/auth/%s/fallback/web"
% (CLIENT_API_PREFIX, LoginType.TERMS), % (CLIENT_API_PREFIX, LoginType.TERMS),
} }
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return None
else: else:
raise SynapseError(404, "Unknown auth stage type") raise SynapseError(404, "Unknown auth stage type")
# Render the HTML and return.
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return None
async def on_POST(self, request, stagetype): async def on_POST(self, request, stagetype):
session = parse_string(request, "session") session = parse_string(request, "session")
@ -196,15 +190,6 @@ class AuthRestServlet(RestServlet):
% (CLIENT_API_PREFIX, LoginType.RECAPTCHA), % (CLIENT_API_PREFIX, LoginType.RECAPTCHA),
"sitekey": self.hs.config.recaptcha_public_key, "sitekey": self.hs.config.recaptcha_public_key,
} }
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return None
elif stagetype == LoginType.TERMS: elif stagetype == LoginType.TERMS:
authdict = {"session": session} authdict = {"session": session}
@ -225,17 +210,19 @@ class AuthRestServlet(RestServlet):
"myurl": "%s/r0/auth/%s/fallback/web" "myurl": "%s/r0/auth/%s/fallback/web"
% (CLIENT_API_PREFIX, LoginType.TERMS), % (CLIENT_API_PREFIX, LoginType.TERMS),
} }
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return None
else: else:
raise SynapseError(404, "Unknown auth stage type") raise SynapseError(404, "Unknown auth stage type")
# Render the HTML and return.
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return None
def on_OPTIONS(self, _): def on_OPTIONS(self, _):
return 200, {} return 200, {}

View File

@ -50,6 +50,9 @@ class DownloadResource(DirectServeResource):
b" media-src 'self';" b" media-src 'self';"
b" object-src 'self';", b" object-src 'self';",
) )
request.setHeader(
b"Referrer-Policy", b"no-referrer",
)
server_name, media_id, name = parse_media_id(request) server_name, media_id, name = parse_media_id(request)
if server_name == self.server_name: if server_name == self.server_name:
await self.media_repo.get_local_media(request, media_id, name) await self.media_repo.get_local_media(request, media_id, name)

View File

@ -21,9 +21,9 @@ from tests import unittest
class DatabaseConfigTestCase(unittest.TestCase): class DatabaseConfigTestCase(unittest.TestCase):
def test_database_configured_correctly_no_database_conf_param(self): def test_database_configured_correctly(self):
conf = yaml.safe_load( conf = yaml.safe_load(
DatabaseConfig().generate_config_section("/data_dir_path", None) DatabaseConfig().generate_config_section(data_dir_path="/data_dir_path")
) )
expected_database_conf = { expected_database_conf = {
@ -32,21 +32,3 @@ class DatabaseConfigTestCase(unittest.TestCase):
} }
self.assertEqual(conf["database"], expected_database_conf) self.assertEqual(conf["database"], expected_database_conf)
def test_database_configured_correctly_database_conf_param(self):
database_conf = {
"name": "my super fast datastore",
"args": {
"user": "matrix",
"password": "synapse_database_password",
"host": "synapse_database_host",
"database": "matrix",
},
}
conf = yaml.safe_load(
DatabaseConfig().generate_config_section("/data_dir_path", database_conf)
)
self.assertEqual(conf["database"], database_conf)

View File

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.replication.tcp.streams._base import ReceiptsStreamRow from synapse.replication.tcp.streams._base import ReceiptsStream
from tests.replication.tcp.streams._base import BaseStreamTestCase from tests.replication.tcp.streams._base import BaseStreamTestCase
@ -40,7 +40,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
stream_name, token, rdata_rows = self.test_handler.on_rdata.call_args[0] stream_name, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
self.assertEqual(stream_name, "receipts") self.assertEqual(stream_name, "receipts")
self.assertEqual(1, len(rdata_rows)) self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0] # type: ReceiptsStreamRow row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow
self.assertEqual("!room:blue", row.room_id) self.assertEqual("!room:blue", row.room_id)
self.assertEqual("m.read", row.receipt_type) self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id) self.assertEqual(USER_ID, row.user_id)
@ -72,7 +72,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
self.assertEqual(token, 3) self.assertEqual(token, 3)
self.assertEqual(1, len(rdata_rows)) self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0] # type: ReceiptsStreamRow row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow
self.assertEqual("!room2:blue", row.room_id) self.assertEqual("!room2:blue", row.room_id)
self.assertEqual("m.read", row.receipt_type) self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id) self.assertEqual(USER_ID, row.user_id)