Merge remote-tracking branch 'origin/release-v1.31.0' into matrix-org-hotfixes

erikj/disable_catchup_to_hq
Erik Johnston 2021-03-30 11:41:52 +01:00
commit f46b864748
86 changed files with 604 additions and 334 deletions

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# this script is run by buildkite in a plain `xenial` container; it installs the
# minimal requirements for tox and hands over to the py35-old tox environment.

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
# with additional dependencies needed for the test (such as coverage or the PostgreSQL

View File

@ -1,3 +1,71 @@
Synapse 1.31.0rc1 (2021-03-30)
==============================
**Note:** As announced in v1.25.0, and in line with the deprecation policy for platform dependencies, this is the last release to support Python 3.5 and PostgreSQL 9.5. Future versions of Synapse will require Python 3.6+ and PostgreSQL 9.6+.
This is also the last release that the Synapse team will be publishing packages for Debian Stretch and Ubuntu Xenial.
Features
--------
- Add support to OpenID Connect login for requiring attributes on the `userinfo` response. Contributed by Hubbe King. ([\#9609](https://github.com/matrix-org/synapse/issues/9609))
- Add initial experimental support for a "space summary" API. ([\#9643](https://github.com/matrix-org/synapse/issues/9643), [\#9652](https://github.com/matrix-org/synapse/issues/9652), [\#9653](https://github.com/matrix-org/synapse/issues/9653))
- Add support for the busy presence state as described in [MSC3026](https://github.com/matrix-org/matrix-doc/pull/3026). ([\#9644](https://github.com/matrix-org/synapse/issues/9644))
- Add support for credentials for proxy authentication in the `HTTPS_PROXY` environment variable. ([\#9657](https://github.com/matrix-org/synapse/issues/9657))
Bugfixes
--------
- Fix a longstanding bug that could cause issues when editing a reply to a message. ([\#9585](https://github.com/matrix-org/synapse/issues/9585))
- Fix the `/capabilities` endpoint to return `m.change_password` as disabled if the local password database is not used for authentication. Contributed by @dklimpel. ([\#9588](https://github.com/matrix-org/synapse/issues/9588))
- Check if local passwords are enabled before setting them for the user. ([\#9636](https://github.com/matrix-org/synapse/issues/9636))
- Fix a bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind. ([\#9639](https://github.com/matrix-org/synapse/issues/9639))
- Fix a bug introduced in Synapse 1.30.1 which meant the suggested `pip` incantation to install an updated `cryptography` was incorrect. ([\#9699](https://github.com/matrix-org/synapse/issues/9699))
Updates to the Docker image
---------------------------
- Speed up Docker builds and make it nicer to test against Complement while developing (install all dependencies before copying the project). ([\#9610](https://github.com/matrix-org/synapse/issues/9610))
- Include [opencontainers labels](https://github.com/opencontainers/image-spec/blob/master/annotations.md#pre-defined-annotation-keys) in the Docker image. ([\#9612](https://github.com/matrix-org/synapse/issues/9612))
Improved Documentation
----------------------
- Clarify that `register_new_matrix_user` is present also when installed via non-pip package. ([\#9074](https://github.com/matrix-org/synapse/issues/9074))
- Update source install documentation to mention platform prerequisites before the source install steps. ([\#9667](https://github.com/matrix-org/synapse/issues/9667))
- Improve worker documentation for fallback/web auth endpoints. ([\#9679](https://github.com/matrix-org/synapse/issues/9679))
- Update the sample configuration for OIDC authentication. ([\#9695](https://github.com/matrix-org/synapse/issues/9695))
Internal Changes
----------------
- Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column. ([\#9411](https://github.com/matrix-org/synapse/issues/9411))
- Add type hints to the caching module. ([\#9442](https://github.com/matrix-org/synapse/issues/9442))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9499](https://github.com/matrix-org/synapse/issues/9499), [\#9659](https://github.com/matrix-org/synapse/issues/9659))
- Add additional type hints to the Homeserver object. ([\#9631](https://github.com/matrix-org/synapse/issues/9631), [\#9638](https://github.com/matrix-org/synapse/issues/9638), [\#9675](https://github.com/matrix-org/synapse/issues/9675), [\#9681](https://github.com/matrix-org/synapse/issues/9681))
- Only save remote cross-signing and device keys if they're different from the current ones. ([\#9634](https://github.com/matrix-org/synapse/issues/9634))
- Rename storage function to fix spelling and not conflict with another function's name. ([\#9637](https://github.com/matrix-org/synapse/issues/9637))
- Improve performance of federation catch up by sending the latest events in the room to the remote, rather than just the last event sent by the local server. ([\#9640](https://github.com/matrix-org/synapse/issues/9640), [\#9664](https://github.com/matrix-org/synapse/issues/9664))
- In the `federation_client` commandline client, stop automatically adding the URL prefix, so that servlets on other prefixes can be tested. ([\#9645](https://github.com/matrix-org/synapse/issues/9645))
- In the `federation_client` commandline client, handle inline `signing_key`s in `homeserver.yaml`. ([\#9647](https://github.com/matrix-org/synapse/issues/9647))
- Fixed some antipattern issues to improve code quality. ([\#9649](https://github.com/matrix-org/synapse/issues/9649))
- Add a storage method for pulling all current user presence state from the database. ([\#9650](https://github.com/matrix-org/synapse/issues/9650))
- Import `HomeServer` from the proper module. ([\#9665](https://github.com/matrix-org/synapse/issues/9665))
- Increase default join ratelimiting burst rate. ([\#9674](https://github.com/matrix-org/synapse/issues/9674))
- Add type hints to third party event rules and visibility modules. ([\#9676](https://github.com/matrix-org/synapse/issues/9676))
- Bump mypy-zope to 0.2.13 to fix "Cannot determine consistent method resolution order (MRO)" errors when running mypy a second time. ([\#9678](https://github.com/matrix-org/synapse/issues/9678))
- Use interpreter from `$PATH` via `/usr/bin/env` instead of absolute paths in various scripts. ([\#9689](https://github.com/matrix-org/synapse/issues/9689))
- Make it possible to use `dmypy`. ([\#9692](https://github.com/matrix-org/synapse/issues/9692))
- Suppress "CryptographyDeprecationWarning: int_from_bytes is deprecated". ([\#9698](https://github.com/matrix-org/synapse/issues/9698))
- Use `dmypy run` in lint script for improved performance in type-checking while developing. ([\#9701](https://github.com/matrix-org/synapse/issues/9701))
- Fix undetected mypy error when using Python 3.6. ([\#9703](https://github.com/matrix-org/synapse/issues/9703))
- Fix type-checking CI on develop. ([\#9709](https://github.com/matrix-org/synapse/issues/9709))
Synapse 1.30.1 (2021-03-26)
===========================

View File

@ -6,7 +6,7 @@ There are 3 steps to follow under **Installation Instructions**.
- [Choosing your server name](#choosing-your-server-name)
- [Installing Synapse](#installing-synapse)
- [Installing from source](#installing-from-source)
- [Platform-Specific Instructions](#platform-specific-instructions)
- [Platform-specific prerequisites](#platform-specific-prerequisites)
- [Debian/Ubuntu/Raspbian](#debianubunturaspbian)
- [ArchLinux](#archlinux)
- [CentOS/Fedora](#centosfedora)
@ -60,17 +60,14 @@ that your email address is probably `user@example.com` rather than
(Prebuilt packages are available for some platforms - see [Prebuilt packages](#prebuilt-packages).)
When installing from source please make sure that the [Platform-specific prerequisites](#platform-specific-prerequisites) are already installed.
System requirements:
- POSIX-compliant system (tested on Linux & OS X)
- Python 3.5.2 or later, up to Python 3.9.
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
Synapse is written in Python but some of the libraries it uses are written in
C. So before we can install Synapse itself we need a working C compiler and the
header files for Python C extensions. See [Platform-Specific
Instructions](#platform-specific-instructions) for information on installing
these on various platforms.
To install the Synapse homeserver run:
@ -128,7 +125,11 @@ source env/bin/activate
synctl start
```
#### Platform-Specific Instructions
#### Platform-specific prerequisites
Synapse is written in Python but some of the libraries it uses are written in
C. So before we can install Synapse itself we need a working C compiler and the
header files for Python C extensions.
##### Debian/Ubuntu/Raspbian
@ -526,14 +527,24 @@ email will be disabled.
The easiest way to create a new user is to do so from a client like [Element](https://element.io/).
Alternatively you can do so from the command line if you have installed via pip.
Alternatively, you can do so from the command line. This can be done as follows:
This can be done as follows:
1. If synapse was installed via pip, activate the virtualenv as follows (if Synapse was
installed via a prebuilt package, `register_new_matrix_user` should already be
on the search path):
```sh
cd ~/synapse
source env/bin/activate
synctl start # if not already running
```
2. Run the following command:
```sh
register_new_matrix_user -c homeserver.yaml http://localhost:8008
```
```sh
$ source ~/synapse/env/bin/activate
$ synctl start # if not already running
$ register_new_matrix_user -c homeserver.yaml http://localhost:8008
This will prompt you to add details for the new user, and will then connect to
the running Synapse to create the new user. For example:
```
New user localpart: erikj
Password:
Confirm password:

View File

@ -98,9 +98,12 @@ will log a warning on each received request.
To avoid the warning, administrators using a reverse proxy should ensure that
the reverse proxy sets `X-Forwarded-Proto` header to `https` or `http` to
indicate the protocol used by the client. See the `reverse proxy documentation
<docs/reverse_proxy.md>`_, where the example configurations have been updated to
show how to set this header.
indicate the protocol used by the client.
Synapse also requires the `Host` header to be preserved.
See the `reverse proxy documentation <docs/reverse_proxy.md>`_, where the
example configurations have been updated to show how to set these headers.
(Users of `Caddy <https://caddyserver.com/>`_ are unaffected, since we believe it
sets `X-Forwarded-Proto` by default.)

View File

@ -1 +0,0 @@
Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column.

View File

@ -1 +0,0 @@
Introduce flake8-bugbear to the test suite and fix some of its lint violations.

View File

@ -1 +0,0 @@
Fix a longstanding bug that could cause issues when editing a reply to a message.

View File

@ -1 +0,0 @@
Fix the `/capabilities` endpoint to return `m.change_password` as disabled if the local password database is not used for authentication. Contributed by @dklimpel.

View File

@ -1 +0,0 @@
Logins using OpenID Connect can require attributes on the `userinfo` response in order to login. Contributed by Hubbe King.

View File

@ -1 +0,0 @@
Include [opencontainers labels](https://github.com/opencontainers/image-spec/blob/master/annotations.md#pre-defined-annotation-keys) in the Docker image.

View File

@ -1 +0,0 @@
Add additional type hints to the Homeserver object.

View File

@ -1 +0,0 @@
Only save remote cross-signing and device keys if they're different from the current ones.

View File

@ -1 +0,0 @@
Checks if passwords are allowed before setting it for the user.

View File

@ -1 +0,0 @@
Rename storage function to fix spelling and not conflict with another functions name.

View File

@ -1 +0,0 @@
Add additional type hints to the Homeserver object.

View File

@ -1 +0,0 @@
Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.

View File

@ -1 +0,0 @@
Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server.

View File

@ -1 +0,0 @@
Add initial experimental support for a "space summary" API.

View File

@ -1 +0,0 @@
Implement the busy presence state as described in [MSC3026](https://github.com/matrix-org/matrix-doc/pull/3026).

View File

@ -1 +0,0 @@
In the `federation_client` commandline client, stop automatically adding the URL prefix, so that servlets on other prefixes can be tested.

View File

@ -1 +0,0 @@
In the `federation_client` commandline client, handle inline `signing_key`s in `homeserver.yaml`.

View File

@ -1 +0,0 @@
Fixed some antipattern issues to improve code quality.

View File

@ -1 +0,0 @@
Add initial experimental support for a "space summary" API.

View File

@ -1 +0,0 @@
Add initial experimental support for a "space summary" API.

View File

@ -1 +0,0 @@
Add support for credentials for proxy authentication in the `HTTPS_PROXY` environment variable.

View File

@ -1 +0,0 @@
Introduce flake8-bugbear to the test suite and fix some of its lint violations.

View File

@ -1 +0,0 @@
Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server.

View File

@ -1 +0,0 @@
Import `HomeServer` from the proper module.

View File

@ -1 +0,0 @@
Increase default join ratelimiting burst rate.

View File

@ -1 +0,0 @@
Add additional type hints to the Homeserver object.

View File

@ -1 +0,0 @@
Add type hints to third party event rules and visibility modules.

View File

@ -1 +0,0 @@
Bump mypy-zope to 0.2.13 to fix "Cannot determine consistent method resolution order (MRO)" errors when running mypy a second time.

View File

@ -1 +0,0 @@
Improve worker documentation for fallback/web auth endpoints.

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# this script will use the api:
# https://github.com/matrix-org/synapse/blob/master/docs/admin_api/purge_history_api.rst

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
DOMAIN=yourserver.tld
# add this user as admin in your home server:

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -e

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
DIR="$( cd "$( dirname "$0" )" && pwd )"

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
DIR="$( cd "$( dirname "$0" )" && pwd )"

View File

@ -25,42 +25,40 @@ LABEL org.opencontainers.image.licenses='Apache-2.0'
# install the OS build deps
RUN apt-get update && apt-get install -y \
build-essential \
libffi-dev \
libjpeg-dev \
libpq-dev \
libssl-dev \
libwebp-dev \
libxml++2.6-dev \
libxslt1-dev \
openssl \
rustc \
zlib1g-dev \
&& rm -rf /var/lib/apt/lists/*
build-essential \
libffi-dev \
libjpeg-dev \
libpq-dev \
libssl-dev \
libwebp-dev \
libxml++2.6-dev \
libxslt1-dev \
openssl \
rustc \
zlib1g-dev \
&& rm -rf /var/lib/apt/lists/*
# Build dependencies that are not available as wheels, to speed up rebuilds
RUN pip install --prefix="/install" --no-warn-script-location \
cryptography \
frozendict \
jaeger-client \
opentracing \
# Match the version constraints of Synapse
"prometheus_client>=0.4.0" \
psycopg2 \
pycparser \
pyrsistent \
pyyaml \
simplejson \
threadloop \
thrift
# now install synapse and all of the python deps to /install.
COPY synapse /synapse/synapse/
# Copy just what we need to pip install
COPY scripts /synapse/scripts/
COPY MANIFEST.in README.rst setup.py synctl /synapse/
COPY synapse/__init__.py /synapse/synapse/__init__.py
COPY synapse/python_dependencies.py /synapse/synapse/python_dependencies.py
# To speed up rebuilds, install all of the dependencies before we copy over
# the whole synapse project so that we this layer in the Docker cache can be
# used while you develop on the source
#
# This is aiming at installing the `install_requires` and `extras_require` from `setup.py`
RUN pip install --prefix="/install" --no-warn-script-location \
/synapse[all]
/synapse[all]
# Copy over the rest of the project
COPY synapse /synapse/synapse/
# Install the synapse package itself and all of its children packages.
#
# This is aiming at installing only the `packages=find_packages(...)` from `setup.py
RUN pip install --prefix="/install" --no-deps --no-warn-script-location /synapse
###
### Stage 1: runtime
@ -69,16 +67,16 @@ RUN pip install --prefix="/install" --no-warn-script-location \
FROM docker.io/python:${PYTHON_VERSION}-slim
RUN apt-get update && apt-get install -y \
curl \
gosu \
libjpeg62-turbo \
libpq5 \
libwebp6 \
xmlsec1 \
libjemalloc2 \
libssl-dev \
openssl \
&& rm -rf /var/lib/apt/lists/*
curl \
gosu \
libjpeg62-turbo \
libpq5 \
libwebp6 \
xmlsec1 \
libjemalloc2 \
libssl-dev \
openssl \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /install /usr/local
COPY ./docker/start.py /start.py
@ -91,4 +89,4 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
ENTRYPOINT ["/start.py"]
HEALTHCHECK --interval=1m --timeout=5s \
CMD curl -fSs http://localhost:8008/health || exit 1
CMD curl -fSs http://localhost:8008/health || exit 1

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# The script to build the Debian package, as ran inside the Docker image.

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# This script runs the PostgreSQL tests inside a Docker container. It expects
# the relevant source files to be mounted into /src (done automatically by the

View File

@ -104,10 +104,11 @@ example.com:8448 {
```
<VirtualHost *:443>
SSLEngine on
ServerName matrix.example.com;
ServerName matrix.example.com
RequestHeader set "X-Forwarded-Proto" expr=%{REQUEST_SCHEME}
AllowEncodedSlashes NoDecode
ProxyPreserveHost on
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
ProxyPass /_synapse/client http://127.0.0.1:8008/_synapse/client nocanon
@ -116,7 +117,7 @@ example.com:8448 {
<VirtualHost *:8448>
SSLEngine on
ServerName example.com;
ServerName example.com
RequestHeader set "X-Forwarded-Proto" expr=%{REQUEST_SCHEME}
AllowEncodedSlashes NoDecode
@ -135,6 +136,8 @@ example.com:8448 {
</IfModule>
```
**NOTE 3**: Missing `ProxyPreserveHost on` can lead to a redirect loop.
### HAProxy
```

View File

@ -1758,6 +1758,9 @@ saml2_config:
# Note that, if this is changed, users authenticating via that provider
# will no longer be recognised as the same user!
#
# (Use "oidc" here if you are migrating from an old "oidc_config"
# configuration.)
#
# idp_name: A user-facing name for this identity provider, which is used to
# offer the user a choice of login mechanisms.
#
@ -1927,37 +1930,6 @@ oidc_providers:
# - attribute: userGroup
# value: "synapseUsers"
# For use with Keycloak
#
#- idp_id: keycloak
# idp_name: Keycloak
# issuer: "https://127.0.0.1:8443/auth/realms/my_realm_name"
# client_id: "synapse"
# client_secret: "copy secret generated in Keycloak UI"
# scopes: ["openid", "profile"]
# attribute_requirements:
# - attribute: groups
# value: "admin"
# For use with Github
#
#- idp_id: github
# idp_name: Github
# idp_brand: github
# discover: false
# issuer: "https://github.com/"
# client_id: "your-client-id" # TO BE FILLED
# client_secret: "your-client-secret" # TO BE FILLED
# authorization_endpoint: "https://github.com/login/oauth/authorize"
# token_endpoint: "https://github.com/login/oauth/access_token"
# userinfo_endpoint: "https://api.github.com/user"
# scopes: ["read:user"]
# user_mapping_provider:
# config:
# subject_claim: "id"
# localpart_template: "{{ user.login }}"
# display_name_template: "{{ user.name }}"
# Enable Central Authentication Service (CAS) for registration and login.
#

View File

@ -1,12 +1,13 @@
[mypy]
namespace_packages = True
plugins = mypy_zope:plugin, scripts-dev/mypy_synapse_plugin.py
follow_imports = silent
follow_imports = normal
check_untyped_defs = True
show_error_codes = True
show_traceback = True
mypy_path = stubs
warn_unreachable = True
local_partial_types = True
# To find all folders that pass mypy you run:
#

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# A script which checks that an appropriate news file has been added on this
# branch.

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# Find linting errors in Synapse's default config file.
# Exits with 0 if there are no problems, or another code otherwise.

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Update/check the docs/sample_config.yaml

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Runs linting scripts over the local Synapse checkout
# isort - sorts import statements
@ -95,4 +95,4 @@ isort "${files[@]}"
python3 -m black "${files[@]}"
./scripts-dev/config-lint.sh
flake8 "${files[@]}"
mypy
dmypy run

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# This script generates SQL files for creating a brand new Synapse DB with the latest
# schema, on both SQLite3 and Postgres.

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -e
@ -6,4 +6,4 @@ set -e
# next PR number.
CURRENT_NUMBER=`curl -s "https://api.github.com/repos/matrix-org/synapse/issues?state=all&per_page=1" | jq -r ".[0].number"`
CURRENT_NUMBER=$((CURRENT_NUMBER+1))
echo $CURRENT_NUMBER
echo $CURRENT_NUMBER

View File

@ -48,7 +48,7 @@ try:
except ImportError:
pass
__version__ = "1.30.1"
__version__ = "1.31.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@ -558,6 +558,9 @@ class Auth:
Returns:
bool: False if no access_token was given, True otherwise.
"""
# This will always be set by the time Twisted calls us.
assert request.args is not None
query_params = request.args.get(b"access_token")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
return bool(query_params) or bool(auth_headers)
@ -574,6 +577,8 @@ class Auth:
MissingClientTokenError: If there isn't a single access_token in the
request
"""
# This will always be set by the time Twisted calls us.
assert request.args is not None
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
query_params = request.args.get(b"access_token")

View File

@ -21,8 +21,10 @@ import signal
import socket
import sys
import traceback
import warnings
from typing import Awaitable, Callable, Iterable
from cryptography.utils import CryptographyDeprecationWarning
from typing_extensions import NoReturn
from twisted.internet import defer, error, reactor
@ -195,6 +197,25 @@ def listen_metrics(bind_addresses, port):
start_http_server(port, addr=host, registry=RegistryProxy)
def listen_manhole(bind_addresses: Iterable[str], port: int, manhole_globals: dict):
# twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
# warning. It's fixed by https://github.com/twisted/twisted/pull/1522), so
# suppress the warning for now.
warnings.filterwarnings(
action="ignore",
category=CryptographyDeprecationWarning,
message="int_from_bytes is deprecated",
)
from synapse.util.manhole import manhole
listen_tcp(
bind_addresses,
port,
manhole(username="matrix", password="rabbithole", globals=manhole_globals),
)
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses

View File

@ -147,7 +147,6 @@ from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.generic_worker")
@ -640,12 +639,8 @@ class GenericWorkerServer(HomeServer):
if listener.type == "http":
self._listen_http(listener)
elif listener.type == "manhole":
_base.listen_tcp(
listener.bind_addresses,
listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
_base.listen_manhole(
listener.bind_addresses, listener.port, manhole_globals={"hs": self}
)
elif listener.type == "metrics":
if not self.get_config().enable_metrics:
@ -792,13 +787,6 @@ class FederationSenderHandler:
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
def on_start(self):
# There may be some events that are persisted but haven't been sent,
# so send them now.
self.federation_sender.notify_new_events(
self.store.get_room_max_stream_ordering()
)
def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)

View File

@ -67,7 +67,6 @@ from synapse.storage import DataStore
from synapse.storage.engines import IncorrectDatabaseSetup
from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.module_loader import load_module
from synapse.util.versionstring import get_version_string
@ -288,12 +287,8 @@ class SynapseHomeServer(HomeServer):
if listener.type == "http":
self._listening_services.extend(self._listener_http(config, listener))
elif listener.type == "manhole":
listen_tcp(
listener.bind_addresses,
listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
_base.listen_manhole(
listener.bind_addresses, listener.port, manhole_globals={"hs": self}
)
elif listener.type == "replication":
services = listen_tcp(

View File

@ -24,7 +24,7 @@ from ._base import Config, ConfigError
_CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
# Map from canonicalised cache name to cache.
_CACHES = {}
_CACHES = {} # type: Dict[str, Callable[[float], None]]
# a lock on the contents of _CACHES
_CACHES_LOCK = threading.Lock()
@ -59,7 +59,9 @@ def _canonicalise_cache_name(cache_name: str) -> str:
return cache_name.lower()
def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
def add_resizable_cache(
cache_name: str, cache_resize_callback: Callable[[float], None]
):
"""Register a cache that's size can dynamically change
Args:

View File

@ -79,6 +79,9 @@ class OIDCConfig(Config):
# Note that, if this is changed, users authenticating via that provider
# will no longer be recognised as the same user!
#
# (Use "oidc" here if you are migrating from an old "oidc_config"
# configuration.)
#
# idp_name: A user-facing name for this identity provider, which is used to
# offer the user a choice of login mechanisms.
#
@ -247,37 +250,6 @@ class OIDCConfig(Config):
# attribute_requirements:
# - attribute: userGroup
# value: "synapseUsers"
# For use with Keycloak
#
#- idp_id: keycloak
# idp_name: Keycloak
# issuer: "https://127.0.0.1:8443/auth/realms/my_realm_name"
# client_id: "synapse"
# client_secret: "copy secret generated in Keycloak UI"
# scopes: ["openid", "profile"]
# attribute_requirements:
# - attribute: groups
# value: "admin"
# For use with Github
#
#- idp_id: github
# idp_name: Github
# idp_brand: github
# discover: false
# issuer: "https://github.com/"
# client_id: "your-client-id" # TO BE FILLED
# client_secret: "your-client-secret" # TO BE FILLED
# authorization_endpoint: "https://github.com/login/oauth/authorize"
# token_endpoint: "https://github.com/login/oauth/access_token"
# userinfo_endpoint: "https://api.github.com/user"
# scopes: ["read:user"]
# user_mapping_provider:
# config:
# subject_claim: "id"
# localpart_template: "{{{{ user.login }}}}"
# display_name_template: "{{{{ user.name }}}}"
""".format(
mapping_provider=DEFAULT_USER_MAPPING_PROVIDER
)

View File

@ -31,25 +31,39 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
from typing import Dict, List, Tuple, Type
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
List,
Optional,
Sized,
Tuple,
Type,
)
from sortedcontainers import SortedDict
from twisted.internet import defer
from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure
from .units import Edu
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class FederationRemoteSendQueue:
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@ -58,7 +72,7 @@ class FederationRemoteSendQueue:
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
self._sender_positions = {}
self._sender_positions = {} # type: Dict[str, int]
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
@ -71,7 +85,7 @@ class FederationRemoteSendQueue:
# Stream position -> (user_id, destinations)
self.presence_destinations = (
SortedDict()
) # type: SortedDict[int, Tuple[str, List[str]]]
) # type: SortedDict[int, Tuple[str, Iterable[str]]]
# (destination, key) -> EDU
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
@ -94,7 +108,7 @@ class FederationRemoteSendQueue:
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
@ -115,13 +129,13 @@ class FederationRemoteSendQueue:
self.clock.looping_call(self._clear_queue, 30 * 1000)
def _next_pos(self):
def _next_pos(self) -> int:
pos = self.pos
self.pos += 1
self.pos_time[self.clock.time_msec()] = pos
return pos
def _clear_queue(self):
def _clear_queue(self) -> None:
"""Clear the queues for anything older than N minutes"""
FIVE_MINUTES_AGO = 5 * 60 * 1000
@ -138,7 +152,7 @@ class FederationRemoteSendQueue:
self._clear_queue_before_pos(position_to_delete)
def _clear_queue_before_pos(self, position_to_delete):
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
@ -188,13 +202,18 @@ class FederationRemoteSendQueue:
for key in keys[:i]:
del self.edus[key]
def notify_new_events(self, max_token):
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
# This should never get called.
raise NotImplementedError()
def build_and_send_edu(self, destination, edu_type, content, key=None):
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[Hashable] = None,
) -> None:
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
@ -218,38 +237,39 @@ class FederationRemoteSendQueue:
self.notifier.on_new_replication_data()
def send_read_receipt(self, receipt):
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""As per FederationSender
Args:
receipt (synapse.types.ReadReceipt):
receipt:
"""
# nothing to do here: the replication listener will handle it.
return defer.succeed(None)
def send_presence(self, states):
def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Args:
states (list(UserPresenceState))
states
"""
pos = self._next_pos()
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
local_states = [s for s in states if self.is_mine_id(s.user_id)]
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
def send_presence_to_destinations(self, states, destinations):
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
Args:
states (list[UserPresenceState])
destinations (list[str])
states
destinations
"""
for state in states:
pos = self._next_pos()
@ -258,15 +278,18 @@ class FederationRemoteSendQueue:
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
def send_device_messages(self, destination: str) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
def get_current_token(self):
def wake_destination(self, server: str) -> None:
pass
def get_current_token(self) -> int:
return self.pos - 1
def federation_ack(self, instance_name, token):
def federation_ack(self, instance_name: str, token: int) -> None:
if self._sender_instances:
# If we have configured multiple federation sender instances we need
# to track their positions separately, and only clear the queue up
@ -504,13 +527,16 @@ ParsedFederationStreamData = namedtuple(
)
def process_rows_for_federation(transaction_queue, rows):
def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
"""Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args:
transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
transaction_queue
rows
"""
# The federation stream contains a bunch of different types of

View File

@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter
from twisted.internet import defer
import synapse
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
@ -40,9 +40,12 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt, RoomStreamToken
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
@ -65,8 +68,91 @@ CATCH_UP_STARTUP_DELAY_SEC = 15
CATCH_UP_STARTUP_INTERVAL_SEC = 5
class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
class AbstractFederationSender(metaclass=abc.ABCMeta):
@abc.abstractmethod
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
raise NotImplementedError()
@abc.abstractmethod
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Args:
receipt: receipt to be sent
"""
raise NotImplementedError()
@abc.abstractmethod
def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
raise NotImplementedError()
@abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
Args:
destinations:
"""
raise NotImplementedError()
@abc.abstractmethod
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[Hashable] = None,
) -> None:
"""Construct an Edu object, and queue it for sending
Args:
destination: name of server to send to
edu_type: type of EDU to send
content: content of EDU
key: clobbering key for this edu
"""
raise NotImplementedError()
@abc.abstractmethod
def send_device_messages(self, destination: str) -> None:
raise NotImplementedError()
@abc.abstractmethod
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
might have come back.
"""
raise NotImplementedError()
@abc.abstractmethod
def get_current_token(self) -> int:
raise NotImplementedError()
@abc.abstractmethod
def federation_ack(self, instance_name: str, token: int) -> None:
raise NotImplementedError()
@abc.abstractmethod
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
raise NotImplementedError()
class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@ -432,7 +518,7 @@ class FederationSender:
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
async def send_presence(self, states: List[UserPresenceState]):
async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
@ -494,7 +580,7 @@ class FederationSender:
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
async def _process_presence_inner(self, states: List[UserPresenceState]):
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
@ -516,9 +602,9 @@ class FederationSender:
self,
destination: str,
edu_type: str,
content: dict,
content: JsonDict,
key: Optional[Hashable] = None,
):
) -> None:
"""Construct an Edu object, and queue it for sending
Args:
@ -545,7 +631,7 @@ class FederationSender:
self.send_edu(edu, key)
def send_edu(self, edu: Edu, key: Optional[Hashable]):
def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
"""Queue an EDU for sending
Args:
@ -563,7 +649,7 @@ class FederationSender:
else:
queue.send_edu(edu)
def send_device_messages(self, destination: str):
def send_device_messages(self, destination: str) -> None:
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
@ -575,7 +661,7 @@ class FederationSender:
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
@ -599,6 +685,10 @@ class FederationSender:
# to a worker.
return 0
def federation_ack(self, instance_name: str, token: int) -> None:
# It is not expected that this gets called on FederationSender.
raise NotImplementedError()
@staticmethod
async def get_replication_rows(
instance_name: str, from_token: int, to_token: int, target_row_count: int
@ -607,7 +697,7 @@ class FederationSender:
# to a worker.
return [], 0, False
async def _wake_destinations_needing_catchup(self):
async def _wake_destinations_needing_catchup(self) -> None:
"""
Wakes up destinations that need catch-up and are not currently being
backed off from.

View File

@ -149,6 +149,9 @@ class OidcHandler:
Args:
request: the incoming request from the browser.
"""
# This will always be set by the time Twisted calls us.
assert request.args is not None
# The provider might redirect with an error.
# In that case, just display it as-is.
if b"error" in request.args:

View File

@ -71,8 +71,10 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3
logger = logging.getLogger(__name__)
_well_known_cache = TTLCache("well-known")
_had_valid_well_known_cache = TTLCache("had-valid-well-known")
_well_known_cache = TTLCache("well-known") # type: TTLCache[bytes, Optional[bytes]]
_had_valid_well_known_cache = TTLCache(
"had-valid-well-known"
) # type: TTLCache[bytes, bool]
@attr.s(slots=True, frozen=True)
@ -88,8 +90,8 @@ class WellKnownResolver:
reactor: IReactorTime,
agent: IAgent,
user_agent: bytes,
well_known_cache: Optional[TTLCache] = None,
had_well_known_cache: Optional[TTLCache] = None,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
):
self._reactor = reactor
self._clock = Clock(reactor)

View File

@ -169,7 +169,7 @@ import inspect
import logging
import re
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Type
from typing import TYPE_CHECKING, Dict, Optional, Pattern, Type
import attr
@ -262,7 +262,7 @@ logger = logging.getLogger(__name__)
# Block everything by default
# A regex which matches the server_names to expose traces for.
# None means 'block everything'.
_homeserver_whitelist = None
_homeserver_whitelist = None # type: Optional[Pattern[str]]
# Util methods

View File

@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from typing import List, Set
@ -101,7 +102,7 @@ CONDITIONAL_REQUIREMENTS = {
"txacme>=0.9.2",
# txacme depends on eliot. Eliot 1.8.0 is incompatible with
# python 3.5.2, as per https://github.com/itamarst/eliot/issues/418
'eliot<1.8.0;python_version<"3.5.3"',
"eliot<1.8.0;python_version<'3.5.3'",
],
"saml2": [
# pysaml2 6.4.0 is incompatible with Python 3.5 (see https://github.com/IdentityPython/pysaml2/issues/749)
@ -131,6 +132,18 @@ for name, optional_deps in CONDITIONAL_REQUIREMENTS.items():
ALL_OPTIONAL_REQUIREMENTS = set(optional_deps) | ALL_OPTIONAL_REQUIREMENTS
# ensure there are no double-quote characters in any of the deps (otherwise the
# 'pip install' incantation in DependencyException will break)
for dep in itertools.chain(
REQUIREMENTS,
*CONDITIONAL_REQUIREMENTS.values(),
):
if '"' in dep:
raise Exception(
"Dependency `%s` contains double-quote; use single-quotes instead" % (dep,)
)
def list_requirements():
return list(set(REQUIREMENTS) | ALL_OPTIONAL_REQUIREMENTS)
@ -150,7 +163,7 @@ class DependencyException(Exception):
@property
def dependencies(self):
for i in self.args[0]:
yield "'" + i + "'"
yield '"' + i + '"'
def check_requirements(for_feature=None):

View File

@ -312,16 +312,16 @@ class FederationAckCommand(Command):
NAME = "FEDERATION_ACK"
def __init__(self, instance_name, token):
def __init__(self, instance_name: str, token: int):
self.instance_name = instance_name
self.token = token
@classmethod
def from_line(cls, line):
def from_line(cls, line: str) -> "FederationAckCommand":
instance_name, token = line.split(" ")
return cls(instance_name, int(token))
def to_line(self):
def to_line(self) -> str:
return "%s %s" % (self.instance_name, self.token)

View File

@ -104,7 +104,7 @@ tcp_outbound_commands_counter = Counter(
# A list of all connected protocols. This allows us to send metrics about the
# connections.
connected_connections = []
connected_connections = [] # type: List[BaseReplicationStreamProtocol]
logger = logging.getLogger(__name__)

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
from synapse.replication.tcp.streams._base import (
Stream,
@ -21,6 +22,9 @@ from synapse.replication.tcp.streams._base import (
make_http_update_function,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
@ -38,7 +42,7 @@ class FederationStream(Stream):
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
if hs.config.worker_app is None:
# master process: get updates from the FederationRemoteSendQueue.
# (if the master is configured to send federation itself, federation_sender
@ -48,7 +52,9 @@ class FederationStream(Stream):
current_token = current_token_without_instance(
federation_sender.get_current_token
)
update_function = federation_sender.get_replication_rows
update_function = (
federation_sender.get_replication_rows
) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
elif hs.should_send_federation():
# federation sender: Query master process
@ -69,5 +75,7 @@ class FederationStream(Stream):
return 0
@staticmethod
async def _stub_update_function(instance_name, from_token, upto_token, limit):
async def _stub_update_function(
instance_name: str, from_token: int, upto_token: int, limit: int
) -> Tuple[list, int, bool]:
return [], upto_token, False

View File

@ -390,6 +390,9 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, RestServlet):
async def on_POST(
self, request: SynapseRequest, room_identifier: str
) -> Tuple[int, JsonDict]:
# This will always be set by the time Twisted calls us.
assert request.args is not None
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)

View File

@ -833,6 +833,9 @@ class UserMediaRestServlet(RestServlet):
async def on_GET(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
# This will always be set by the time Twisted calls us.
assert request.args is not None
await assert_requester_is_admin(self.auth, request)
if not self.is_mine(UserID.from_string(user_id)):

View File

@ -91,6 +91,9 @@ class SyncRestServlet(RestServlet):
self._event_serializer = hs.get_event_client_serializer()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
# This will always be set by the time Twisted calls us.
assert request.args is not None
if b"from" in request.args:
# /events used to use 'from', but /sync uses 'since'.
# Lets be helpful and whine if we see a 'from'.

View File

@ -187,6 +187,8 @@ class PreviewUrlResource(DirectServeJsonResource):
respond_with_json(request, 200, {}, send_cors=True)
async def _async_render_GET(self, request: SynapseRequest) -> None:
# This will always be set by the time Twisted calls us.
assert request.args is not None
# XXX: if get_user_by_req fails, what should we do in an async render?
requester = await self.auth.get_user_by_req(request)

View File

@ -104,6 +104,9 @@ class AccountDetailsResource(DirectServeHtmlResource):
respond_with_html(request, 200, html)
async def _async_render_POST(self, request: SynapseRequest):
# This will always be set by the time Twisted calls us.
assert request.args is not None
try:
session_id = get_username_mapping_session_cookie_from_request(request)
except SynapseError as e:

View File

@ -60,7 +60,7 @@ from synapse.federation.federation_server import (
FederationServer,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import FederationSender
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
@ -571,7 +571,7 @@ class HomeServer(metaclass=abc.ABCMeta):
return TransportLayerClient(self)
@cache_in_self
def get_federation_sender(self):
def get_federation_sender(self) -> AbstractFederationSender:
if self.should_send_federation():
return FederationSender(self)
elif not self.config.worker_app:

View File

@ -1906,6 +1906,7 @@ class DatabasePool:
retcols: Iterable[str],
filters: Optional[Dict[str, Any]] = None,
keyvalues: Optional[Dict[str, Any]] = None,
exclude_keyvalues: Optional[Dict[str, Any]] = None,
order_direction: str = "ASC",
) -> List[Dict[str, Any]]:
"""
@ -1929,7 +1930,10 @@ class DatabasePool:
apply a WHERE ? LIKE ? clause.
keyvalues:
column names and values to select the rows with, or None to not
apply a WHERE clause.
apply a WHERE key = value clause.
exclude_keyvalues:
column names and values to exclude rows with, or None to not
apply a WHERE key != value clause.
order_direction: Whether the results should be ordered "ASC" or "DESC".
Returns:
@ -1938,7 +1942,7 @@ class DatabasePool:
if order_direction not in ["ASC", "DESC"]:
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
where_clause = "WHERE " if filters or keyvalues else ""
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
arg_list = [] # type: List[Any]
if filters:
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
@ -1947,6 +1951,9 @@ class DatabasePool:
if keyvalues:
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
arg_list += list(keyvalues.values())
if exclude_keyvalues:
where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
arg_list += list(exclude_keyvalues.values())
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
", ".join(retcols),

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Tuple
from typing import Dict, List, Tuple
from synapse.api.presence import UserPresenceState
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@ -157,5 +157,63 @@ class PresenceStore(SQLBaseStore):
return {row["user_id"]: UserPresenceState(**row) for row in rows}
async def get_presence_for_all_users(
self,
include_offline: bool = True,
) -> Dict[str, UserPresenceState]:
"""Retrieve the current presence state for all users.
Note that the presence_stream table is culled frequently, so it should only
contain the latest presence state for each user.
Args:
include_offline: Whether to include offline presence states
Returns:
A dict of user IDs to their current UserPresenceState.
"""
users_to_state = {}
exclude_keyvalues = None
if not include_offline:
# Exclude offline presence state
exclude_keyvalues = {"state": "offline"}
# This may be a very heavy database query.
# We paginate in order to not block a database connection.
limit = 100
offset = 0
while True:
rows = await self.db_pool.runInteraction(
"get_presence_for_all_users",
self.db_pool.simple_select_list_paginate_txn,
"presence_stream",
orderby="stream_id",
start=offset,
limit=limit,
exclude_keyvalues=exclude_keyvalues,
retcols=(
"user_id",
"state",
"last_active_ts",
"last_federation_update_ts",
"last_user_sync_ts",
"status_msg",
"currently_active",
),
order_direction="ASC",
)
for row in rows:
users_to_state[row["user_id"]] = UserPresenceState(**row)
# We've run out of updates to query
if len(rows) < limit:
break
offset += limit
return users_to_state
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()

View File

@ -183,12 +183,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
requests state from the cache, if False we need to query the DB for the
missing state.
"""
is_all, known_absent, state_dict_ids = cache.get(group)
cache_entry = cache.get(group)
state_dict_ids = cache_entry.value
if is_all or state_filter.is_full():
if cache_entry.full or state_filter.is_full():
# Either we have everything or want everything, either way
# `is_all` tells us whether we've gotten everything.
return state_filter.filter_state(state_dict_ids), is_all
return state_filter.filter_state(state_dict_ids), cache_entry.full
# tracks whether any of our requested types are missing from the cache
missing_types = False
@ -202,7 +203,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
# There aren't any wild cards, so `concrete_types()` returns the
# complete list of event types we're wanting.
for key in state_filter.concrete_types():
if key not in state_dict_ids and key not in known_absent:
if key not in state_dict_ids and key not in cache_entry.known_absent:
missing_types = True
break

View File

@ -25,8 +25,8 @@ from synapse.config.cache import add_resizable_cache
logger = logging.getLogger(__name__)
caches_by_name = {}
collectors_by_name = {} # type: Dict
caches_by_name = {} # type: Dict[str, Sized]
collectors_by_name = {} # type: Dict[str, CacheMetric]
cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])

View File

@ -15,26 +15,38 @@
import enum
import logging
import threading
from collections import namedtuple
from typing import Any
from typing import Any, Dict, Generic, Iterable, Optional, Set, TypeVar
import attr
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))):
# The type of the cache keys.
KT = TypeVar("KT")
# The type of the dictionary keys.
DKT = TypeVar("DKT")
@attr.s(slots=True)
class DictionaryEntry:
"""Returned when getting an entry from the cache
Attributes:
full (bool): Whether the cache has the full or dict or just some keys.
full: Whether the cache has the full or dict or just some keys.
If not full then not all requested keys will necessarily be present
in `value`
known_absent (set): Keys that were looked up in the dict and were not
known_absent: Keys that were looked up in the dict and were not
there.
value (dict): The full or partial dict value
value: The full or partial dict value
"""
full = attr.ib(type=bool)
known_absent = attr.ib()
value = attr.ib()
def __len__(self):
return len(self.value)
@ -45,21 +57,21 @@ class _Sentinel(enum.Enum):
sentinel = object()
class DictionaryCache:
class DictionaryCache(Generic[KT, DKT]):
"""Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
fetching a subset of dictionary keys for a particular key.
"""
def __init__(self, name, max_entries=1000):
def __init__(self, name: str, max_entries: int = 1000):
self.cache = LruCache(
max_size=max_entries, cache_name=name, size_callback=len
) # type: LruCache[Any, DictionaryEntry]
) # type: LruCache[KT, DictionaryEntry]
self.name = name
self.sequence = 0
self.thread = None
self.thread = None # type: Optional[threading.Thread]
def check_thread(self):
def check_thread(self) -> None:
expected_thread = self.thread
if expected_thread is None:
self.thread = threading.current_thread()
@ -69,12 +81,14 @@ class DictionaryCache:
"Cache objects can only be accessed from the main thread"
)
def get(self, key, dict_keys=None):
def get(
self, key: KT, dict_keys: Optional[Iterable[DKT]] = None
) -> DictionaryEntry:
"""Fetch an entry out of the cache
Args:
key
dict_key(list): If given a set of keys then return only those keys
dict_key: If given a set of keys then return only those keys
that exist in the cache.
Returns:
@ -95,7 +109,7 @@ class DictionaryCache:
return DictionaryEntry(False, set(), {})
def invalidate(self, key):
def invalidate(self, key: KT) -> None:
self.check_thread()
# Increment the sequence number so that any SELECT statements that
@ -103,19 +117,25 @@ class DictionaryCache:
self.sequence += 1
self.cache.pop(key, None)
def invalidate_all(self):
def invalidate_all(self) -> None:
self.check_thread()
self.sequence += 1
self.cache.clear()
def update(self, sequence, key, value, fetched_keys=None):
def update(
self,
sequence: int,
key: KT,
value: Dict[DKT, Any],
fetched_keys: Optional[Set[DKT]] = None,
) -> None:
"""Updates the entry in the cache
Args:
sequence
key (K)
value (dict[X,Y]): The value to update the cache with.
fetched_keys (None|set[X]): All of the dictionary keys which were
key
value: The value to update the cache with.
fetched_keys: All of the dictionary keys which were
fetched from the database.
If None, this is the complete value for key K. Otherwise, it
@ -131,7 +151,9 @@ class DictionaryCache:
else:
self._update_or_insert(key, value, fetched_keys)
def _update_or_insert(self, key, value, known_absent):
def _update_or_insert(
self, key: KT, value: Dict[DKT, Any], known_absent: Set[DKT]
) -> None:
# We pop and reinsert as we need to tell the cache the size may have
# changed
@ -140,5 +162,5 @@ class DictionaryCache:
entry.known_absent.update(known_absent)
self.cache[key] = entry
def _insert(self, key, value, known_absent):
def _insert(self, key: KT, value: Dict[DKT, Any], known_absent: Set[DKT]) -> None:
self.cache[key] = DictionaryEntry(True, known_absent, value)

View File

@ -15,6 +15,7 @@
import logging
import time
from typing import Any, Callable, Dict, Generic, Tuple, TypeVar, Union
import attr
from sortedcontainers import SortedList
@ -23,15 +24,19 @@ from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
SENTINEL = object()
SENTINEL = object() # type: Any
T = TypeVar("T")
KT = TypeVar("KT")
VT = TypeVar("VT")
class TTLCache:
class TTLCache(Generic[KT, VT]):
"""A key/value cache implementation where each entry has its own TTL"""
def __init__(self, cache_name, timer=time.time):
def __init__(self, cache_name: str, timer: Callable[[], float] = time.time):
# map from key to _CacheEntry
self._data = {}
self._data = {} # type: Dict[KT, _CacheEntry]
# the _CacheEntries, sorted by expiry time
self._expiry_list = SortedList() # type: SortedList[_CacheEntry]
@ -40,26 +45,27 @@ class TTLCache:
self._metrics = register_cache("ttl", cache_name, self, resizable=False)
def set(self, key, value, ttl):
def set(self, key: KT, value: VT, ttl: float) -> None:
"""Add/update an entry in the cache
Args:
key: key for this entry
value: value for this entry
ttl (float): TTL for this entry, in seconds
ttl: TTL for this entry, in seconds
"""
expiry = self._timer() + ttl
self.expire()
e = self._data.pop(key, SENTINEL)
if e != SENTINEL:
if e is not SENTINEL:
assert isinstance(e, _CacheEntry)
self._expiry_list.remove(e)
entry = _CacheEntry(expiry_time=expiry, ttl=ttl, key=key, value=value)
self._data[key] = entry
self._expiry_list.add(entry)
def get(self, key, default=SENTINEL):
def get(self, key: KT, default: T = SENTINEL) -> Union[VT, T]:
"""Get a value from the cache
Args:
@ -72,23 +78,23 @@ class TTLCache:
"""
self.expire()
e = self._data.get(key, SENTINEL)
if e == SENTINEL:
if e is SENTINEL:
self._metrics.inc_misses()
if default == SENTINEL:
if default is SENTINEL:
raise KeyError(key)
return default
assert isinstance(e, _CacheEntry)
self._metrics.inc_hits()
return e.value
def get_with_expiry(self, key):
def get_with_expiry(self, key: KT) -> Tuple[VT, float, float]:
"""Get a value, and its expiry time, from the cache
Args:
key: key to look up
Returns:
Tuple[Any, float, float]: the value from the cache, the expiry time
and the TTL
A tuple of the value from the cache, the expiry time and the TTL
Raises:
KeyError if the entry is not found
@ -102,7 +108,7 @@ class TTLCache:
self._metrics.inc_hits()
return e.value, e.expiry_time, e.ttl
def pop(self, key, default=SENTINEL):
def pop(self, key: KT, default: T = SENTINEL) -> Union[VT, T]: # type: ignore
"""Remove a value from the cache
If key is in the cache, remove it and return its value, else return default.
@ -118,29 +124,30 @@ class TTLCache:
"""
self.expire()
e = self._data.pop(key, SENTINEL)
if e == SENTINEL:
if e is SENTINEL:
self._metrics.inc_misses()
if default == SENTINEL:
if default is SENTINEL:
raise KeyError(key)
return default
assert isinstance(e, _CacheEntry)
self._expiry_list.remove(e)
self._metrics.inc_hits()
return e.value
def __getitem__(self, key):
def __getitem__(self, key: KT) -> VT:
return self.get(key)
def __delitem__(self, key):
def __delitem__(self, key: KT) -> None:
self.pop(key)
def __contains__(self, key):
def __contains__(self, key: KT) -> bool:
return key in self._data
def __len__(self):
def __len__(self) -> int:
self.expire()
return len(self._data)
def expire(self):
def expire(self) -> None:
"""Run the expiry on the cache. Any entries whose expiry times are due will
be removed
"""
@ -158,7 +165,7 @@ class _CacheEntry:
"""TTLCache entry"""
# expiry_time is the first attribute, so that entries are sorted by expiry.
expiry_time = attr.ib()
ttl = attr.ib()
expiry_time = attr.ib(type=float)
ttl = attr.ib(type=float)
key = attr.ib()
value = attr.ib()

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# This script builds the Docker image to run the PostgreSQL tests, and then runs
# the tests.

View File

@ -44,7 +44,7 @@ from tests.server import FakeTransport
try:
import hiredis
except ImportError:
hiredis = None
hiredis = None # type: ignore
logger = logging.getLogger(__name__)

View File

@ -69,6 +69,7 @@ class TypingStreamTestCase(BaseStreamTestCase):
self.assert_request_is_get_repl_stream_updates(request, "typing")
# The from token should be the token from the last RDATA we got.
assert request.args is not None
self.assertEqual(int(request.args[b"from_token"][0]), token)
self.test_handler.on_rdata.assert_called_once()

View File

@ -15,7 +15,7 @@
import logging
import os
from binascii import unhexlify
from typing import Tuple
from typing import Optional, Tuple
from twisted.internet.protocol import Factory
from twisted.protocols.tls import TLSMemoryBIOFactory
@ -32,7 +32,7 @@ from tests.server import FakeChannel, FakeSite, FakeTransport, make_request
logger = logging.getLogger(__name__)
test_server_connection_factory = None
test_server_connection_factory = None # type: Optional[TestServerTLSConnectionFactory]
class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase):

View File

@ -2,7 +2,7 @@ import json
import logging
from collections import deque
from io import SEEK_END, BytesIO
from typing import Callable, Iterable, MutableMapping, Optional, Tuple, Union
from typing import Callable, Dict, Iterable, MutableMapping, Optional, Tuple, Union
import attr
from typing_extensions import Deque
@ -13,8 +13,11 @@ from twisted.internet._resolver import SimpleResolverComplexifier
from twisted.internet.defer import Deferred, fail, succeed
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import (
IHostnameResolver,
IProtocol,
IPullProducer,
IPushProducer,
IReactorPluggableNameResolver,
IReactorTCP,
IResolverSimple,
ITransport,
)
@ -45,11 +48,11 @@ class FakeChannel:
wire).
"""
site = attr.ib(type=Site)
site = attr.ib(type=Union[Site, "FakeSite"])
_reactor = attr.ib()
result = attr.ib(type=dict, default=attr.Factory(dict))
_ip = attr.ib(type=str, default="127.0.0.1")
_producer = None
_producer = None # type: Optional[Union[IPullProducer, IPushProducer]]
@property
def json_body(self):
@ -159,7 +162,11 @@ class FakeChannel:
Any cookines found are added to the given dict
"""
for h in self.headers.getRawHeaders("Set-Cookie"):
headers = self.headers.getRawHeaders("Set-Cookie")
if not headers:
return
for h in headers:
parts = h.split(";")
k, v = parts[0].split("=", maxsplit=1)
cookies[k] = v
@ -311,8 +318,8 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
self._tcp_callbacks = {}
self._udp = []
lookups = self.lookups = {}
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()
lookups = self.lookups = {} # type: Dict[str, str]
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]
@implementer(IResolverSimple)
class FakeResolver:
@ -324,6 +331,9 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
self.nameResolver = SimpleResolverComplexifier(FakeResolver())
super().__init__()
def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
raise NotImplementedError()
def listenUDP(self, port, protocol, interface="", maxPacketSize=8196):
p = udp.Port(port, protocol, interface, maxPacketSize, self)
p.startListening()
@ -621,7 +631,9 @@ class FakeTransport:
self.disconnected = True
def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
def connect_client(
reactor: ThreadedMemoryReactorClock, client_id: int
) -> Tuple[IProtocol, AccumulatingProtocol]:
"""
Connect a client to a fake TCP transport.

View File

@ -377,14 +377,11 @@ class StateStoreTestCase(tests.unittest.TestCase):
#######################################################
# deliberately remove e2 (room name) from the _state_group_cache
(
is_all,
known_absent,
state_dict_ids,
) = self.state_datastore._state_group_cache.get(group)
cache_entry = self.state_datastore._state_group_cache.get(group)
state_dict_ids = cache_entry.value
self.assertEqual(is_all, True)
self.assertEqual(known_absent, set())
self.assertEqual(cache_entry.full, True)
self.assertEqual(cache_entry.known_absent, set())
self.assertDictEqual(
state_dict_ids,
{
@ -403,14 +400,11 @@ class StateStoreTestCase(tests.unittest.TestCase):
fetched_keys=((e1.type, e1.state_key),),
)
(
is_all,
known_absent,
state_dict_ids,
) = self.state_datastore._state_group_cache.get(group)
cache_entry = self.state_datastore._state_group_cache.get(group)
state_dict_ids = cache_entry.value
self.assertEqual(is_all, False)
self.assertEqual(known_absent, {(e1.type, e1.state_key)})
self.assertEqual(cache_entry.full, False)
self.assertEqual(cache_entry.known_absent, {(e1.type, e1.state_key)})
self.assertDictEqual(state_dict_ids, {(e1.type, e1.state_key): e1.event_id})
############################################

View File

@ -27,7 +27,9 @@ class DictCacheTestCase(unittest.TestCase):
key = "test_simple_cache_hit_full"
v = self.cache.get(key)
self.assertEqual((False, set(), {}), v)
self.assertIs(v.full, False)
self.assertEqual(v.known_absent, set())
self.assertEqual({}, v.value)
seq = self.cache.sequence
test_value = {"test": "test_simple_cache_hit_full"}