Merge branch 'develop' of github.com:matrix-org/synapse into anoa/async_is_server_admin

* 'develop' of github.com:matrix-org/synapse: (75 commits)
  Add instance name to RDATA/POSITION commands (#7364)
  Don't relay REMOTE_SERVER_UP cmds to same conn. (#7352)
  Fix limit logic for EventsStream (#7358)
  Fix fallback value for account_threepid_delegates.email (#7316)
  Clean up admin api docs (#7361)
  Return total number of users and profile attributes in admin users endpoint (#6881)
  Add some replication tests (#7278)
  Fix typo 'datbases' in ConfigError
  Fix collation for postgres for unit tests (#7359)
  Run replication streamers on workers (#7146)
  Fix incorrect metrics reporting for renew_attestations (#7344)
  Document monitoring workers (#7357)
  Add some explanation to application_services.md (#7091)
  Don't crash when one of the configuration files is empty (#7341)
  Add documentation to the sample config about the templates for SSO. (#7343)
  Convert some of the federation handler methods to async/await. (#7338)
  changelog
  Fix EventsStream raising assertions when it falls behind
  1.12.4
  Make it clear that the limit for an update_function is a target
  ...
pull/7363/head
Andrew Morgan 2020-04-30 01:05:39 +01:00
commit 11430c07b6
166 changed files with 5135 additions and 1744 deletions


@ -5,8 +5,6 @@ Message history can be paginated
Can re-join room if re-invited
/upgrade creates a new room
The only membership state included in an initial sync is for all the senders in the timeline
Local device key changes get to remote servers


@ -1,10 +1,39 @@
Next version
============
* A new template (`sso_auth_confirm.html`) was added to Synapse. If your Synapse
is configured to use SSO and a custom `sso_redirect_confirm_template_dir`
configuration then this template will need to be duplicated into that
directory.
* New templates (`sso_auth_confirm.html`, `sso_auth_success.html`, and
`sso_account_deactivated.html`) were added to Synapse. If your Synapse is
configured to use SSO and a custom `sso_redirect_confirm_template_dir`
configuration then these templates will need to be duplicated into that
directory.
* Plugins using the `complete_sso_login` method of `synapse.module_api.ModuleApi`
should update to using the async/await version `complete_sso_login_async` which
includes additional checks. The non-async version is considered deprecated.
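  For illustration, a minimal sketch of that migration in a plugin (the
  three-argument call shape below is an assumption for illustration, not taken
  from this changelog):

  ```python
  class ExampleSsoModule:
      """Hypothetical plugin showing the migration to the async method."""

      def __init__(self, config, module_api):
          # module_api is an instance of synapse.module_api.ModuleApi
          self._api = module_api

      async def on_sso_response(self, user_id, request, client_redirect_url):
          # Deprecated, synchronous form:
          #     self._api.complete_sso_login(user_id, request, client_redirect_url)
          # Async form, which performs the additional checks mentioned above:
          await self._api.complete_sso_login_async(
              user_id, request, client_redirect_url
          )
  ```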
Synapse 1.12.4 (2020-04-23)
===========================
No significant changes.
Synapse 1.12.4rc1 (2020-04-22)
==============================
Features
--------
- Always send users their own device updates. ([\#7160](https://github.com/matrix-org/synapse/issues/7160))
- Add support for handling GET requests for `account_data` on a worker. ([\#7311](https://github.com/matrix-org/synapse/issues/7311))
Bugfixes
--------
- Fix a bug that prevented cross-signing with users on worker-mode synapses. ([\#7255](https://github.com/matrix-org/synapse/issues/7255))
- Do not treat display names as globs in push rules. ([\#7271](https://github.com/matrix-org/synapse/issues/7271))
- Fix a bug with cross-signing devices belonging to remote users who did not share a room with any user on the local homeserver. ([\#7289](https://github.com/matrix-org/synapse/issues/7289))
Synapse 1.12.3 (2020-04-03)
===========================
@ -15,14 +44,10 @@ correctly fix the issue with building the Debian packages. ([\#7212](https://git
Synapse 1.12.2 (2020-04-02)
===========================
This release works around [an
issue](https://github.com/matrix-org/synapse/issues/7208) with building the
debian packages.
This release works around [an issue](https://github.com/matrix-org/synapse/issues/7208) with building the debian packages.
No other significant changes since 1.12.1.
>>>>>>> master
Synapse 1.12.1 (2020-04-02)
===========================
@ -42,12 +67,19 @@ Bugfixes
Synapse 1.12.0 (2020-03-23)
===========================
No significant changes since 1.12.0rc1.
Debian packages and Docker images are rebuilt using the latest versions of
dependency libraries, including Twisted 20.3.0. **Please see security advisory
below**.
Potential slow database update during upgrade
---------------------------------------------
Synapse 1.12.0 includes a database update which is run as part of the upgrade,
and which may take some time (several hours in the case of a large
server). Synapse will not respond to HTTP requests while this update is taking
place. For information on how to check whether you are affected, and
workarounds if you are, see the [upgrade notes](UPGRADE.rst#upgrading-to-v1120).
Security advisory
-----------------


@ -75,6 +75,71 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.12.0
====================
This version includes a database update which is run as part of the upgrade,
and which may take some time (several hours in the case of a large
server). Synapse will not respond to HTTP requests while this update is taking
place.
This is only likely to be a problem in the case of a server which is
participating in many rooms.
0. As with all upgrades, it is recommended that you have a recent backup of
your database which can be used for recovery in the event of any problems.
1. As an initial check to see if you will be affected, you can try running the
following query from the `psql` or `sqlite3` console. It is safe to run it
while Synapse is still running.
.. code:: sql

   SELECT MAX(q.v) FROM (
     SELECT (
       SELECT ej.json AS v
       FROM state_events se INNER JOIN event_json ej USING (event_id)
       WHERE se.room_id=rooms.room_id AND se.type='m.room.create' AND se.state_key=''
       LIMIT 1
     ) FROM rooms WHERE rooms.room_version IS NULL
   ) q;
This query will take about the same amount of time as the upgrade process: ie,
if it takes 5 minutes, then it is likely that Synapse will be unresponsive for
5 minutes during the upgrade.
If you consider an outage of this duration to be acceptable, no further
action is necessary and you can simply start Synapse 1.12.0.
If you would prefer to reduce the downtime, continue with the steps below.
2. The easiest workaround for this issue is to manually
create a new index before upgrading. On PostgreSQL, this can be done as follows:
.. code:: sql

   CREATE INDEX CONCURRENTLY tmp_upgrade_1_12_0_index
   ON state_events(room_id) WHERE type = 'm.room.create';
The above query may take some time, but is also safe to run while Synapse is
running.
We assume that no SQLite users have databases large enough to be
affected. If you *are* affected, you can run a similar query, omitting the
``CONCURRENTLY`` keyword. Note however that this operation may in itself cause
Synapse to stop running for some time. Synapse admins are reminded that
`SQLite is not recommended for use outside a test
environment <https://github.com/matrix-org/synapse/blob/master/README.rst#using-postgresql>`_.
3. Once the index has been created, the ``SELECT`` query in step 1 above should
complete quickly. It is therefore safe to upgrade to Synapse 1.12.0.
4. Once Synapse 1.12.0 has successfully started and is responding to HTTP
requests, the temporary index can be removed:
.. code:: sql

   DROP INDEX tmp_upgrade_1_12_0_index;
Upgrading to v1.10.0
====================

changelog.d/6881.misc (new file)

@ -0,0 +1 @@
Return total number of users and profile attributes in admin users endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.

changelog.d/7040.feature (new file)

@ -0,0 +1 @@
Add support for running replication over Redis when using workers.

changelog.d/7091.doc (new file)

@ -0,0 +1 @@
Improve the documentation of application service configuration files.

changelog.d/7146.misc (new file)

@ -0,0 +1 @@
Run replication streamers on workers.


@ -1 +0,0 @@
Always send users their own device updates.

changelog.d/7213.misc (new file)

@ -0,0 +1 @@
Add explicit Python build tooling as dependencies for the snapcraft build.

changelog.d/7225.misc (new file)

@ -0,0 +1 @@
Extend room admin api (`GET /_synapse/admin/v1/rooms`) with additional attributes.

changelog.d/7228.misc (new file)

@ -0,0 +1 @@
Unblacklist '/upgrade creates a new room' sytest for workers.

changelog.d/7230.feature (new file)

@ -0,0 +1 @@
Require admin privileges to enable room encryption by default. This does not affect existing rooms.

changelog.d/7236.misc (new file)

@ -0,0 +1 @@
Upgrade jQuery to v3.4.1 on fallback login/registration pages.

changelog.d/7237.misc (new file)

@ -0,0 +1 @@
Change log line that told user to implement onLogin/onRegister fallback js functions to a warning, instead of an info, so it's more visible.

changelog.d/7240.bugfix (new file)

@ -0,0 +1 @@
Do not allow a deactivated user to login via SSO.

changelog.d/7243.misc (new file)

@ -0,0 +1 @@
Correct the parameters of a test fixture. Contributed by Isaiah Singletary.

changelog.d/7248.doc (new file)

@ -0,0 +1 @@
Add documentation to the `password_providers` config option. Add known password provider implementations to docs.

changelog.d/7249.bugfix (new file)

@ -0,0 +1 @@
Fix --help command-line argument.

changelog.d/7251.doc (new file)

@ -0,0 +1 @@
Modify suggested nginx reverse proxy configuration to match Synapse's default file upload size. Contributed by @ProCycleDev.

changelog.d/7259.bugfix (new file)

@ -0,0 +1 @@
Do not allow a deactivated user to login via SSO.

changelog.d/7260.bugfix (new file)

@ -0,0 +1 @@
Fix room publish permissions not being checked on room creation.

changelog.d/7261.misc (new file)

@ -0,0 +1 @@
Convert auth handler to async/await.

changelog.d/7265.feature (new file)

@ -0,0 +1 @@
Add a config option for specifying the value of the Accept-Language HTTP header when generating URL previews.

changelog.d/7268.bugfix (new file)

@ -0,0 +1 @@
Reject unknown session IDs during user interactive authentication instead of silently creating a new session.

changelog.d/7272.doc (new file)

@ -0,0 +1 @@
Documentation of media_storage_providers options updated to avoid misunderstandings. Contributed by Tristan Lins.

changelog.d/7274.bugfix (new file)

@ -0,0 +1 @@
Fix a sql query introduced in Synapse 1.12.0 which could cause large amounts of logging to the postgres slow-query log.

changelog.d/7278.misc (new file)

@ -0,0 +1 @@
Add some unit tests for replication.

changelog.d/7279.feature (new file)

@ -0,0 +1 @@
Support SSO in the user interactive authentication workflow.

changelog.d/7286.misc (new file)

@ -0,0 +1 @@
Move catchup of replication streams logic to worker.

changelog.d/7290.misc (new file)

@ -0,0 +1 @@
Move catchup of replication streams logic to worker.

changelog.d/7291.misc (new file)

@ -0,0 +1 @@
Improve typing annotations in `synapse.replication.tcp.streams.Stream`.

changelog.d/7295.misc (new file)

@ -0,0 +1 @@
Reduce log verbosity of url cache cleanup tasks.

changelog.d/7300.misc (new file)

@ -0,0 +1 @@
Fix sample SAML Service Provider configuration. Contributed by @frcl.

changelog.d/7303.misc (new file)

@ -0,0 +1 @@
Fix StreamChangeCache to work with multiple entities changing on the same stream id.

changelog.d/7315.feature (new file)

@ -0,0 +1 @@
Allow `/requestToken` endpoints to hide the existence (or lack thereof) of 3PID associations on the homeserver.

changelog.d/7316.bugfix (new file)

@ -0,0 +1 @@
Fixed backwards compatibility logic of the first value of `trusted_third_party_id_servers` being used for `account_threepid_delegates.email`, which occurs when the former, deprecated option is set and the latter is not.

changelog.d/7318.misc (new file)

@ -0,0 +1 @@
Move catchup of replication streams logic to worker.

changelog.d/7319.misc (new file)

@ -0,0 +1 @@
Fix an incorrect import in IdentityHandler.

changelog.d/7321.misc (new file)

@ -0,0 +1 @@
Reduce logging verbosity for successful federation requests.

changelog.d/7325.feature (new file)

@ -0,0 +1 @@
Add support for running replication over Redis when using workers.

changelog.d/7326.misc (new file)

@ -0,0 +1 @@
Move catchup of replication streams logic to worker.

changelog.d/7337.bugfix (new file)

@ -0,0 +1 @@
Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind.

changelog.d/7338.misc (new file)

@ -0,0 +1 @@
Convert some federation handler code to async/await.

changelog.d/7341.bugfix (new file)

@ -0,0 +1 @@
Fix bad error handling that would cause Synapse to crash if it's provided with a YAML configuration file that's either empty or doesn't parse into a key-value map.

changelog.d/7343.feature (new file)

@ -0,0 +1 @@
Support SSO in the user interactive authentication workflow.

changelog.d/7344.bugfix (new file)

@ -0,0 +1 @@
Fix incorrect metrics reporting for `renew_attestations` background task.

changelog.d/7352.feature (new file)

@ -0,0 +1 @@
Add support for running replication over Redis when using workers.

changelog.d/7357.doc (new file)

@ -0,0 +1 @@
Add documentation on monitoring workers with Prometheus.

changelog.d/7358.bugfix (new file)

@ -0,0 +1 @@
Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind.

changelog.d/7359.misc (new file)

@ -0,0 +1 @@
Fix collation for postgres for unit tests.

changelog.d/7361.doc (new file)

@ -0,0 +1 @@
Clarify endpoint usage in the users admin api documentation.

changelog.d/7364.misc (new file)

@ -0,0 +1 @@
Add an `instance_name` to `RDATA` and `POSITION` replication commands.

debian/changelog (vendored)

@ -1,8 +1,16 @@
<<<<<<< HEAD
matrix-synapse-py3 (1.12.3ubuntu1) UNRELEASED; urgency=medium
* Add information about .well-known files to Debian installation scripts.
-- Patrick Cloke <patrickc@matrix.org> Mon, 06 Apr 2020 10:10:38 -0400
=======
matrix-synapse-py3 (1.12.4) stable; urgency=medium
* New synapse release 1.12.4.
-- Synapse Packaging team <packages@matrix.org> Thu, 23 Apr 2020 10:58:14 -0400
>>>>>>> master
matrix-synapse-py3 (1.12.3) stable; urgency=medium


@ -11,8 +11,21 @@ The following query parameters are available:
* `from` - Offset in the returned list. Defaults to `0`.
* `limit` - Maximum amount of rooms to return. Defaults to `100`.
* `order_by` - The method in which to sort the returned list of rooms. Valid values are:
- `alphabetical` - Rooms are ordered alphabetically by room name. This is the default.
- `size` - Rooms are ordered by the number of members. Largest to smallest.
- `alphabetical` - Same as `name`. This is deprecated.
- `size` - Same as `joined_members`. This is deprecated.
- `name` - Rooms are ordered alphabetically by room name. This is the default.
- `canonical_alias` - Rooms are ordered alphabetically by main alias address of the room.
- `joined_members` - Rooms are ordered by the number of members. Largest to smallest.
- `joined_local_members` - Rooms are ordered by the number of local members. Largest to smallest.
- `version` - Rooms are ordered by room version. Largest to smallest.
- `creator` - Rooms are ordered alphabetically by creator of the room.
- `encryption` - Rooms are ordered alphabetically by the end-to-end encryption algorithm.
- `federatable` - Rooms are ordered by whether the room is federatable.
- `public` - Rooms are ordered by visibility in room list.
- `join_rules` - Rooms are ordered alphabetically by join rules of the room.
- `guest_access` - Rooms are ordered alphabetically by guest access option of the room.
- `history_visibility` - Rooms are ordered alphabetically by visibility of history of the room.
- `state_events` - Rooms are ordered by number of state events. Largest to smallest.
* `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.
* `search_term` - Filter rooms by their room name. Search term can be contained in any
@ -26,6 +39,16 @@ The following fields are possible in the JSON response body:
- `name` - The name of the room.
- `canonical_alias` - The canonical (main) alias address of the room.
- `joined_members` - How many users are currently in the room.
- `joined_local_members` - How many local users are currently in the room.
- `version` - The version of the room as a string.
- `creator` - The `user_id` of the room creator.
- `encryption` - Algorithm of end-to-end encryption of messages. Is `null` if encryption is not active.
- `federatable` - Whether users on other servers can join this room.
- `public` - Whether the room is visible in room directory.
- `join_rules` - The type of rules used for users wishing to join this room. One of: ["public", "knock", "invite", "private"].
- `guest_access` - Whether guests can join the room. One of: ["can_join", "forbidden"].
- `history_visibility` - Who can see the room history. One of: ["invited", "joined", "shared", "world_readable"].
- `state_events` - Total number of state events in the room. A rough measure of the room's complexity.
* `offset` - The current pagination offset in rooms. This parameter should be
used instead of `next_token` for room offset as `next_token` is
not intended to be parsed.
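As a usage illustration, here is a minimal client sketch for this endpoint
(the homeserver URL and admin token are placeholders; `requests` is used
purely for brevity):

```python
import requests

BASE_URL = "https://homeserver.example.com"  # placeholder
ADMIN_TOKEN = "MDAx..."  # placeholder access token of a server admin

def list_rooms(order_by="joined_members", limit=100, from_offset=0):
    """Fetch one page of rooms; size-based orderings go largest to smallest."""
    resp = requests.get(
        BASE_URL + "/_synapse/admin/v1/rooms",
        headers={"Authorization": "Bearer " + ADMIN_TOKEN},
        params={"order_by": order_by, "limit": limit, "from": from_offset},
    )
    resp.raise_for_status()
    return resp.json()

page = list_rooms()
for room in page["rooms"]:
    print(room["room_id"], room["joined_members"], room["state_events"])
```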
@ -60,14 +83,34 @@ Response:
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"name": "Matrix HQ",
"canonical_alias": "#matrix:matrix.org",
"joined_members": 8326
"joined_members": 8326,
"joined_local_members": 2,
"version": "1",
"creator": "@foo:matrix.org",
"encryption": null,
"federatable": true,
"public": true,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 93534
},
... (8 hidden items) ...
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
"joined_members": 314,
"joined_local_members": 20,
"version": "4",
"creator": "@foo:matrix.org",
"encryption": "m.megolm.v1.aes-sha2",
"federatable": true,
"public": false,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 8345
}
],
"offset": 0,
@ -92,7 +135,17 @@ Response:
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
"joined_members": 314,
"joined_local_members": 20,
"version": "4",
"creator": "@foo:matrix.org",
"encryption": "m.megolm.v1.aes-sha2",
"federatable": true,
"public": false,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 8
}
],
"offset": 0,
@ -117,14 +170,34 @@ Response:
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"name": "Matrix HQ",
"canonical_alias": "#matrix:matrix.org",
"joined_members": 8326
"joined_members": 8326,
"joined_local_members": 2,
"version": "1",
"creator": "@foo:matrix.org",
"encryption": null,
"federatable": true,
"public": true,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 93534
},
... (98 hidden items) ...
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
"joined_members": 314,
"joined_local_members": 20,
"version": "4",
"creator": "@foo:matrix.org",
"encryption": "m.megolm.v1.aes-sha2",
"federatable": true,
"public": false,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 8345
}
],
"offset": 0,
@ -154,6 +227,16 @@ Response:
"name": "Music Theory",
"canonical_alias": "#musictheory:matrix.org",
"joined_members": 127
"joined_local_members": 2,
"version": "1",
"creator": "@foo:matrix.org",
"encryption": null,
"federatable": true,
"public": true,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 93534
},
... (48 hidden items) ...
{
@ -161,6 +244,16 @@ Response:
"name": "weechat-matrix",
"canonical_alias": "#weechat-matrix:termina.org.uk",
"joined_members": 137
"joined_local_members": 20,
"version": "4",
"creator": "@foo:termina.org.uk",
"encryption": null,
"federatable": true,
"public": true,
"join_rules": "invite",
"guest_access": null,
"history_visibility": "shared",
"state_events": 8345
}
],
"offset": 100,


@ -33,12 +33,22 @@ with a body of:
including an ``access_token`` of a server admin.
The parameter ``displayname`` is optional and defaults to ``user_id``.
The parameter ``threepids`` is optional.
The parameter ``avatar_url`` is optional.
The parameter ``admin`` is optional and defaults to 'false'.
The parameter ``deactivated`` is optional and defaults to 'false'.
The parameter ``password`` is optional. If provided the user's password is updated and all devices are logged out.
The parameter ``displayname`` is optional and defaults to the value of
``user_id``.
The parameter ``threepids`` is optional and allows setting the third-party IDs
(email, msisdn) belonging to a user.
The parameter ``avatar_url`` is optional. Must be a [MXC
URI](https://matrix.org/docs/spec/client_server/r0.6.0#matrix-content-mxc-uris).
The parameter ``admin`` is optional and defaults to ``false``.
The parameter ``deactivated`` is optional and defaults to ``false``.
The parameter ``password`` is optional. If provided, the user's password is
updated and all devices are logged out.
If the user already exists then optional parameters default to the current value.
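As an illustration, here is a hedged sketch of driving this endpoint
(``PUT /_synapse/admin/v2/users/<user_id>``) from Python; the homeserver URL
and admin token are placeholders, and the ``requests`` library is used for
brevity.

.. code:: python

   import requests

   BASE_URL = "https://homeserver.example.com"  # placeholder
   ADMIN_TOKEN = "MDAx..."  # placeholder access token of a server admin

   def upsert_user(user_id, **attrs):
       """Create or modify an account. For an existing user, any optional
       parameter left out keeps its current value."""
       resp = requests.put(
           BASE_URL + "/_synapse/admin/v2/users/" + user_id,
           headers={"Authorization": "Bearer " + ADMIN_TOKEN},
           json=attrs,
       )
       resp.raise_for_status()
       return resp.json()

   # Set a display name and an avatar (which must be an MXC URI),
   # leaving all other attributes untouched.
   upsert_user(
       "@alice:example.com",
       displayname="Alice",
       avatar_url="mxc://example.com/abc123",
   )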
List Accounts
@ -51,16 +61,25 @@ The api is::
GET /_synapse/admin/v2/users?from=0&limit=10&guests=false
including an ``access_token`` of a server admin.
The parameters ``from`` and ``limit`` are required only for pagination.
By default, a ``limit`` of 100 is used.
The parameter ``user_id`` can be used to select only users with user ids that
contain this value.
The parameter ``guests=false`` can be used to exclude guest users,
default is to include guest users.
The parameter ``deactivated=true`` can be used to include deactivated users,
default is to exclude deactivated users.
If the endpoint does not return a ``next_token`` then there are no more users left.
It returns a JSON body like the following:
The parameter ``from`` is optional but used for pagination, denoting the
offset in the returned results. This should be treated as an opaque value and
not explicitly set to anything other than the return value of ``next_token``
from a previous call.
The parameter ``limit`` is optional but is used for pagination, denoting the
maximum number of items to return in this call. Defaults to ``100``.
The parameter ``user_id`` is optional and filters to only users with user IDs
that contain this value.
The parameter ``guests`` is optional and if ``false`` will **exclude** guest users.
Defaults to ``true`` to include guest users.
The parameter ``deactivated`` is optional and if ``true`` will **include** deactivated users.
Defaults to ``false`` to exclude deactivated users.
A JSON body is returned with the following shape:
.. code:: json
@ -72,19 +91,29 @@ It returns a JSON body like the following:
"is_guest": 0,
"admin": 0,
"user_type": null,
"deactivated": 0
"deactivated": 0,
"displayname": "<User One>",
"avatar_url": null
}, {
"name": "<user_id2>",
"password_hash": "<password_hash2>",
"is_guest": 0,
"admin": 1,
"user_type": null,
"deactivated": 0
"deactivated": 0,
"displayname": "<User Two>",
"avatar_url": "<avatar_url>"
}
],
"next_token": "100"
"next_token": "100",
"total": 200
}
To paginate, check for ``next_token`` and if present, call the endpoint again
with ``from`` set to the value of ``next_token``. This will return a new page.
If the endpoint does not return a ``next_token`` then there are no more users
to paginate through.
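For example, a hedged pagination sketch (placeholder URL and token, using
the ``requests`` library for brevity):

.. code:: python

   import requests

   BASE_URL = "https://homeserver.example.com"  # placeholder
   ADMIN_TOKEN = "MDAx..."  # placeholder access token of a server admin

   def iter_users(limit=100):
       """Yield every user, following ``next_token`` until it disappears."""
       params = {"limit": limit}
       while True:
           resp = requests.get(
               BASE_URL + "/_synapse/admin/v2/users",
               headers={"Authorization": "Bearer " + ADMIN_TOKEN},
               params=params,
           )
           resp.raise_for_status()
           body = resp.json()
           yield from body["users"]
           if "next_token" not in body:
               break  # no more users to paginate through
           # Treat next_token as opaque: pass it straight back as `from`.
           params["from"] = body["next_token"]

   print(sum(1 for _ in iter_users()))  # should match the reported "total"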
Query Account
=============


@ -23,9 +23,13 @@ namespaces:
  users: # List of users we're interested in
    - exclusive: <bool>
      regex: <regex>
      group_id: <group>
    - ...
  aliases: [] # List of aliases we're interested in
  rooms: [] # List of room ids we're interested in
```
`exclusive`: If enabled, only this application service is allowed to register users in its namespace(s).
`group_id`: All users of this application service are dynamically joined to this group. This is useful for e.g user organisation or flairs.
See the [spec](https://matrix.org/docs/spec/application_service/unstable.html) for further details on how application services work.
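For intuition, each `regex` is an ordinary regular expression matched against
Matrix IDs. A throwaway check with a hypothetical namespace:

```python
import re

# Hypothetical namespace: this application service owns all @irc_* users.
namespace = {"exclusive": True, "regex": "@irc_.*:example\\.com"}

assert re.match(namespace["regex"], "@irc_alice:example.com")
assert not re.match(namespace["regex"], "@bob:example.com")
```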


@ -60,6 +60,31 @@
1. Restart Prometheus.
## Monitoring workers
To monitor a Synapse installation using
[workers](https://github.com/matrix-org/synapse/blob/master/docs/workers.md),
every worker needs to be monitored independently, in addition to
the main homeserver process. This is because workers don't send
their metrics to the main homeserver process, but expose them
directly (if they are configured to do so).
To allow collecting metrics from a worker, you need to add a
`metrics` listener to its configuration, by adding the following
under `worker_listeners`:
```yaml
  - type: metrics
    bind_address: ''
    port: 9101
```
The `bind_address` and `port` parameters should be set so that
the resulting listener can be reached by prometheus, and they
don't clash with an existing worker.
With this example, the worker's metrics would then be available
on `http://127.0.0.1:9101`.
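As a quick sanity check that the listener is reachable before pointing
Prometheus at it (this sketch assumes the metrics are exposed under
Synapse's usual `/_synapse/metrics` path):

```python
import urllib.request

# Fetch the Prometheus exposition-format text from the worker's
# metrics listener configured above.
with urllib.request.urlopen("http://127.0.0.1:9101/_synapse/metrics") as resp:
    body = resp.read().decode("utf-8")

print("\n".join(body.splitlines()[:5]))  # first few metric lines
```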
## Renaming of metrics & deprecation of old names in 1.2
Synapse 1.2 updates the Prometheus metrics to match the naming


@ -13,6 +13,7 @@ own password auth providers. Additionally, here is a list of known
password auth provider module implementations:
* [matrix-synapse-ldap3](https://github.com/matrix-org/matrix-synapse-ldap3/)
* [matrix-synapse-shared-secret-auth](https://github.com/devture/matrix-synapse-shared-secret-auth)
## Required methods


@ -42,6 +42,9 @@ the reverse proxy and the homeserver.
location /_matrix {
proxy_pass http://localhost:8008;
proxy_set_header X-Forwarded-For $remote_addr;
# Nginx by default only allows file uploads up to 1M in size
# Increase client_max_body_size to match max_upload_size defined in homeserver.yaml
client_max_body_size 10M;
}
}


@ -414,6 +414,16 @@ retention:
# longest_max_lifetime: 1y
# interval: 1d
# Inhibits the /requestToken endpoints from returning an error that might leak
# information about whether an e-mail address is in use or not on this
# homeserver.
# Note that for some endpoints the error situation is the e-mail already being
# used, and for others the error is entering the e-mail being unused.
# If this option is enabled, instead of returning an error, these endpoints will
# act as if no error happened and return a fake session ID ('sid') to clients.
#
#request_token_inhibit_3pid_errors: true
## TLS ##
@ -735,12 +745,11 @@ media_store_path: "DATADIR/media_store"
#
#media_storage_providers:
# - module: file_system
# # Whether to write new local files.
# # Whether to store newly uploaded local files
# store_local: false
# # Whether to write new remote media
# # Whether to store newly downloaded remote files
# store_remote: false
# # Whether to block upload requests waiting for write to this
# # provider to complete
# # Whether to wait for successful storage for local uploads
# store_synchronous: false
# config:
# directory: /mnt/some/other/directory
@ -859,6 +868,31 @@ media_store_path: "DATADIR/media_store"
#
#max_spider_size: 10M
# A list of values for the Accept-Language HTTP header used when
# downloading webpages during URL preview generation. This allows
# Synapse to specify the preferred languages that URL previews should
# be in when communicating with remote servers.
#
# Each value is an IETF language tag; a 2-3 letter identifier for a
# language, optionally followed by subtags separated by '-', specifying
# a country or region variant.
#
# Multiple values can be provided, and a weight can be added to each by
# using quality value syntax (;q=). '*' translates to any language.
#
# Defaults to "en".
#
# Example:
#
# url_preview_accept_language:
# - en-UK
# - en-US;q=0.9
# - fr;q=0.8
# - *;q=0.7
#
#url_preview_accept_language:
#   - en
## Captcha ##
# See docs/CAPTCHA_SETUP for full details of configuring this.
@ -1315,32 +1349,32 @@ saml2_config:
# remote:
# - url: https://our_idp/metadata.xml
#
# # By default, the user has to go to our login page first. If you'd like
# # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
# # 'service.sp' section:
# #
# #service:
# # sp:
# # allow_unsolicited: true
# # By default, the user has to go to our login page first. If you'd like
# # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
# # 'service.sp' section:
# #
# #service:
# # sp:
# # allow_unsolicited: true
#
# # The examples below are just used to generate our metadata xml, and you
# # may well not need them, depending on your setup. Alternatively you
# # may need a whole lot more detail - see the pysaml2 docs!
# # The examples below are just used to generate our metadata xml, and you
# # may well not need them, depending on your setup. Alternatively you
# # may need a whole lot more detail - see the pysaml2 docs!
#
# description: ["My awesome SP", "en"]
# name: ["Test SP", "en"]
# description: ["My awesome SP", "en"]
# name: ["Test SP", "en"]
#
# organization:
# name: Example com
# display_name:
# - ["Example co", "en"]
# url: "http://example.com"
# organization:
# name: Example com
# display_name:
# - ["Example co", "en"]
# url: "http://example.com"
#
# contact_person:
# - given_name: Bob
# sur_name: "the Sysadmin"
# email_address": ["admin@example.com"]
# contact_type": technical
# contact_person:
# - given_name: Bob
# sur_name: "the Sysadmin"
# email_address": ["admin@example.com"]
# contact_type": technical
# Instead of putting the config inline as above, you can specify a
# separate pysaml2 configuration file:
@ -1484,6 +1518,30 @@ sso:
#
# * server_name: the homeserver's name.
#
# * HTML page which notifies the user that they are authenticating to confirm
# an operation on their account during the user interactive authentication
# process: 'sso_auth_confirm.html'.
#
# When rendering, this template is given the following variables:
# * redirect_url: the URL the user is about to be redirected to. Needs
# manual escaping (see
# https://jinja.palletsprojects.com/en/2.11.x/templates/#html-escaping).
#
# * description: the operation which the user is being asked to confirm
#
# * HTML page shown after a successful user interactive authentication session:
# 'sso_auth_success.html'.
#
# Note that this page must include the JavaScript which notifies of a successful authentication
# (see https://matrix.org/docs/spec/client_server/r0.6.0#fallback).
#
# This template has no additional variables.
#
# * HTML page shown during single sign-on if a deactivated user (according to Synapse's database)
# attempts to login: 'sso_account_deactivated.html'.
#
# This template has no additional variables.
#
# You can see the default templates at:
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
#


@ -15,15 +15,17 @@ example flow would be (where '>' indicates master to worker and
> SERVER example.com
< REPLICATE
> POSITION events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]
> POSITION events master 53
> RDATA events master 54 ["$foo1:bar.com", ...]
> RDATA events master 55 ["$foo4:bar.com", ...]
The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client requesting replication with
`REPLICATE`, and the server responding with the position of all streams. The
server then periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <token> <row>`, where the format of
`<row>` is defined by the individual streams.
which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
the format of `<row>` is defined by the individual streams. The
`<instance_name>` is the name of the Synapse process that generated the data
(usually "master").
Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
@ -52,7 +54,7 @@ The basic structure of the protocol is line based, where the initial
word of each line specifies the command. The rest of the line is parsed
based on the command. For example, the RDATA command is defined as:
RDATA <stream_name> <token> <row_json>
RDATA <stream_name> <instance_name> <token> <row_json>
(Note that <row_json> may contain spaces, but cannot contain
newlines.)
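As a toy illustration of this framing (a sketch only; Synapse's real command
parsing lives in `synapse.replication.tcp.commands`), splitting on the first
four spaces keeps the JSON payload intact, and the `batch` token described
below can stand in for a real stream token:

```python
import json
from typing import NamedTuple, Optional

class RData(NamedTuple):
    stream_name: str
    instance_name: str    # process that generated the data, usually "master"
    token: Optional[int]  # None for "batch" rows awaiting the final token
    row: object

def parse_rdata(line: str) -> RData:
    command, stream_name, instance_name, token, row_json = line.split(" ", 4)
    assert command == "RDATA"
    return RData(
        stream_name,
        instance_name,
        None if token == "batch" else int(token),
        json.loads(row_json),
    )

print(parse_rdata('RDATA events master 54 ["$foo1:bar.com", null]'))
```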
@ -136,11 +138,11 @@ the wire:
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
> POSITION events master 1
> POSITION backfill master 1
> POSITION caches master 1
> RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events master 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
< PING 1490197675618
> ERROR server stopping
@ -151,10 +153,10 @@ position without needing to send data with the `RDATA` command.
An example of a batched set of `RDATA` is:
> RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
In this case the client shouldn't advance its caches token until it
sees the last `RDATA`.
@ -178,6 +180,11 @@ client (C):
updates, and if so then fetch them out of band. Sent in response to a
REPLICATE command (but can happen at any time).
The POSITION command includes the source of the stream. Currently all streams
are written by a single process (usually "master"). If fetching missing
updates via HTTP API, rather than via the DB, then processes should make the
request to the appropriate process.
#### ERROR (S, C)
There was an error
@ -196,7 +203,7 @@ Asks the server for the current position of all streams.
#### USER_SYNC (C)
A user has started or stopped syncing
A user has started or stopped syncing on this process.
#### CLEAR_USER_SYNC (C)
@ -216,10 +223,6 @@ Asks the server for the current position of all streams.
Inform the server a cache should be invalidated
#### SYNC (S, C)
Used exclusively in tests
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.
@ -238,12 +241,12 @@ Each individual cache invalidation results in a row being sent down
replication, which includes the cache name (the name of the function)
and they key to invalidate. For example:
> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
> RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:
> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
> RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]
However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those


@ -120,7 +120,7 @@ Your home server configuration file needs the following extra keys:
As an example, here is the relevant section of the config file for matrix.org:
turn_uris: [ "turn:turn.matrix.org:3478?transport=udp", "turn:turn.matrix.org:3478?transport=tcp" ]
turn_shared_secret: n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons
turn_shared_secret: "n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons"
turn_user_lifetime: 86400000
turn_allow_guests: True


@ -286,6 +286,8 @@ Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/account_data/
^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/rooms/[^/]*/account_data/
Additionally, the following REST endpoints can be handled, but all requests must
be routed to the same instance:


@ -33,6 +33,10 @@ parts:
python-version: python3
python-packages:
- '.[all]'
- pip
- setuptools
- setuptools-scm
- wheel
build-packages:
- libffi-dev
- libturbojpeg0-dev


@ -0,0 +1,13 @@
from .sorteddict import (
    SortedDict,
    SortedKeysView,
    SortedItemsView,
    SortedValuesView,
)

__all__ = [
    "SortedDict",
    "SortedKeysView",
    "SortedItemsView",
    "SortedValuesView",
]


@ -0,0 +1,124 @@
# stub for SortedDict. This is a lightly edited copy of
# https://github.com/grantjenks/python-sortedcontainers/blob/eea42df1f7bad2792e8da77335ff888f04b9e5ae/sortedcontainers/sorteddict.pyi
# (from https://github.com/grantjenks/python-sortedcontainers/pull/107)
from typing import (
    Any,
    Callable,
    Dict,
    Hashable,
    Iterator,
    Iterable,
    ItemsView,
    KeysView,
    List,
    Mapping,
    Optional,
    Sequence,
    Type,
    TypeVar,
    Tuple,
    Union,
    ValuesView,
    overload,
)
_T = TypeVar("_T")
_S = TypeVar("_S")
_T_h = TypeVar("_T_h", bound=Hashable)
_KT = TypeVar("_KT", bound=Hashable) # Key type.
_VT = TypeVar("_VT") # Value type.
_KT_co = TypeVar("_KT_co", covariant=True, bound=Hashable)
_VT_co = TypeVar("_VT_co", covariant=True)
_SD = TypeVar("_SD", bound=SortedDict)
_Key = Callable[[_T], Any]
class SortedDict(Dict[_KT, _VT]):
    @overload
    def __init__(self, **kwargs: _VT) -> None: ...
    @overload
    def __init__(self, __map: Mapping[_KT, _VT], **kwargs: _VT) -> None: ...
    @overload
    def __init__(
        self, __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT
    ) -> None: ...
    @overload
    def __init__(self, __key: _Key[_KT], **kwargs: _VT) -> None: ...
    @overload
    def __init__(
        self, __key: _Key[_KT], __map: Mapping[_KT, _VT], **kwargs: _VT
    ) -> None: ...
    @overload
    def __init__(
        self, __key: _Key[_KT], __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT
    ) -> None: ...
    @property
    def key(self) -> Optional[_Key[_KT]]: ...
    @property
    def iloc(self) -> SortedKeysView[_KT]: ...
    def clear(self) -> None: ...
    def __delitem__(self, key: _KT) -> None: ...
    def __iter__(self) -> Iterator[_KT]: ...
    def __reversed__(self) -> Iterator[_KT]: ...
    def __setitem__(self, key: _KT, value: _VT) -> None: ...
    def _setitem(self, key: _KT, value: _VT) -> None: ...
    def copy(self: _SD) -> _SD: ...
    def __copy__(self: _SD) -> _SD: ...
    @classmethod
    @overload
    def fromkeys(cls, seq: Iterable[_T_h]) -> SortedDict[_T_h, None]: ...
    @classmethod
    @overload
    def fromkeys(cls, seq: Iterable[_T_h], value: _S) -> SortedDict[_T_h, _S]: ...
    def keys(self) -> SortedKeysView[_KT]: ...
    def items(self) -> SortedItemsView[_KT, _VT]: ...
    def values(self) -> SortedValuesView[_VT]: ...
    @overload
    def pop(self, key: _KT) -> _VT: ...
    @overload
    def pop(self, key: _KT, default: _T = ...) -> Union[_VT, _T]: ...
    def popitem(self, index: int = ...) -> Tuple[_KT, _VT]: ...
    def peekitem(self, index: int = ...) -> Tuple[_KT, _VT]: ...
    def setdefault(self, key: _KT, default: Optional[_VT] = ...) -> _VT: ...
    @overload
    def update(self, __map: Mapping[_KT, _VT], **kwargs: _VT) -> None: ...
    @overload
    def update(self, __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT) -> None: ...
    @overload
    def update(self, **kwargs: _VT) -> None: ...
    def __reduce__(
        self,
    ) -> Tuple[
        Type[SortedDict[_KT, _VT]], Tuple[Callable[[_KT], Any], List[Tuple[_KT, _VT]]],
    ]: ...
    def __repr__(self) -> str: ...
    def _check(self) -> None: ...
    def islice(
        self, start: Optional[int] = ..., stop: Optional[int] = ..., reverse=bool,
    ) -> Iterator[_KT]: ...
    def bisect_left(self, value: _KT) -> int: ...
    def bisect_right(self, value: _KT) -> int: ...

class SortedKeysView(KeysView[_KT_co], Sequence[_KT_co]):
    @overload
    def __getitem__(self, index: int) -> _KT_co: ...
    @overload
    def __getitem__(self, index: slice) -> List[_KT_co]: ...
    def __delitem__(self, index: Union[int, slice]) -> None: ...

class SortedItemsView(  # type: ignore
    ItemsView[_KT_co, _VT_co], Sequence[Tuple[_KT_co, _VT_co]]
):
    def __iter__(self) -> Iterator[Tuple[_KT_co, _VT_co]]: ...
    @overload
    def __getitem__(self, index: int) -> Tuple[_KT_co, _VT_co]: ...
    @overload
    def __getitem__(self, index: slice) -> List[Tuple[_KT_co, _VT_co]]: ...
    def __delitem__(self, index: Union[int, slice]) -> None: ...

class SortedValuesView(ValuesView[_VT_co], Sequence[_VT_co]):
    @overload
    def __getitem__(self, index: int) -> _VT_co: ...
    @overload
    def __getitem__(self, index: slice) -> List[_VT_co]: ...
    def __delitem__(self, index: Union[int, slice]) -> None: ...

stubs/txredisapi.pyi (new file)

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains *incomplete* type hints for txredisapi.
"""
from typing import List, Optional, Union
class RedisProtocol:
    def publish(self, channel: str, message: bytes): ...

class SubscriberProtocol:
    def subscribe(self, channels: Union[str, List[str]]): ...

def lazyConnection(
    host: str = ...,
    port: int = ...,
    dbid: Optional[int] = ...,
    reconnect: bool = ...,
    charset: str = ...,
    password: Optional[str] = ...,
    connectTimeout: Optional[int] = ...,
    replyTimeout: Optional[int] = ...,
    convertNumbers: bool = ...,
) -> RedisProtocol: ...

class SubscriberFactory:
    def buildProtocol(self, addr): ...


@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.12.3"
__version__ = "1.12.4"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when


@ -97,6 +97,8 @@ class EventTypes(object):
Retention = "m.room.retention"
Presence = "m.presence"
class RejectedReason(object):
AUTH_ERROR = "auth_error"


@ -270,7 +270,7 @@ def start(hs, listeners=None):
# Start the tracer
synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
hs.config
hs
)
# It is now safe to start your Synapse.
@ -316,7 +316,7 @@ def setup_sentry(hs):
scope.set_tag("matrix_server_name", hs.config.server_name)
app = hs.config.worker_app if hs.config.worker_app else "synapse.app.homeserver"
name = hs.config.worker_name if hs.config.worker_name else "master"
name = hs.get_instance_name()
scope.set_tag("worker_app", app)
scope.set_tag("worker_name", name)


@ -17,6 +17,9 @@
import contextlib
import logging
import sys
from typing import Dict, Iterable
from typing_extensions import ContextManager
from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
@ -38,14 +41,14 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@ -110,6 +113,10 @@ from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
@ -221,23 +228,32 @@ class KeyUploadServlet(RestServlet):
return 200, {"one_time_key_counts": result}
class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
def __exit__(self, exc_type, exc_val, exc_tb):
pass
UPDATE_SYNCING_USERS_MS = 10 * 1000
class GenericWorkerPresence(object):
class GenericWorkerPresence(BasePresenceHandler):
def __init__(self, hs):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
self.clock = hs.get_clock()
self._presence_enabled = hs.config.use_presence
# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}
# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
self.users_going_offline = {}
@ -255,13 +271,13 @@ class GenericWorkerPresence(object):
)
def _on_shutdown(self):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
@ -303,28 +319,33 @@ class GenericWorkerPresence(object):
# TODO Hows this supposed to work?
return defer.succeed(None)
get_states = __func__(PresenceHandler.get_states)
get_state = __func__(PresenceHandler.get_state)
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
"""Record that a user is syncing.
def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
Called by the sync and events servlets to record that a user has connected to
this worker and is waiting for some events.
"""
if not affect_presence or not self._presence_enabled:
return _NullContextManager()
# If we went from no in flight sync to some, notify replication
if self.user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1
# If we went from no in flight sync to some, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
def _end():
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1
# If we went from one in flight sync to non, notify replication
if self.user_to_num_current_syncs[user_id] == 0:
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)
@contextlib.contextmanager
@ -334,7 +355,7 @@ class GenericWorkerPresence(object):
finally:
_end()
return defer.succeed(_user_syncing())
return _user_syncing()
@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
@ -369,15 +390,12 @@ class GenericWorkerPresence(object):
stream_id = token
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count > 0
]
else:
return set()
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
user_id
for user_id, count in self._user_to_num_current_syncs.items()
if count > 0
]
class GenericWorkerTyping(object):
@ -501,6 +519,8 @@ class GenericWorkerServer(HomeServer):
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
@ -619,8 +639,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
# NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()
self.notify_pushers = hs.config.start_pushers
@ -941,17 +960,22 @@ def start(config_options):
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
ss = GenericWorkerServer(
hs = GenericWorkerServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)
setup_logging(ss, config, use_worker_options=True)
setup_logging(hs, config, use_worker_options=True)
hs.setup()
# Ensure the replication streamer is always started in case we write to any
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
"before", "startup", _base.start, hs, config.worker_listeners
)
_base.start_worker_reactor("synapse-generic-worker", config)


@ -273,6 +273,12 @@ class SynapseHomeServer(HomeServer):
def start_listening(self, listeners):
config = self.get_config()
if config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)
for listener in listeners:
if listener["type"] == "http":
self._listening_services.extend(self._listener_http(config, listener))


@ -468,8 +468,8 @@ class RootConfig(object):
Returns: Config object, or None if --generate-config or --generate-keys was set
"""
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-c",
"--config-path",
action="append",
@ -478,7 +478,7 @@ class RootConfig(object):
" may specify directories containing *.yaml files.",
)
generate_group = config_parser.add_argument_group("Config generation")
generate_group = parser.add_argument_group("Config generation")
generate_group.add_argument(
"--generate-config",
action="store_true",
@ -526,12 +526,13 @@ class RootConfig(object):
),
)
config_args, remaining_args = config_parser.parse_known_args(argv)
cls.invoke_all_static("add_arguments", parser)
config_args = parser.parse_args(argv)
config_files = find_config_files(search_paths=config_args.config_path)
if not config_files:
config_parser.error(
parser.error(
"Must supply a config file.\nA config file can be automatically"
' generated using "--generate-config -H SERVER_NAME'
' -c CONFIG-FILE"'
@ -550,7 +551,7 @@ class RootConfig(object):
if config_args.generate_config:
if config_args.report_stats is None:
config_parser.error(
parser.error(
"Please specify either --report-stats=yes or --report-stats=no\n\n"
+ MISSING_REPORT_STATS_SPIEL
)
@ -609,15 +610,6 @@ class RootConfig(object):
)
generate_missing_configs = True
parser = argparse.ArgumentParser(
parents=[config_parser],
description=description,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
obj.invoke_all_static("add_arguments", parser)
args = parser.parse_args(remaining_args)
config_dict = read_config_files(config_files)
if generate_missing_configs:
obj.generate_missing_files(config_dict, config_dir_path)
@ -626,7 +618,7 @@ class RootConfig(object):
obj.parse_config_dict(
config_dict, config_dir_path=config_dir_path, data_dir_path=data_dir_path
)
obj.invoke_all("read_arguments", args)
obj.invoke_all("read_arguments", config_args)
return obj
@ -665,6 +657,12 @@ def read_config_files(config_files):
for config_file in config_files:
with open(config_file) as file_stream:
yaml_config = yaml.safe_load(file_stream)
if not isinstance(yaml_config, dict):
err = "File %r is empty or doesn't parse into a key-value map. IGNORING."
print(err % (config_file,))
continue
specified_config.update(yaml_config)
if "server_name" not in specified_config:


@ -138,7 +138,7 @@ class DatabaseConfig(Config):
database_path = config.get("database_path")
if multi_database_config and database_config:
raise ConfigError("Can't specify both 'database' and 'datbases' in config")
raise ConfigError("Can't specify both 'database' and 'databases' in config")
if multi_database_config:
if database_path:


@ -108,9 +108,14 @@ class EmailConfig(Config):
if self.trusted_third_party_id_servers:
# XXX: It's a little confusing that account_threepid_delegate_email is modified
# both in RegistrationConfig and here. We should factor this bit out
self.account_threepid_delegate_email = self.trusted_third_party_id_servers[
0
] # type: Optional[str]
first_trusted_identity_server = self.trusted_third_party_id_servers[0]
# trusted_third_party_id_servers does not contain a scheme whereas
# account_threepid_delegate_email is expected to. Presume https
self.account_threepid_delegate_email = (
"https://" + first_trusted_identity_server
) # type: Optional[str]
self.using_identity_server_from_trusted_list = True
else:
raise ConfigError(


@ -31,6 +31,7 @@ from .password import PasswordConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room_directory import RoomDirectoryConfig
@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig,
ThirdPartyRulesConfig,
TracerConfig,
RedisConfig,
]

synapse/config/redis.py (new file)

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.config._base import Config
from synapse.python_dependencies import check_requirements
class RedisConfig(Config):
section = "redis"
def read_config(self, config, **kwargs):
redis_config = config.get("redis", {})
self.redis_enabled = redis_config.get("enabled", False)
if not self.redis_enabled:
return
check_requirements("redis")
self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_dbid = redis_config.get("dbid")
self.redis_password = redis_config.get("password")
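A hedged usage sketch for the new section: the homeserver.yaml fragment an admin would write to enable it, with key names taken from read_config above.

import yaml

fragment = """
redis:
  enabled: true
  host: localhost
  port: 6379
"""
redis_config = yaml.safe_load(fragment)["redis"]
assert redis_config.get("enabled", False) is True
assert redis_config.get("port", 6379) == 6379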

View File

@ -192,6 +192,10 @@ class ContentRepositoryConfig(Config):
self.url_preview_url_blacklist = config.get("url_preview_url_blacklist", ())
self.url_preview_accept_language = config.get(
"url_preview_accept_language"
) or ["en"]
def generate_config_section(self, data_dir_path, **kwargs):
media_store = os.path.join(data_dir_path, "media_store")
uploads_path = os.path.join(data_dir_path, "uploads")
@ -220,12 +224,11 @@ class ContentRepositoryConfig(Config):
#
#media_storage_providers:
# - module: file_system
# # Whether to write new local files.
# # Whether to store newly uploaded local files
# store_local: false
# # Whether to write new remote media
# # Whether to store newly downloaded remote files
# store_remote: false
# # Whether to block upload requests waiting for write to this
# # provider to complete
# # Whether to wait for successful storage for local uploads
# store_synchronous: false
# config:
# directory: /mnt/some/other/directory
@ -329,6 +332,31 @@ class ContentRepositoryConfig(Config):
# The largest allowed URL preview spidering size in bytes
#
#max_spider_size: 10M
# A list of values for the Accept-Language HTTP header used when
# downloading webpages during URL preview generation. This allows
# Synapse to specify the preferred languages that URL previews should
# be in when communicating with remote servers.
#
# Each value is an IETF language tag; a 2-3 letter identifier for a
# language, optionally followed by subtags separated by '-', specifying
# a country or region variant.
#
# Multiple values can be provided, and a weight can be added to each by
# using quality value syntax (;q=). '*' translates to any language.
#
# Defaults to "en".
#
# Example:
#
# url_preview_accept_language:
# - en-UK
# - en-US;q=0.9
# - fr;q=0.8
# - *;q=0.7
#
url_preview_accept_language:
# - en
"""
% locals()
)
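Illustrative only (the header-building code is not part of this diff, so the joining behaviour is assumed): how a configured preference list maps onto a single Accept-Language value using the quality-value syntax described above.

prefs = ["en-UK", "en-US;q=0.9", "fr;q=0.8", "*;q=0.7"]
accept_language = ", ".join(prefs)
print(accept_language)  # en-UK, en-US;q=0.9, fr;q=0.8, *;q=0.7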

View File

@ -248,32 +248,32 @@ class SAML2Config(Config):
# remote:
# - url: https://our_idp/metadata.xml
#
# # By default, the user has to go to our login page first. If you'd like
# # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
# # 'service.sp' section:
# #
# #service:
# # sp:
# # allow_unsolicited: true
# # By default, the user has to go to our login page first. If you'd like
# # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
# # 'service.sp' section:
# #
# #service:
# # sp:
# # allow_unsolicited: true
#
# # The examples below are just used to generate our metadata xml, and you
# # may well not need them, depending on your setup. Alternatively you
# # may need a whole lot more detail - see the pysaml2 docs!
# # The examples below are just used to generate our metadata xml, and you
# # may well not need them, depending on your setup. Alternatively you
# # may need a whole lot more detail - see the pysaml2 docs!
#
# description: ["My awesome SP", "en"]
# name: ["Test SP", "en"]
# description: ["My awesome SP", "en"]
# name: ["Test SP", "en"]
#
# organization:
# name: Example com
# display_name:
# - ["Example co", "en"]
# url: "http://example.com"
# organization:
# name: Example com
# display_name:
# - ["Example co", "en"]
# url: "http://example.com"
#
# contact_person:
# - given_name: Bob
# sur_name: "the Sysadmin"
# email_address": ["admin@example.com"]
# contact_type": technical
# contact_person:
# - given_name: Bob
# sur_name: "the Sysadmin"
# email_address": ["admin@example.com"]
# contact_type": technical
# Instead of putting the config inline as above, you can specify a
# separate pysaml2 configuration file:

View File

@ -507,6 +507,17 @@ class ServerConfig(Config):
self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False)
# Inhibits the /requestToken endpoints from returning an error that might leak
# information about whether an e-mail address is in use or not on this
# homeserver, and instead return a 200 with a fake sid if this kind of error is
# met, without sending anything.
# This is a compromise between sending an email, which could be a spam vector,
# and letting the client know which email address is bound to an account and
# which one isn't.
self.request_token_inhibit_3pid_errors = config.get(
"request_token_inhibit_3pid_errors", False,
)
def has_tls_listener(self) -> bool:
return any(l["tls"] for l in self.listeners)
@ -972,6 +983,16 @@ class ServerConfig(Config):
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 1d
# Inhibits the /requestToken endpoints from returning an error that might leak
# information about whether an e-mail address is in use or not on this
# homeserver.
# Note that for some endpoints the error is that the e-mail address is
# already in use, and for others that it is not in use.
# If this option is enabled, instead of returning an error, these endpoints will
# act as if no error happened and return a fake session ID ('sid') to clients.
#
#request_token_inhibit_3pid_errors: true
"""
% locals()
)
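A sketch of the behaviour the option enables; the helper below is hypothetical, not Synapse's implementation.

import uuid

def request_token(email_in_use: bool, inhibit_3pid_errors: bool) -> dict:
    if email_in_use and not inhibit_3pid_errors:
        # Default behaviour: the response reveals the address is bound.
        return {"errcode": "M_THREEPID_IN_USE"}
    # Option enabled: act as if no error happened and return a fake
    # session ID, without sending any email.
    return {"sid": uuid.uuid4().hex}

print(request_token(email_in_use=True, inhibit_3pid_errors=True))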

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Any, Dict
import pkg_resources
@ -36,6 +37,18 @@ class SSOConfig(Config):
template_dir = pkg_resources.resource_filename("synapse", "res/templates",)
self.sso_redirect_confirm_template_dir = template_dir
self.sso_account_deactivated_template = self.read_file(
os.path.join(
self.sso_redirect_confirm_template_dir, "sso_account_deactivated.html"
),
"sso_account_deactivated_template",
)
self.sso_auth_success_template = self.read_file(
os.path.join(
self.sso_redirect_confirm_template_dir, "sso_auth_success.html"
),
"sso_auth_success_template",
)
self.sso_client_whitelist = sso_config.get("client_whitelist") or []
@ -100,6 +113,30 @@ class SSOConfig(Config):
#
# * server_name: the homeserver's name.
#
# * HTML page which notifies the user that they are authenticating to confirm
# an operation on their account during the user interactive authentication
# process: 'sso_auth_confirm.html'.
#
# When rendering, this template is given the following variables:
# * redirect_url: the URL the user is about to be redirected to. Needs
# manual escaping (see
# https://jinja.palletsprojects.com/en/2.11.x/templates/#html-escaping).
#
# * description: the operation which the user is being asked to confirm
#
# * HTML page shown after a successful user interactive authentication session:
# 'sso_auth_success.html'.
#
# Note that this page must include the JavaScript which notifies of a successful authentication
# (see https://matrix.org/docs/spec/client_server/r0.6.0#fallback).
#
# This template has no additional variables.
#
# * HTML page shown during single sign-on if a deactivated user (according to Synapse's database)
# attempts to login: 'sso_account_deactivated.html'.
#
# This template has no additional variables.
#
# You can see the default templates at:
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
#
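An illustrative sketch of rendering the templates documented above with plain Jinja2 (the directory path is assumed; Synapse's own loader may differ):

from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("/etc/synapse/templates"))
confirm_html = env.get_template("sso_auth_confirm.html").render(
    redirect_url="https://client.example/auth",  # needs manual escaping
    description="deactivate your account",
)
success_html = env.get_template("sso_auth_success.html").render()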

View File

@ -399,20 +399,30 @@ class TransportLayerClient(object):
{
"device_keys": {
"<user_id>": ["<device_id>"]
} }
}
}
Response:
{
"device_keys": {
"<user_id>": {
"<device_id>": {...}
} } }
}
},
"master_key": {
"<user_id>": {...}
}
},
"self_signing_key": {
"<user_id>": {...}
}
}
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the device keys.
A dict containing device and cross-signing keys.
"""
path = _create_v1_path("/user/keys/query")
@ -429,14 +439,30 @@ class TransportLayerClient(object):
Response:
{
"stream_id": "...",
"devices": [ { ... } ]
"devices": [ { ... } ],
"master_key": {
"user_id": "<user_id>",
"usage": [...],
"keys": {...},
"signatures": {
"<user_id>": {...}
}
},
"self_signing_key": {
"user_id": "<user_id>",
"usage": [...],
"keys": {...},
"signatures": {
"<user_id>": {...}
}
}
}
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the device keys.
A dict containing device and cross-signing keys.
"""
path = _create_v1_path("/user/devices/%s", user_id)
@ -454,8 +480,10 @@ class TransportLayerClient(object):
{
"one_time_keys": {
"<user_id>": {
"<device_id>": "<algorithm>"
} } }
"<device_id>": "<algorithm>"
}
}
}
Response:
{
@ -463,13 +491,16 @@ class TransportLayerClient(object):
"<user_id>": {
"<device_id>": {
"<algorithm>:<key_id>": "<key_base64>"
} } } }
}
}
}
}
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the one-time keys.
A dict containing the one-time keys.
"""
path = _create_v1_path("/user/keys/claim")
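For reference, the request documented above expressed as a Python dict; the call itself is commented out since it needs a live TransportLayerClient, and the method name is assumed from the endpoint.

query_content = {
    "one_time_keys": {
        "@alice:remote.example": {"DEVICEID": "signed_curve25519"}
    }
}
# keys = await transport_layer_client.claim_client_keys(
#     "remote.example", query_content, timeout=10000
# )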

View File

@ -37,15 +37,16 @@ An attestation is a signed blob of json that looks like:
import logging
import random
from typing import Tuple
from signedjson.sign import sign_json
from twisted.internet import defer
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import yieldable_gather_results
logger = logging.getLogger(__name__)
@ -162,19 +163,19 @@ class GroupAttestionRenewer(object):
def _start_renew_attestations(self):
return run_as_background_process("renew_attestations", self._renew_attestations)
@defer.inlineCallbacks
def _renew_attestations(self):
async def _renew_attestations(self):
"""Called periodically to check if we need to update any of our attestations
"""
now = self.clock.time_msec()
rows = yield self.store.get_attestations_need_renewals(
rows = await self.store.get_attestations_need_renewals(
now + UPDATE_ATTESTATION_TIME_MS
)
@defer.inlineCallbacks
def _renew_attestation(group_id, user_id):
def _renew_attestation(group_user: Tuple[str, str]):
group_id, user_id = group_user
try:
if not self.is_mine_id(group_id):
destination = get_domain_from_id(group_id)
@ -207,8 +208,6 @@ class GroupAttestionRenewer(object):
"Error renewing attestation of %r in %r", user_id, group_id
)
for row in rows:
group_id = row["group_id"]
user_id = row["user_id"]
run_in_background(_renew_attestation, group_id, user_id)
await yieldable_gather_results(
_renew_attestation, ((row["group_id"], row["user_id"]) for row in rows)
)
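A rough asyncio analogue of the fan-out pattern introduced above (yieldable_gather_results itself is Twisted-based), included only as a sketch:

import asyncio

async def renew(group_user):
    group_id, user_id = group_user
    print("renewing attestation", group_id, user_id)

async def main(rows):
    await asyncio.gather(
        *(renew((row["group_id"], row["user_id"])) for row in rows)
    )

asyncio.run(main([{"group_id": "+group:hs", "user_id": "@user:hs"}]))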

View File

@ -18,14 +18,12 @@ import logging
import time
import unicodedata
import urllib.parse
from typing import Any, Dict, Iterable, List, Optional
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
import attr
import bcrypt # type: ignore[import]
import pymacaroons
from twisted.internet import defer
import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import (
@ -53,31 +51,6 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
SUCCESS_TEMPLATE = """
<html>
<head>
<title>Success!</title>
<meta name='viewport' content='width=device-width, initial-scale=1,
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
<script>
if (window.onAuthDone) {
window.onAuthDone();
} else if (window.opener && window.opener.postMessage) {
window.opener.postMessage("authDone", "*");
}
</script>
</head>
<body>
<div>
<p>Thank you</p>
<p>You may now close this window and return to the application</p>
</div>
</body>
</html>
"""
class AuthHandler(BaseHandler):
SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
@ -161,21 +134,28 @@ class AuthHandler(BaseHandler):
self._sso_auth_confirm_template = load_jinja2_templates(
hs.config.sso_redirect_confirm_template_dir, ["sso_auth_confirm.html"],
)[0]
# The following template is shown after a successful user interactive
# authentication session. It tells the user they can close the window.
self._sso_auth_success_template = hs.config.sso_auth_success_template
# The following template is shown during the SSO authentication process if
# the account is deactivated.
self._sso_account_deactivated_template = (
hs.config.sso_account_deactivated_template
)
self._server_name = hs.config.server_name
# cast to tuple for use with str.startswith
self._whitelisted_sso_clients = tuple(hs.config.sso_client_whitelist)
@defer.inlineCallbacks
def validate_user_via_ui_auth(
async def validate_user_via_ui_auth(
self,
requester: Requester,
request: SynapseRequest,
request_body: Dict[str, Any],
clientip: str,
description: str,
):
) -> dict:
"""
Checks that the user is who they claim to be, via a UI auth.
@ -196,7 +176,7 @@ class AuthHandler(BaseHandler):
describes the operation happening on their account.
Returns:
defer.Deferred[dict]: the parameters for this request (which may
The parameters for this request (which may
have been given only in a previous call).
Raises:
@ -226,7 +206,7 @@ class AuthHandler(BaseHandler):
flows = [[login_type] for login_type in self._supported_ui_auth_types]
try:
result, params, _ = yield self.check_auth(
result, params, _ = await self.check_auth(
flows, request, request_body, clientip, description
)
except LoginError:
@ -265,23 +245,18 @@ class AuthHandler(BaseHandler):
"""
return self.checkers.keys()
@defer.inlineCallbacks
def check_auth(
async def check_auth(
self,
flows: List[List[str]],
request: SynapseRequest,
clientdict: Dict[str, Any],
clientip: str,
description: str,
):
) -> Tuple[dict, dict, str]:
"""
Takes a dictionary sent by the client in the login / registration
protocol and handles the User-Interactive Auth flow.
As a side effect, this function fills in the 'creds' key on the user's
session with a map, which maps each auth-type (str) to the relevant
identity authenticated by that auth-type (mostly str, but for captcha, bool).
If no auth flows have been completed successfully, raises an
InteractiveAuthIncompleteError. To handle this, you can use
synapse.rest.client.v2_alpha._base.interactive_auth_handler as a
@ -303,8 +278,7 @@ class AuthHandler(BaseHandler):
describes the operation happening on their account.
Returns:
defer.Deferred[dict, dict, str]: a deferred tuple of
(creds, params, session_id).
A tuple of (creds, params, session_id).
'creds' contains the authenticated credentials of each stage.
@ -326,50 +300,47 @@ class AuthHandler(BaseHandler):
del clientdict["auth"]
if "session" in authdict:
sid = authdict["session"]
session = self._get_session_info(sid)
if len(clientdict) > 0:
# This was designed to allow the client to omit the parameters
# and just supply the session in subsequent calls so it split
# auth between devices by just sharing the session, (eg. so you
# could continue registration from your phone having clicked the
# email auth link on there). It's probably too open to abuse
# because it lets unauthenticated clients store arbitrary objects
# on a homeserver.
# Revisit: Assuming the REST APIs do sensible validation, the data
# isn't arbintrary.
session["clientdict"] = clientdict
self._save_session(session)
elif "clientdict" in session:
clientdict = session["clientdict"]
# Ensure that the queried operation does not vary between stages of
# the UI authentication session. This is done by generating a stable
# comparator based on the URI, method, and body (minus the auth dict)
# and storing it during the initial query. Subsequent queries ensure
# that this comparator has not changed.
comparator = (request.uri, request.method, clientdict)
if "ui_auth" not in session:
session["ui_auth"] = comparator
self._save_session(session)
elif session["ui_auth"] != comparator:
raise SynapseError(
403,
"Requested operation has changed during the UI authentication session.",
# If there's no session ID, create a new session.
if not sid:
session = self._create_session(
clientdict, (request.uri, request.method, clientdict), description
)
session_id = session["id"]
# Add a human readable description to the session.
if "description" not in session:
session["description"] = description
self._save_session(session)
else:
session = self._get_session_info(sid)
session_id = sid
if not clientdict:
# This was designed to allow the client to omit the parameters
# and just supply the session in subsequent calls so it split
# auth between devices by just sharing the session, (eg. so you
# could continue registration from your phone having clicked the
# email auth link on there). It's probably too open to abuse
# because it lets unauthenticated clients store arbitrary objects
# on a homeserver.
# Revisit: Assuming the REST APIs do sensible validation, the data
# isn't arbitrary.
clientdict = session["clientdict"]
# Ensure that the queried operation does not vary between stages of
# the UI authentication session. This is done by generating a stable
# comparator based on the URI, method, and body (minus the auth dict)
# and storing it during the initial query. Subsequent queries ensure
# that this comparator has not changed.
comparator = (request.uri, request.method, clientdict)
if session["ui_auth"] != comparator:
raise SynapseError(
403,
"Requested operation has changed during the UI authentication session.",
)
if not authdict:
raise InteractiveAuthIncompleteError(
self._auth_dict_for_flows(flows, session)
self._auth_dict_for_flows(flows, session_id)
)
if "creds" not in session:
session["creds"] = {}
creds = session["creds"]
# check auth type currently being presented
@ -377,7 +348,7 @@ class AuthHandler(BaseHandler):
if "type" in authdict:
login_type = authdict["type"] # type: str
try:
result = yield self._check_auth_dict(authdict, clientip)
result = await self._check_auth_dict(authdict, clientip)
if result:
creds[login_type] = result
self._save_session(session)
@ -409,15 +380,16 @@ class AuthHandler(BaseHandler):
list(clientdict),
)
return creds, clientdict, session["id"]
return creds, clientdict, session_id
ret = self._auth_dict_for_flows(flows, session)
ret = self._auth_dict_for_flows(flows, session_id)
ret["completed"] = list(creds)
ret.update(errordict)
raise InteractiveAuthIncompleteError(ret)
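# For orientation, the request shapes handled above (values illustrative):
#
#     first attempt:  {"username": "alice", ...}
#         -> InteractiveAuthIncompleteError body with flows/params/session
#     retry:          {"username": "alice",
#                      "auth": {"type": "m.login.password",
#                               "session": "<session id>", ...}}
#         -> (creds, params, session_id) once a flow completes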
@defer.inlineCallbacks
def add_oob_auth(self, stagetype: str, authdict: Dict[str, Any], clientip: str):
async def add_oob_auth(
self, stagetype: str, authdict: Dict[str, Any], clientip: str
) -> bool:
"""
Adds the result of out-of-band authentication into an existing auth
session. Currently used for adding the result of fallback auth.
@ -428,11 +400,9 @@ class AuthHandler(BaseHandler):
raise LoginError(400, "", Codes.MISSING_PARAM)
sess = self._get_session_info(authdict["session"])
if "creds" not in sess:
sess["creds"] = {}
creds = sess["creds"]
result = yield self.checkers[stagetype].check_auth(authdict, clientip)
result = await self.checkers[stagetype].check_auth(authdict, clientip)
if result:
creds[stagetype] = result
self._save_session(sess)
@ -469,7 +439,7 @@ class AuthHandler(BaseHandler):
value: The data to store
"""
sess = self._get_session_info(session_id)
sess.setdefault("serverdict", {})[key] = value
sess["serverdict"][key] = value
self._save_session(sess)
def get_session_data(
@ -484,10 +454,11 @@ class AuthHandler(BaseHandler):
default: Value to return if the key has not been set
"""
sess = self._get_session_info(session_id)
return sess.setdefault("serverdict", {}).get(key, default)
return sess["serverdict"].get(key, default)
@defer.inlineCallbacks
def _check_auth_dict(self, authdict: Dict[str, Any], clientip: str):
async def _check_auth_dict(
self, authdict: Dict[str, Any], clientip: str
) -> Union[Dict[str, Any], str]:
"""Attempt to validate the auth dict provided by a client
Args:
@ -495,7 +466,7 @@ class AuthHandler(BaseHandler):
clientip: IP address of the client
Returns:
Deferred: result of the stage verification.
Result of the stage verification.
Raises:
StoreError if there was a problem accessing the database
@ -505,7 +476,7 @@ class AuthHandler(BaseHandler):
login_type = authdict["type"]
checker = self.checkers.get(login_type)
if checker is not None:
res = yield checker.check_auth(authdict, clientip=clientip)
res = await checker.check_auth(authdict, clientip=clientip)
return res
# build a v1-login-style dict out of the authdict and fall back to the
@ -515,7 +486,7 @@ class AuthHandler(BaseHandler):
if user_id is None:
raise SynapseError(400, "", Codes.MISSING_PARAM)
(canonical_id, callback) = yield self.validate_login(user_id, authdict)
(canonical_id, callback) = await self.validate_login(user_id, authdict)
return canonical_id
def _get_params_recaptcha(self) -> dict:
@ -539,7 +510,7 @@ class AuthHandler(BaseHandler):
}
def _auth_dict_for_flows(
self, flows: List[List[str]], session: Dict[str, Any]
self, flows: List[List[str]], session_id: str,
) -> Dict[str, Any]:
public_flows = []
for f in flows:
@ -558,31 +529,73 @@ class AuthHandler(BaseHandler):
params[stage] = get_params[stage]()
return {
"session": session["id"],
"session": session_id,
"flows": [{"stages": f} for f in public_flows],
"params": params,
}
def _get_session_info(self, session_id: Optional[str]) -> dict:
def _create_session(
self,
clientdict: Dict[str, Any],
ui_auth: Tuple[bytes, bytes, Dict[str, Any]],
description: str,
) -> dict:
"""
Gets or creates a session given a session ID.
Creates a new user interactive authentication session.
The session can be used to track data across multiple requests, e.g. for
interactive authentication.
Each session has the following keys:
id:
A unique identifier for this session. Passed back to the client
and returned for each stage.
clientdict:
The dictionary from the client root level, not the 'auth' key.
ui_auth:
A tuple which is checked at each stage of the authentication to
ensure that the asked for operation has not changed.
creds:
A map, which maps each auth-type (str) to the relevant identity
authenticated by that auth-type (mostly str, but for captcha, bool).
serverdict:
A map of data that is stored server-side and cannot be modified
by the client.
description:
A string description of the operation that the current
authentication is authorising.
Returns:
The newly created session.
"""
session_id = None
while session_id is None or session_id in self.sessions:
session_id = stringutils.random_string(24)
self.sessions[session_id] = {
"id": session_id,
"clientdict": clientdict,
"ui_auth": ui_auth,
"creds": {},
"serverdict": {},
"description": description,
}
return self.sessions[session_id]
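# Example of the session structure created above (values illustrative):
#
#     {
#         "id": "abcDEFghi...",
#         "clientdict": {"username": "alice"},
#         "ui_auth": (b"/_matrix/client/r0/register", b"POST", {...}),
#         "creds": {},
#         "serverdict": {},
#         "description": "register a new account",
#     }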
def _get_session_info(self, session_id: str) -> dict:
"""
Gets a session given a session ID.
The session can be used to track data across multiple requests, e.g. for
interactive authentication.
"""
if session_id not in self.sessions:
session_id = None
try:
return self.sessions[session_id]
except KeyError:
raise SynapseError(400, "Unknown session ID: %s" % (session_id,))
if not session_id:
# create a new session
while session_id is None or session_id in self.sessions:
session_id = stringutils.random_string(24)
self.sessions[session_id] = {"id": session_id}
return self.sessions[session_id]
@defer.inlineCallbacks
def get_access_token_for_user_id(
async def get_access_token_for_user_id(
self, user_id: str, device_id: Optional[str], valid_until_ms: Optional[int]
):
"""
@ -612,10 +625,10 @@ class AuthHandler(BaseHandler):
)
logger.info("Logging in user %s on device %s%s", user_id, device_id, fmt_expiry)
yield self.auth.check_auth_blocking(user_id)
await self.auth.check_auth_blocking(user_id)
access_token = self.macaroon_gen.generate_access_token(user_id)
yield self.store.add_access_token_to_user(
await self.store.add_access_token_to_user(
user_id, access_token, device_id, valid_until_ms
)
@ -625,15 +638,14 @@ class AuthHandler(BaseHandler):
# device, so we double-check it here.
if device_id is not None:
try:
yield self.store.get_device(user_id, device_id)
await self.store.get_device(user_id, device_id)
except StoreError:
yield self.store.delete_access_token(access_token)
await self.store.delete_access_token(access_token)
raise StoreError(400, "Login raced against device deletion")
return access_token
@defer.inlineCallbacks
def check_user_exists(self, user_id: str):
async def check_user_exists(self, user_id: str) -> Optional[str]:
"""
Checks to see if a user with the given id exists. Will check case
insensitively, but return None if there are multiple inexact matches.
@ -642,28 +654,25 @@ class AuthHandler(BaseHandler):
user_id: complete @user:id
Returns:
defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
Raises:
UserDeactivatedError if a user is found but is deactivated.
The canonical_user_id, or None if zero or multiple matches
"""
res = yield self._find_user_id_and_pwd_hash(user_id)
res = await self._find_user_id_and_pwd_hash(user_id)
if res is not None:
return res[0]
return None
@defer.inlineCallbacks
def _find_user_id_and_pwd_hash(self, user_id: str):
async def _find_user_id_and_pwd_hash(
self, user_id: str
) -> Optional[Tuple[str, str]]:
"""Checks to see if a user with the given id exists. Will check case
insensitively, but will return None if there are multiple inexact
matches.
Returns:
tuple: A 2-tuple of `(canonical_user_id, password_hash)`
None: if there is not exactly one match
A 2-tuple of `(canonical_user_id, password_hash)` or `None`
if there is not exactly one match
"""
user_infos = yield self.store.get_users_by_id_case_insensitive(user_id)
user_infos = await self.store.get_users_by_id_case_insensitive(user_id)
result = None
if not user_infos:
@ -696,8 +705,9 @@ class AuthHandler(BaseHandler):
"""
return self._supported_login_types
@defer.inlineCallbacks
def validate_login(self, username: str, login_submission: Dict[str, Any]):
async def validate_login(
self, username: str, login_submission: Dict[str, Any]
) -> Tuple[str, Optional[Callable[[Dict[str, str]], None]]]:
"""Authenticates the user for the /login API
Also used by the user-interactive auth flow to validate
@ -708,7 +718,7 @@ class AuthHandler(BaseHandler):
login_submission: the whole of the login submission
(including 'type' and other relevant fields)
Returns:
Deferred[str, func]: canonical user id, and optional callback
A tuple of the canonical user id, and optional callback
to be called once the access token and device id are issued
Raises:
StoreError if there was a problem accessing the database
@ -737,7 +747,7 @@ class AuthHandler(BaseHandler):
for provider in self.password_providers:
if hasattr(provider, "check_password") and login_type == LoginType.PASSWORD:
known_login_type = True
is_valid = yield provider.check_password(qualified_user_id, password)
is_valid = await provider.check_password(qualified_user_id, password)
if is_valid:
return qualified_user_id, None
@ -769,7 +779,7 @@ class AuthHandler(BaseHandler):
% (login_type, missing_fields),
)
result = yield provider.check_auth(username, login_type, login_dict)
result = await provider.check_auth(username, login_type, login_dict)
if result:
if isinstance(result, str):
result = (result, None)
@ -778,8 +788,8 @@ class AuthHandler(BaseHandler):
if login_type == LoginType.PASSWORD and self.hs.config.password_localdb_enabled:
known_login_type = True
canonical_user_id = yield self._check_local_password(
qualified_user_id, password
canonical_user_id = await self._check_local_password(
qualified_user_id, password # type: ignore
)
if canonical_user_id:
@ -792,8 +802,9 @@ class AuthHandler(BaseHandler):
# login, it turns all LoginErrors into a 401 anyway.
raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
@defer.inlineCallbacks
def check_password_provider_3pid(self, medium: str, address: str, password: str):
async def check_password_provider_3pid(
self, medium: str, address: str, password: str
) -> Tuple[Optional[str], Optional[Callable[[Dict[str, str]], None]]]:
"""Check if a password provider is able to validate a thirdparty login
Args:
@ -802,9 +813,8 @@ class AuthHandler(BaseHandler):
password: The password of the user.
Returns:
Deferred[(str|None, func|None)]: A tuple of `(user_id,
callback)`. If authentication is successful, `user_id` is a `str`
containing the authenticated, canonical user ID. `callback` is
A tuple of `(user_id, callback)`. If authentication is successful,
`user_id` is the authenticated, canonical user ID. `callback` is
then either a function to be later run after the server has
completed login/registration, or `None`. If authentication was
unsuccessful, `user_id` and `callback` are both `None`.
@ -816,7 +826,7 @@ class AuthHandler(BaseHandler):
# success, to a str (which is the user_id) or a tuple of
# (user_id, callback_func), where callback_func should be run
# after we've finished everything else
result = yield provider.check_3pid_auth(medium, address, password)
result = await provider.check_3pid_auth(medium, address, password)
if result:
# Check if the return value is a str or a tuple
if isinstance(result, str):
@ -826,8 +836,7 @@ class AuthHandler(BaseHandler):
return None, None
@defer.inlineCallbacks
def _check_local_password(self, user_id: str, password: str):
async def _check_local_password(self, user_id: str, password: str) -> Optional[str]:
"""Authenticate a user against the local password database.
user_id is checked case insensitively, but will return None if there are
@ -837,28 +846,26 @@ class AuthHandler(BaseHandler):
user_id: complete @user:id
password: the provided password
Returns:
Deferred[unicode] the canonical_user_id, or Deferred[None] if
unknown user/bad password
The canonical_user_id, or None if unknown user/bad password
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
lookupres = await self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
return None
(user_id, password_hash) = lookupres
# If the password hash is None, the account has likely been deactivated
if not password_hash:
deactivated = yield self.store.get_user_deactivated_status(user_id)
deactivated = await self.store.get_user_deactivated_status(user_id)
if deactivated:
raise UserDeactivatedError("This account has been deactivated")
result = yield self.validate_hash(password, password_hash)
result = await self.validate_hash(password, password_hash)
if not result:
logger.warning("Failed password login for user %s", user_id)
return None
return user_id
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token: str):
async def validate_short_term_login_token_and_get_user_id(self, login_token: str):
auth_api = self.hs.get_auth()
user_id = None
try:
@ -868,26 +875,23 @@ class AuthHandler(BaseHandler):
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
yield self.auth.check_auth_blocking(user_id)
await self.auth.check_auth_blocking(user_id)
return user_id
@defer.inlineCallbacks
def delete_access_token(self, access_token: str):
async def delete_access_token(self, access_token: str):
"""Invalidate a single access token
Args:
access_token: access token to be deleted
Returns:
Deferred
"""
user_info = yield self.auth.get_user_by_access_token(access_token)
yield self.store.delete_access_token(access_token)
user_info = await self.auth.get_user_by_access_token(access_token)
await self.store.delete_access_token(access_token)
# see if any of our auth providers want to know about this
for provider in self.password_providers:
if hasattr(provider, "on_logged_out"):
yield provider.on_logged_out(
await provider.on_logged_out(
user_id=str(user_info["user"]),
device_id=user_info["device_id"],
access_token=access_token,
@ -895,12 +899,11 @@ class AuthHandler(BaseHandler):
# delete pushers associated with this access token
if user_info["token_id"] is not None:
yield self.hs.get_pusherpool().remove_pushers_by_access_token(
await self.hs.get_pusherpool().remove_pushers_by_access_token(
str(user_info["user"]), (user_info["token_id"],)
)
@defer.inlineCallbacks
def delete_access_tokens_for_user(
async def delete_access_tokens_for_user(
self,
user_id: str,
except_token_id: Optional[str] = None,
@ -914,10 +917,8 @@ class AuthHandler(BaseHandler):
device_id: ID of device the tokens are associated with.
If None, tokens associated with any device (or no device) will
be deleted
Returns:
Deferred
"""
tokens_and_devices = yield self.store.user_delete_access_tokens(
tokens_and_devices = await self.store.user_delete_access_tokens(
user_id, except_token_id=except_token_id, device_id=device_id
)
@ -925,17 +926,18 @@ class AuthHandler(BaseHandler):
for provider in self.password_providers:
if hasattr(provider, "on_logged_out"):
for token, token_id, device_id in tokens_and_devices:
yield provider.on_logged_out(
await provider.on_logged_out(
user_id=user_id, device_id=device_id, access_token=token
)
# delete pushers associated with the access tokens
yield self.hs.get_pusherpool().remove_pushers_by_access_token(
await self.hs.get_pusherpool().remove_pushers_by_access_token(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)
@defer.inlineCallbacks
def add_threepid(self, user_id: str, medium: str, address: str, validated_at: int):
async def add_threepid(
self, user_id: str, medium: str, address: str, validated_at: int
):
# check if medium has a valid value
if medium not in ["email", "msisdn"]:
raise SynapseError(
@ -956,14 +958,13 @@ class AuthHandler(BaseHandler):
if medium == "email":
address = address.lower()
yield self.store.user_add_threepid(
await self.store.user_add_threepid(
user_id, medium, address, validated_at, self.hs.get_clock().time_msec()
)
@defer.inlineCallbacks
def delete_threepid(
async def delete_threepid(
self, user_id: str, medium: str, address: str, id_server: Optional[str] = None
):
) -> bool:
"""Attempts to unbind the 3pid on the identity servers and deletes it
from the local database.
@ -976,7 +977,7 @@ class AuthHandler(BaseHandler):
identity server specified when binding (if known).
Returns:
Deferred[bool]: Returns True if successfully unbound the 3pid on
Returns True if successfully unbound the 3pid on
the identity server, False if identity server doesn't support the
unbind API.
"""
@ -986,11 +987,11 @@ class AuthHandler(BaseHandler):
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
result = yield identity_handler.try_unbind_threepid(
result = await identity_handler.try_unbind_threepid(
user_id, {"medium": medium, "address": address, "id_server": id_server}
)
yield self.store.user_delete_threepid(user_id, medium, address)
await self.store.user_delete_threepid(user_id, medium, address)
return result
def _save_session(self, session: Dict[str, Any]) -> None:
@ -1000,14 +1001,14 @@ class AuthHandler(BaseHandler):
session["last_used"] = self.hs.get_clock().time_msec()
self.sessions[session["id"]] = session
def hash(self, password: str):
async def hash(self, password: str) -> str:
"""Computes a secure hash of password.
Args:
password: Password to hash.
Returns:
Deferred(unicode): Hashed password.
Hashed password.
"""
def _do_hash():
@ -1019,9 +1020,11 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode("ascii")
return defer_to_thread(self.hs.get_reactor(), _do_hash)
return await defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password: str, stored_hash: bytes):
async def validate_hash(
self, password: str, stored_hash: Union[bytes, str]
) -> bool:
"""Validates that self.hash(password) == stored_hash.
Args:
@ -1029,7 +1032,7 @@ class AuthHandler(BaseHandler):
stored_hash: Expected hash value.
Returns:
Deferred(bool): Whether self.hash(password) == stored_hash.
Whether self.hash(password) == stored_hash.
"""
def _do_validate_hash():
@ -1045,9 +1048,9 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode("ascii")
return defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
return await defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
return False
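# A hedged, standalone illustration of the hash/validate_hash contract
# above, calling bcrypt directly (Synapse also appends a configured
# pepper to the password before hashing):
#
#     import bcrypt
#     hashed = bcrypt.hashpw(b"s3cret", bcrypt.gensalt(12))
#     assert bcrypt.checkpw(b"s3cret", hashed)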
def start_sso_ui_auth(self, redirect_url: str, session_id: str) -> str:
"""
@ -1061,11 +1064,8 @@ class AuthHandler(BaseHandler):
The HTML to render.
"""
session = self._get_session_info(session_id)
# Get the human readable operation of what is occurring, falling back to
# a generic message if it isn't available for some reason.
description = session.get("description", "modify your account")
return self._sso_auth_confirm_template.render(
description=description, redirect_url=redirect_url,
description=session["description"], redirect_url=redirect_url,
)
def complete_sso_ui_auth(
@ -1081,8 +1081,6 @@ class AuthHandler(BaseHandler):
"""
# Mark the stage of the authentication as successful.
sess = self._get_session_info(session_id)
if "creds" not in sess:
sess["creds"] = {}
creds = sess["creds"]
# Save the user who authenticated with SSO, this will be used to ensure
@ -1091,7 +1089,7 @@ class AuthHandler(BaseHandler):
self._save_session(sess)
# Render the HTML and return.
html_bytes = SUCCESS_TEMPLATE.encode("utf8")
html_bytes = self._sso_auth_success_template.encode("utf-8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
@ -1099,7 +1097,7 @@ class AuthHandler(BaseHandler):
request.write(html_bytes)
finish_request(request)
def complete_sso_login(
async def complete_sso_login(
self,
registered_user_id: str,
request: SynapseRequest,
@ -1113,6 +1111,32 @@ class AuthHandler(BaseHandler):
client_redirect_url: The URL to which to redirect the user at the end of the
process.
"""
# If the account has been deactivated, do not proceed with the login
# flow.
deactivated = await self.store.get_user_deactivated_status(registered_user_id)
if deactivated:
html_bytes = self._sso_account_deactivated_template.encode("utf-8")
request.setResponseCode(403)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
return
self._complete_sso_login(registered_user_id, request, client_redirect_url)
def _complete_sso_login(
self,
registered_user_id: str,
request: SynapseRequest,
client_redirect_url: str,
):
"""
The synchronous portion of complete_sso_login.
This exists purely for backwards compatibility of synapse.module_api.ModuleApi.
"""
# Create a login token
login_token = self.macaroon_gen.generate_short_term_login_token(
registered_user_id
@ -1138,7 +1162,7 @@ class AuthHandler(BaseHandler):
# URL we redirect users to.
redirect_url_no_params = client_redirect_url.split("?")[0]
html = self._sso_redirect_confirm_template.render(
html_bytes = self._sso_redirect_confirm_template.render(
display_url=redirect_url_no_params,
redirect_url=redirect_url,
server_name=self._server_name,
@ -1146,8 +1170,8 @@ class AuthHandler(BaseHandler):
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(html),))
request.write(html)
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
finish_request(request)
@staticmethod

View File

@ -216,6 +216,6 @@ class CasHandler:
localpart=localpart, default_display_name=user_display_name
)
self._auth_handler.complete_sso_login(
await self._auth_handler.complete_sso_login(
registered_user_id, request, client_redirect_url
)

View File

@ -338,8 +338,10 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
yield self._auth_handler.delete_access_tokens_for_user(
user_id, device_id=device_id
yield defer.ensureDeferred(
self._auth_handler.delete_access_tokens_for_user(
user_id, device_id=device_id
)
)
yield self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id)
@ -391,8 +393,10 @@ class DeviceHandler(DeviceWorkerHandler):
# Delete access tokens and e2e keys for each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
yield self._auth_handler.delete_access_tokens_for_user(
user_id, device_id=device_id
yield defer.ensureDeferred(
self._auth_handler.delete_access_tokens_for_user(
user_id, device_id=device_id
)
)
yield self.store.delete_e2e_keys_by_device(
user_id=user_id, device_id=device_id

View File

@ -54,19 +54,23 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
federation_registry = hs.get_federation_registry()
self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
else:
# Only register this edu handler on master as it requires writing
# device updates to the db
#
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
)
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
)
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
@ -170,8 +174,8 @@ class E2eKeysHandler(object):
"""This is called when we are querying the device list of a user on
a remote homeserver and their device list is not in the device list
cache. If we share a room with this user and we're not querying for
specific user we will update the cache
with their device list."""
specific user we will update the cache with their device list.
"""
destination_query = remote_queries_not_in_cache[destination]
@ -957,13 +961,19 @@ class E2eKeysHandler(object):
return signature_list, failures
@defer.inlineCallbacks
def _get_e2e_cross_signing_verify_key(self, user_id, key_type, from_user_id=None):
"""Fetch the cross-signing public key from storage and interpret it.
def _get_e2e_cross_signing_verify_key(
self, user_id: str, key_type: str, from_user_id: str = None
):
"""Fetch a cross-signing public key locally, or query it remotely.
First, attempt to fetch the cross-signing public key from storage.
If that fails, query the keys from the homeserver they belong to
and update our local copy.
Args:
user_id (str): the user whose key should be fetched
key_type (str): the type of key to fetch
from_user_id (str): the user that we are fetching the keys for.
user_id: the user whose key should be fetched
key_type: the type of key to fetch
from_user_id: the user that we are fetching the keys for.
This affects what signatures are fetched.
Returns:
@ -972,16 +982,140 @@ class E2eKeysHandler(object):
Raises:
NotFoundError: if the key is not found
SynapseError: if `user_id` is invalid
"""
user = UserID.from_string(user_id)
key = yield self.store.get_e2e_cross_signing_key(
user_id, key_type, from_user_id
)
if key is None:
logger.debug("no %s key found for %s", key_type, user_id)
if key:
# We found a copy of this key in our database. Decode and return it
key_id, verify_key = get_verify_key_from_cross_signing_key(key)
return key, key_id, verify_key
# If we couldn't find the key locally, and we're looking for keys of
# another user then attempt to fetch the missing key from the remote
# user's server.
#
# We may run into this in possible edge cases where a user tries to
# cross-sign a remote user, but does not share any rooms with them yet.
# Thus, we would not have their key list yet. We instead fetch the key,
# store it and notify clients of new, associated device IDs.
if self.is_mine(user) or key_type not in ["master", "self_signing"]:
# Note that master and self_signing keys are the only cross-signing keys we
# can request over federation
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
key_id, verify_key = get_verify_key_from_cross_signing_key(key)
(
key,
key_id,
verify_key,
) = yield self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
if key is None:
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
return key, key_id, verify_key
@defer.inlineCallbacks
def _retrieve_cross_signing_keys_for_remote_user(
self, user: UserID, desired_key_type: str,
):
"""Queries cross-signing keys for a remote user and saves them to the database
Only the key specified by `key_type` will be returned, while all retrieved keys
will be saved regardless
Args:
user: The user to query remote keys for
desired_key_type: The type of key to receive. One of "master", "self_signing"
Returns:
Deferred[Tuple[Optional[Dict], Optional[str], Optional[VerifyKey]]]: A tuple
of the retrieved key content, the key's ID and the matching VerifyKey.
If the key cannot be retrieved, all values in the tuple will instead be None.
"""
try:
remote_result = yield self.federation.query_user_devices(
user.domain, user.to_string()
)
except Exception as e:
logger.warning(
"Unable to query %s for cross-signing keys of user %s: %s %s",
user.domain,
user.to_string(),
type(e),
e,
)
return None, None, None
# Process each of the retrieved cross-signing keys
desired_key = None
desired_key_id = None
desired_verify_key = None
retrieved_device_ids = []
for key_type in ["master", "self_signing"]:
key_content = remote_result.get(key_type + "_key")
if not key_content:
continue
# Ensure these keys belong to the correct user
if "user_id" not in key_content:
logger.warning(
"Invalid %s key retrieved, missing user_id field: %s",
key_type,
key_content,
)
continue
if user.to_string() != key_content["user_id"]:
logger.warning(
"Found %s key of user %s when querying for keys of user %s",
key_type,
key_content["user_id"],
user.to_string(),
)
continue
# Validate the key contents
try:
# verify_key is a VerifyKey from signedjson, which uses
# .version to denote the portion of the key ID after the
# algorithm and colon, which is the device ID
key_id, verify_key = get_verify_key_from_cross_signing_key(key_content)
except ValueError as e:
logger.warning(
"Invalid %s key retrieved: %s - %s %s",
key_type,
key_content,
type(e),
e,
)
continue
# Note down the device ID attached to this key
retrieved_device_ids.append(verify_key.version)
# If this is the desired key type, save it and its ID/VerifyKey
if key_type == desired_key_type:
desired_key = key_content
desired_verify_key = verify_key
desired_key_id = key_id
# At the same time, store this key in the db for subsequent queries
yield self.store.set_e2e_cross_signing_key(
user.to_string(), key_type, key_content
)
# Notify clients that new devices for this user have been discovered
if retrieved_device_ids:
# XXX is this necessary?
yield self.device_handler.notify_device_update(
user.to_string(), retrieved_device_ids
)
return desired_key, desired_key_id, desired_verify_key
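# Illustrative shape of a successful return value (content abridged):
#
#     (
#         {"user_id": "@bob:remote", "usage": ["master"], "keys": {...}},
#         "ed25519:<unpadded base64 public key>",
#         <signedjson VerifyKey for that key>,
#     )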
def _check_cross_signing_key(key, user_id, key_type, signing_key=None):
"""Check a cross-signing key uploaded by a user. Performs some basic sanity

View File

@ -19,6 +19,7 @@ import random
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.types import UserID
from synapse.visibility import filter_events_for_client
@ -97,6 +98,8 @@ class EventStreamHandler(BaseHandler):
explicit_room_id=room_id,
)
time_now = self.clock.time_msec()
# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
@ -112,19 +115,20 @@ class EventStreamHandler(BaseHandler):
users = await self.state.get_current_users_in_room(
event.room_id
)
states = await presence_handler.get_states(users, as_event=True)
to_add.extend(states)
else:
users = [event.state_key]
ev = await presence_handler.get_state(
UserID.from_string(event.state_key), as_event=True
)
to_add.append(ev)
states = await presence_handler.get_states(users)
to_add.extend(
{
"type": EventTypes.Presence,
"content": format_user_presence_state(state, time_now),
}
for state in states
)
events.extend(to_add)
time_now = self.clock.time_msec()
chunks = await self._event_serializer.serialize_events(
events,
time_now,

View File

@ -343,7 +343,7 @@ class FederationHandler(BaseHandler):
ours = await self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(ours.values()) # type: list[StateMap[str]]
state_maps = list(ours.values()) # type: List[StateMap[str]]
# we don't need this any more, let's delete it.
del ours
@ -1694,16 +1694,15 @@ class FederationHandler(BaseHandler):
return None
@defer.inlineCallbacks
def get_state_for_pdu(self, room_id, event_id):
async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
"""Returns the state at the event. i.e. not including said event.
"""
event = yield self.store.get_event(
event = await self.store.get_event(
event_id, allow_none=False, check_room_id=room_id
)
state_groups = yield self.state_store.get_state_groups(room_id, [event_id])
state_groups = await self.state_store.get_state_groups(room_id, [event_id])
if state_groups:
_, state = list(iteritems(state_groups)).pop()
@ -1714,7 +1713,7 @@ class FederationHandler(BaseHandler):
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
if prev_id != event.event_id:
prev_event = yield self.store.get_event(prev_id)
prev_event = await self.store.get_event(prev_id)
results[(event.type, event.state_key)] = prev_event
else:
del results[(event.type, event.state_key)]
@ -1724,15 +1723,14 @@ class FederationHandler(BaseHandler):
else:
return []
@defer.inlineCallbacks
def get_state_ids_for_pdu(self, room_id, event_id):
async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
"""Returns the state at the event. i.e. not including said event.
"""
event = yield self.store.get_event(
event = await self.store.get_event(
event_id, allow_none=False, check_room_id=room_id
)
state_groups = yield self.state_store.get_state_groups_ids(room_id, [event_id])
state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
if state_groups:
_, state = list(state_groups.items()).pop()
@ -1751,49 +1749,50 @@ class FederationHandler(BaseHandler):
else:
return []
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, pdu_list, limit):
in_room = yield self.auth.check_host_in_room(room_id, origin)
async def on_backfill_request(
self, origin: str, room_id: str, pdu_list: List[str], limit: int
) -> List[EventBase]:
in_room = await self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
# Synapse asks for 100 events per backfill request. Do not allow more.
limit = min(limit, 100)
events = yield self.store.get_backfill_events(room_id, pdu_list, limit)
events = await self.store.get_backfill_events(room_id, pdu_list, limit)
events = yield filter_events_for_server(self.storage, origin, events)
events = await filter_events_for_server(self.storage, origin, events)
return events
@defer.inlineCallbacks
@log_function
def get_persisted_pdu(self, origin, event_id):
async def get_persisted_pdu(
self, origin: str, event_id: str
) -> Optional[EventBase]:
"""Get an event from the database for the given server.
Args:
origin [str]: hostname of server which is requesting the event; we
origin: hostname of server which is requesting the event; we
will check that the server is allowed to see it.
event_id [str]: id of the event being requested
event_id: id of the event being requested
Returns:
Deferred[EventBase|None]: None if we know nothing about the event;
otherwise the (possibly-redacted) event.
None if we know nothing about the event; otherwise the (possibly-redacted) event.
Raises:
AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event = await self.store.get_event(
event_id, allow_none=True, allow_rejected=True
)
if event:
in_room = yield self.auth.check_host_in_room(event.room_id, origin)
in_room = await self.auth.check_host_in_room(event.room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield filter_events_for_server(self.storage, origin, [event])
events = await filter_events_for_server(self.storage, origin, [event])
event = events[0]
return event
else:
@ -2397,7 +2396,7 @@ class FederationHandler(BaseHandler):
"""
# exclude the state key of the new event from the current_state in the context.
if event.is_state():
event_key = (event.type, event.state_key)
event_key = (event.type, event.state_key) # type: Optional[Tuple[str, str]]
else:
event_key = None
state_updates = {

View File

@ -18,7 +18,7 @@
"""Utilities for interacting with Identity Servers"""
import logging
import urllib
import urllib.parse
from canonicaljson import json
from signedjson.key import decode_verify_key_bytes

View File

@ -381,10 +381,16 @@ class InitialSyncHandler(BaseHandler):
return []
states = await presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
[m.user_id for m in room_members]
)
return states
return [
{
"type": EventTypes.Presence,
"content": format_user_presence_state(s, time_now),
}
for s in states
]
async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -21,10 +22,10 @@ The methods that define policy are:
- PresenceHandler._handle_timeouts
- should_notify
"""
import abc
import logging
from contextlib import contextmanager
from typing import Dict, List, Set
from typing import Dict, Iterable, List, Set
from six import iteritems, itervalues
@ -41,7 +42,7 @@ from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cached
from synapse.util.metrics import Measure
@ -99,13 +100,106 @@ EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and master"""
def __init__(self, hs: "synapse.server.HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}
@abc.abstractmethod
async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.
This allows us to keep track of who is currently streaming and who isn't
without having to have timers outside of this module to avoid flickering
when users disconnect/reconnect.
Args:
user_id: the user that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
@abc.abstractmethod
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
"""Get an iterable of syncing users on this worker, to send to the presence handler
This is called when a replication connection is established. It should return
a list of user ids, which are then sent as USER_SYNC commands to inform the
process handling presence about those users.
Returns:
An iterable of user_id strings.
"""
async def get_state(self, target_user: UserID) -> UserPresenceState:
results = await self.get_states([target_user.to_string()])
return results[0]
async def get_states(
self, target_user_ids: Iterable[str]
) -> List[UserPresenceState]:
"""Get the presence state for users."""
updates_d = await self.current_state_for_users(target_user_ids)
updates = list(updates_d.values())
for user_id in set(target_user_ids) - {u.user_id for u in updates}:
updates.append(UserPresenceState.default(user_id))
return updates
async def current_state_for_users(
self, user_ids: Iterable[str]
) -> Dict[str, UserPresenceState]:
"""Get the current presence state for multiple users.
Returns:
dict: `user_id` -> `UserPresenceState`
"""
states = {
user_id: self.user_to_current_state.get(user_id, None)
for user_id in user_ids
}
missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = await self.store.get_presence_for_users(missing)
states.update(res)
missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id) for user_id in missing
}
states.update(new)
self.user_to_current_state.update(new)
return states
@abc.abstractmethod
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
) -> None:
"""Set the presence state of the user. """
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "synapse.server.HomeServer"):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
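
The user_syncing contract above is easiest to see in isolation. Below is a
minimal sketch of the reference-counting context-manager pattern the docstring
describes; the function name and the module-level counter dict are illustrative
stand-ins, not Synapse's actual implementation:

    from contextlib import contextmanager
    from typing import Dict, Iterator

    # Illustrative stand-in for the handler's per-user sync counters.
    user_to_num_current_syncs: Dict[str, int] = {}

    @contextmanager
    def user_syncing_sketch(user_id: str, affect_presence: bool = True) -> Iterator[None]:
        if not affect_presence:
            # No-op mode, for streams not driven by a real client.
            yield
            return
        # Mark the user as syncing for the duration of the stream request.
        user_to_num_current_syncs[user_id] = user_to_num_current_syncs.get(user_id, 0) + 1
        try:
            yield
        finally:
            # The user only stops "syncing" once their last concurrent request
            # finishes, which is what avoids presence flicker on reconnects.
            user_to_num_current_syncs[user_id] -= 1

Wrapping a stream request in `with user_syncing_sketch("@alice:example.com"): ...`
keeps the user marked as syncing for exactly as long as some request is open.
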
@ -115,13 +209,6 @@ class PresenceHandler(object):
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
active_presence = self.store.take_presence_startup_info()
# A dictionary of the current state of users. This is prefilled with
# non-offline presence from the DB. We should fetch from the DB if
# we can't find a users presence in here.
self.user_to_current_state = {state.user_id: state for state in active_presence}
LaterGauge(
"synapse_handlers_presence_user_to_current_state_size",
"",
@ -130,7 +217,7 @@ class PresenceHandler(object):
)
now = self.clock.time_msec()
for state in active_presence:
for state in self.user_to_current_state.values():
self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
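
The wheel_timer.insert(...) calls above rely on Synapse's wheel-timer utility.
As a rough sketch of the underlying idea only (this is not the real WheelTimer
implementation): bucket objects by coarsened deadline, so insertion is O(1) and
everything due "now" can be popped in bulk.

    from collections import defaultdict
    from typing import Any, Dict, List

    class WheelTimerSketch:
        def __init__(self, bucket_size_ms: int = 5000) -> None:
            self.bucket_size = bucket_size_ms
            self.buckets: Dict[int, List[Any]] = defaultdict(list)

        def insert(self, now: int, obj: Any, then: int) -> None:
            # Round the deadline down to a bucket: timeout precision is
            # traded away for cheap insertion and bulk expiry.
            self.buckets[then // self.bucket_size].append(obj)

        def fetch(self, now: int) -> List[Any]:
            # Pop every bucket whose deadline has passed.
            due = [k for k in self.buckets if k <= now // self.bucket_size]
            out: List[Any] = []
            for k in sorted(due):
                out.extend(self.buckets.pop(k))
            return out
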
@ -361,10 +448,18 @@ class PresenceHandler(object):
timers_fired_counter.inc(len(states))
syncing_user_ids = {
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
syncing_user_ids=syncing_user_ids,
now=now,
)
@ -462,22 +557,9 @@ class PresenceHandler(object):
return _user_syncing()
def get_currently_syncing_users(self):
"""Get the set of user ids that are currently syncing on this HS.
Returns:
set(str): A set of user_id strings.
"""
if self.hs.config.use_presence:
syncing_user_ids = {
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
# since we are the process handling presence, there is nothing to do here.
return []
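
On the master there is nothing to replay, hence the empty list; on a worker
this hook feeds USER_SYNC commands when a replication connection comes up. A
hypothetical sketch of the calling side (both callables here are invented for
illustration and are not Synapse APIs):

    from typing import Callable, Iterable

    def replay_syncing_users(
        get_syncing: Callable[[], Iterable[str]],
        send_user_sync: Callable[[str, bool], None],
    ) -> None:
        # On (re)connect, tell the process handling presence about every
        # user currently syncing on this worker, so they stay online.
        for user_id in get_syncing():
            send_user_sync(user_id, True)
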
async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
@ -554,34 +636,6 @@ class PresenceHandler(object):
res = await self.current_state_for_users([user_id])
return res[user_id]
async def current_state_for_users(self, user_ids):
"""Get the current presence state for multiple users.
Returns:
dict: `user_id` -> `UserPresenceState`
"""
states = {
user_id: self.user_to_current_state.get(user_id, None)
for user_id in user_ids
}
missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = await self.store.get_presence_for_users(missing)
states.update(res)
missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id) for user_id in missing
}
states.update(new)
self.user_to_current_state.update(new)
return states
async def _persist_and_notify(self, states):
"""Persist states in the database, poke the notifier and send to
interested remote servers
@ -669,40 +723,6 @@ class PresenceHandler(object):
federation_presence_counter.inc(len(updates))
await self._update_states(updates)
async def get_state(self, target_user, as_event=False):
results = await self.get_states([target_user.to_string()], as_event=as_event)
return results[0]
async def get_states(self, target_user_ids, as_event=False):
"""Get the presence state for users.
Args:
target_user_ids (list)
as_event (bool): Whether to format it as a client event or not.
Returns:
list
"""
updates = await self.current_state_for_users(target_user_ids)
updates = list(updates.values())
for user_id in set(target_user_ids) - {u.user_id for u in updates}:
updates.append(UserPresenceState.default(user_id))
now = self.clock.time_msec()
if as_event:
return [
{
"type": "m.presence",
"content": format_user_presence_state(state, now),
}
for state in updates
]
else:
return updates
async def set_state(self, target_user, state, ignore_status_msg=False):
"""Set the presence state of the user.
"""
@ -889,7 +909,7 @@ class PresenceHandler(object):
user_ids = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, user_ids))
states = await self.current_state_for_users(user_ids)
states_d = await self.current_state_for_users(user_ids)
# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
@ -899,7 +919,7 @@ class PresenceHandler(object):
now = self.clock.time_msec()
states = [
state
for state in states.values()
for state in states_d.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None

View File

@ -166,7 +166,9 @@ class RegistrationHandler(BaseHandler):
yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None
if password:
password_hash = yield self._auth_handler.hash(password)
password_hash = yield defer.ensureDeferred(
self._auth_handler.hash(password)
)
if localpart is not None:
yield self.check_username(localpart, guest_access_token=guest_access_token)
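
The wrapping seen above recurs throughout this file: defer.ensureDeferred
adapts a coroutine into a Deferred so that it can be yielded from
@defer.inlineCallbacks code. A self-contained sketch of the bridge, with a
hypothetical hash_password coroutine standing in for the real hashing helper:

    from twisted.internet import defer

    async def hash_password(password: str) -> str:
        # Stand-in for the real (async) hashing helper.
        return "hashed:" + password

    @defer.inlineCallbacks
    def register_sketch(password):
        # inlineCallbacks generators can only yield Deferreds, so the
        # coroutine must be wrapped before being yielded.
        password_hash = yield defer.ensureDeferred(hash_password(password))
        return password_hash
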
@ -537,8 +539,10 @@ class RegistrationHandler(BaseHandler):
user_id, ["guest = true"]
)
else:
access_token = yield self._auth_handler.get_access_token_for_user_id(
user_id, device_id=device_id, valid_until_ms=valid_until_ms
access_token = yield defer.ensureDeferred(
self._auth_handler.get_access_token_for_user_id(
user_id, device_id=device_id, valid_until_ms=valid_until_ms
)
)
return (device_id, access_token)
@ -612,8 +616,13 @@ class RegistrationHandler(BaseHandler):
logger.info("Can't add incomplete 3pid")
return
yield self._auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
yield defer.ensureDeferred(
self._auth_handler.add_threepid(
user_id,
threepid["medium"],
threepid["address"],
threepid["validated_at"],
)
)
# And we add an email pusher for them by default, but only
@ -665,6 +674,11 @@ class RegistrationHandler(BaseHandler):
return None
raise
yield self._auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
yield defer.ensureDeferred(
self._auth_handler.add_threepid(
user_id,
threepid["medium"],
threepid["address"],
threepid["validated_at"],
)
)

View File

@ -642,6 +642,13 @@ class RoomCreationHandler(BaseHandler):
check_membership=False,
)
if is_public:
if not self.config.is_publishing_room_allowed(user_id, room_id, room_alias):
# Lets just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
# per alias creation rule?
raise SynapseError(403, "Not allowed to publish room")
preset_config = config.get(
"preset",
RoomCreationPreset.PRIVATE_CHAT
@ -801,6 +808,7 @@ class RoomCreationHandler(BaseHandler):
EventTypes.RoomAvatar: 50,
EventTypes.Tombstone: 100,
EventTypes.ServerACL: 100,
EventTypes.RoomEncryption: 100,
},
"events_default": 0,
"state_default": 50,

View File

@ -154,7 +154,7 @@ class SamlHandler:
)
else:
self._auth_handler.complete_sso_login(user_id, request, relay_state)
await self._auth_handler.complete_sso_login(user_id, request, relay_state)
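
The added await matters more than it looks: complete_sso_login is now a
coroutine, and calling a coroutine function without awaiting it runs none of
its body. A quick self-contained demonstration of the failure mode:

    import asyncio

    async def complete_login_sketch() -> None:
        print("login completed")

    async def main() -> None:
        complete_login_sketch()        # bug: returns an un-run coroutine object
        await complete_login_sketch()  # fix: the body actually executes

    asyncio.run(main())  # prints "login completed" once, plus a
                         # "never awaited" RuntimeWarning for the first call
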
async def _map_saml_response_to_user(
self, resp_bytes: str, client_redirect_url: str

View File

@ -15,8 +15,6 @@
import logging
from typing import Optional
from twisted.internet import defer
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.types import Requester
@ -34,8 +32,7 @@ class SetPasswordHandler(BaseHandler):
self._device_handler = hs.get_device_handler()
self._password_policy_handler = hs.get_password_policy_handler()
@defer.inlineCallbacks
def set_password(
async def set_password(
self,
user_id: str,
new_password: str,
@ -46,10 +43,10 @@ class SetPasswordHandler(BaseHandler):
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
self._password_policy_handler.validate_password(new_password)
password_hash = yield self._auth_handler.hash(new_password)
password_hash = await self._auth_handler.hash(new_password)
try:
yield self.store.user_set_password_hash(user_id, password_hash)
await self.store.user_set_password_hash(user_id, password_hash)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
@ -61,12 +58,12 @@ class SetPasswordHandler(BaseHandler):
except_access_token_id = requester.access_token_id if requester else None
# First delete all of their other devices.
yield self._device_handler.delete_all_devices_for_user(
await self._device_handler.delete_all_devices_for_user(
user_id, except_device_id=except_device_id
)
# and now delete any access tokens which weren't associated with
# devices (or were associated with this device).
yield self._auth_handler.delete_access_tokens_for_user(
await self._auth_handler.delete_access_tokens_for_user(
user_id, except_token_id=except_access_token_id
)
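
This file shows the other half of the migration: a whole method converted from
@defer.inlineCallbacks to native async/await. The mechanical shape of such a
conversion, on a toy example (store is assumed to return Deferreds or
awaitables):

    from twisted.internet import defer

    # Before: Twisted-style generator coroutine.
    @defer.inlineCallbacks
    def set_password_old(store, user_id, password_hash):
        yield store.user_set_password_hash(user_id, password_hash)

    # After: native coroutine. Each `yield` on a Deferred becomes `await`
    # and the decorator disappears; Deferreds are directly awaitable.
    async def set_password_new(store, user_id, password_hash):
        await store.user_set_password_hash(user_id, password_hash)
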

View File

@ -1639,7 +1639,7 @@ class SyncHandler(object):
)
# We loop through all room ids, even if there are no new events, in case
# there are non room events taht we need to notify about.
# there are non room events that we need to notify about.
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)

View File

@ -434,21 +434,27 @@ class MatrixFederationHttpClient(object):
logger.info("Failed to send request: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e)
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
)
incoming_responses_counter.labels(method_bytes, response.code).inc()
set_tag(tags.HTTP_STATUS_CODE, response.code)
if 200 <= response.code < 300:
logger.debug(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
)
pass
else:
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
)
# :'(
# Update transactions table?
d = treq.content(response)
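
The restructuring demotes the per-response header log to DEBUG for successful
(2xx) responses while keeping INFO for everything else, cutting log volume on
busy, healthy federation links (the `pass` after the debug call is redundant
and could be dropped). The same effect can be written more compactly by
choosing the level up front, as in this sketch:

    import logging

    logger = logging.getLogger(__name__)

    def log_response_headers(txn_id: str, destination: str, code: int, phrase: str) -> None:
        # Routine 2xx responses go to DEBUG; failures stay visible at INFO.
        level = logging.DEBUG if 200 <= code < 300 else logging.INFO
        logger.log(
            level, "{%s} [%s] Got response headers: %d %s",
            txn_id, destination, code, phrase,
        )
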

View File

@ -171,7 +171,7 @@ import logging
import re
import types
from functools import wraps
from typing import Dict
from typing import TYPE_CHECKING, Dict
from canonicaljson import json
@ -179,6 +179,9 @@ from twisted.internet import defer
from synapse.config import ConfigError
if TYPE_CHECKING:
from synapse.server import HomeServer
# Helper class
@ -297,14 +300,11 @@ def _noop_context_manager(*args, **kwargs):
# Setup
def init_tracer(config):
def init_tracer(hs: "HomeServer"):
"""Set the whitelists and initialise the JaegerClient tracer
Args:
config (HomeserverConfig): The config used by the homeserver
"""
global opentracing
if not config.opentracer_enabled:
if not hs.config.opentracer_enabled:
# We don't have a tracer
opentracing = None
return
@ -315,18 +315,15 @@ def init_tracer(config):
"installed."
)
# Include the worker name
name = config.worker_name if config.worker_name else "master"
# Pull out the jaeger config if it was given. Otherwise set it to something sensible.
# See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py
set_homeserver_whitelist(config.opentracer_whitelist)
set_homeserver_whitelist(hs.config.opentracer_whitelist)
JaegerConfig(
config=config.jaeger_config,
service_name="{} {}".format(config.server_name, name),
scope_manager=LogContextScopeManager(config),
config=hs.config.jaeger_config,
service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
scope_manager=LogContextScopeManager(hs.config),
).initialize_tracer()
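
With the homeserver passed in, the tracer can name itself after the running
instance (e.g. "example.com worker1") instead of re-deriving a worker name
from config. A rough sketch of the resulting initialisation, assuming the
jaeger-client package is installed and using a placeholder sampler config:

    from jaeger_client import Config as JaegerConfig

    def init_tracer_sketch(server_name: str, instance_name: str) -> None:
        # One tracer per process, named "<server> <instance>" so spans from
        # different workers are distinguishable in the Jaeger UI.
        JaegerConfig(
            config={"sampler": {"type": "const", "param": 1}},  # placeholder
            service_name="{} {}".format(server_name, instance_name),
        ).initialize_tracer()
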

Some files were not shown because too many files have changed in this diff.