Merge remote-tracking branch 'origin/develop' into 3218-official-prom
						commit
						a8990fa2ec
					
				|  | @ -49,3 +49,4 @@ env/ | |||
| *.config | ||||
| 
 | ||||
| .vscode/ | ||||
| .ropeproject/ | ||||
|  |  | |||
|  | @ -1,12 +1,12 @@ | |||
| FROM docker.io/python:2-alpine3.7 | ||||
| 
 | ||||
| RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev | ||||
| RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev libxslt-dev | ||||
| 
 | ||||
| COPY . /synapse | ||||
| 
 | ||||
| # A wheel cache may be provided in ./cache for faster build | ||||
| RUN cd /synapse \ | ||||
|  && pip install --upgrade pip setuptools psycopg2 \ | ||||
|  && pip install --upgrade pip setuptools psycopg2 lxml \ | ||||
|  && mkdir -p /synapse/cache \ | ||||
|  && pip install -f /synapse/cache --upgrade --process-dependency-links . \ | ||||
|  && mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \ | ||||
|  |  | |||
|  | @ -0,0 +1,23 @@ | |||
| If enabling the 'consent' resource in synapse, you will need some templates | ||||
| for the HTML to be served to the user. This directory contains very simple | ||||
| examples of the sort of thing that can be done. | ||||
| 
 | ||||
| You'll need to add this sort of thing to your homeserver.yaml: | ||||
| 
 | ||||
| ``` | ||||
| form_secret: <unique but arbitrary secret> | ||||
| 
 | ||||
| user_consent: | ||||
|   template_dir: docs/privacy_policy_templates | ||||
|   version: 1.0 | ||||
| ``` | ||||
| 
 | ||||
| You should then be able to enable the `consent` resource under a `listener` | ||||
| entry. For example: | ||||
| 
 | ||||
| ``` | ||||
| listeners: | ||||
|   - port: 8008 | ||||
|     resources: | ||||
|       - names: [client, consent] | ||||
| ``` | ||||
|  | @ -0,0 +1,23 @@ | |||
| <!doctype html> | ||||
| <html lang="en"> | ||||
|   <head> | ||||
|     <title>Matrix.org Privacy policy</title> | ||||
|   </head> | ||||
|   <body> | ||||
|   {% if has_consented %} | ||||
|     <p> | ||||
|       Your base already belong to us. | ||||
|     </p> | ||||
|   {% else %} | ||||
|     <p> | ||||
|       All your base are belong to us. | ||||
|     </p> | ||||
|     <form method="post" action="consent"> | ||||
|       <input type="hidden" name="v" value="{{version}}"/> | ||||
|       <input type="hidden" name="u" value="{{user}}"/> | ||||
|       <input type="hidden" name="h" value="{{userhmac}}"/> | ||||
|       <input type="submit" value="Sure thing!"/> | ||||
|     </form> | ||||
|   {% endif %} | ||||
|   </body> | ||||
| </html> | ||||
|  | @ -0,0 +1,11 @@ | |||
| <!doctype html> | ||||
| <html lang="en"> | ||||
|   <head> | ||||
|     <title>Matrix.org Privacy policy</title> | ||||
|   </head> | ||||
|   <body> | ||||
|     <p> | ||||
|       Sweet. | ||||
|     </p> | ||||
|   </body> | ||||
| </html> | ||||
|  | @ -19,6 +19,7 @@ import logging | |||
| 
 | ||||
| import simplejson as json | ||||
| from six import iteritems | ||||
| from six.moves import http_client | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
|  | @ -51,6 +52,7 @@ class Codes(object): | |||
|     THREEPID_DENIED = "M_THREEPID_DENIED" | ||||
|     INVALID_USERNAME = "M_INVALID_USERNAME" | ||||
|     SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" | ||||
|     CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" | ||||
| 
 | ||||
| 
 | ||||
| class CodeMessageException(RuntimeError): | ||||
|  | @ -138,6 +140,32 @@ class SynapseError(CodeMessageException): | |||
|         return res | ||||
| 
 | ||||
| 
 | ||||
| class ConsentNotGivenError(SynapseError): | ||||
|     """The error returned to the client when the user has not consented to the | ||||
|     privacy policy. | ||||
|     """ | ||||
|     def __init__(self, msg, consent_uri): | ||||
|         """Constructs a ConsentNotGivenError | ||||
| 
 | ||||
|         Args: | ||||
|             msg (str): The human-readable error message | ||||
|             consent_url (str): The URL where the user can give their consent | ||||
|         """ | ||||
|         super(ConsentNotGivenError, self).__init__( | ||||
|             code=http_client.FORBIDDEN, | ||||
|             msg=msg, | ||||
|             errcode=Codes.CONSENT_NOT_GIVEN | ||||
|         ) | ||||
|         self._consent_uri = consent_uri | ||||
| 
 | ||||
|     def error_dict(self): | ||||
|         return cs_error( | ||||
|             self.msg, | ||||
|             self.errcode, | ||||
|             consent_uri=self._consent_uri | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| class RegistrationError(SynapseError): | ||||
|     """An error raised when a registration event fails.""" | ||||
|     pass | ||||
|  | @ -292,7 +320,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs): | |||
| 
 | ||||
|     Args: | ||||
|         msg (str): The error message. | ||||
|         code (int): The error code. | ||||
|         code (str): The error code. | ||||
|         kwargs : Additional keys to add to the response. | ||||
|     Returns: | ||||
|         A dict representing the error response JSON. | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2014-2016 OpenMarket Ltd | ||||
| # Copyright 2018 New Vector Ltd. | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
|  | @ -14,6 +15,12 @@ | |||
| # limitations under the License. | ||||
| 
 | ||||
| """Contains the URL paths to prefix various aspects of the server with. """ | ||||
| from hashlib import sha256 | ||||
| import hmac | ||||
| 
 | ||||
| from six.moves.urllib.parse import urlencode | ||||
| 
 | ||||
| from synapse.config import ConfigError | ||||
| 
 | ||||
| CLIENT_PREFIX = "/_matrix/client/api/v1" | ||||
| CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" | ||||
|  | @ -25,3 +32,46 @@ SERVER_KEY_PREFIX = "/_matrix/key/v1" | |||
| SERVER_KEY_V2_PREFIX = "/_matrix/key/v2" | ||||
| MEDIA_PREFIX = "/_matrix/media/r0" | ||||
| LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" | ||||
| 
 | ||||
| 
 | ||||
| class ConsentURIBuilder(object): | ||||
|     def __init__(self, hs_config): | ||||
|         """ | ||||
|         Args: | ||||
|             hs_config (synapse.config.homeserver.HomeServerConfig): | ||||
|         """ | ||||
|         if hs_config.form_secret is None: | ||||
|             raise ConfigError( | ||||
|                 "form_secret not set in config", | ||||
|             ) | ||||
|         if hs_config.public_baseurl is None: | ||||
|             raise ConfigError( | ||||
|                 "public_baseurl not set in config", | ||||
|             ) | ||||
| 
 | ||||
|         self._hmac_secret = hs_config.form_secret.encode("utf-8") | ||||
|         self._public_baseurl = hs_config.public_baseurl | ||||
| 
 | ||||
|     def build_user_consent_uri(self, user_id): | ||||
|         """Build a URI which we can give to the user to do their privacy | ||||
|         policy consent | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): mxid or username of user | ||||
| 
 | ||||
|         Returns | ||||
|             (str) the URI where the user can do consent | ||||
|         """ | ||||
|         mac = hmac.new( | ||||
|             key=self._hmac_secret, | ||||
|             msg=user_id, | ||||
|             digestmod=sha256, | ||||
|         ).hexdigest() | ||||
|         consent_uri = "%s_matrix/consent?%s" % ( | ||||
|             self._public_baseurl, | ||||
|             urlencode({ | ||||
|                 "u": user_id, | ||||
|                 "h": mac | ||||
|             }), | ||||
|         ) | ||||
|         return consent_uri | ||||
|  |  | |||
|  | @ -183,6 +183,15 @@ class SynapseHomeServer(HomeServer): | |||
|                 "/_matrix/client/versions": client_resource, | ||||
|             }) | ||||
| 
 | ||||
|         if name == "consent": | ||||
|             from synapse.rest.consent.consent_resource import ConsentResource | ||||
|             consent_resource = ConsentResource(self) | ||||
|             if compress: | ||||
|                 consent_resource = gz_wrap(consent_resource) | ||||
|             resources.update({ | ||||
|                 "/_matrix/consent": consent_resource, | ||||
|             }) | ||||
| 
 | ||||
|         if name == "federation": | ||||
|             resources.update({ | ||||
|                 FEDERATION_PREFIX: TransportLayerServer(self), | ||||
|  | @ -473,6 +482,14 @@ def run(hs): | |||
|                 " changes across releases." | ||||
|             ) | ||||
| 
 | ||||
|     def generate_user_daily_visit_stats(): | ||||
|         hs.get_datastore().generate_user_daily_visits() | ||||
| 
 | ||||
|     # Rather than update on per session basis, batch up the requests. | ||||
|     # If you increase the loop period, the accuracy of user_daily_visits | ||||
|     # table will decrease | ||||
|     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) | ||||
| 
 | ||||
|     if hs.config.report_stats: | ||||
|         logger.info("Scheduling stats reporting for 3 hour intervals") | ||||
|         clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) | ||||
|  |  | |||
|  | @ -12,3 +12,9 @@ | |||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from ._base import ConfigError | ||||
| 
 | ||||
| # export ConfigError if somebody does import * | ||||
| # this is largely a fudge to stop PEP8 moaning about the import | ||||
| __all__ = ["ConfigError"] | ||||
|  |  | |||
|  | @ -0,0 +1,76 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from ._base import Config | ||||
| 
 | ||||
| DEFAULT_CONFIG = """\ | ||||
| # User Consent configuration | ||||
| # | ||||
| # Parts of this section are required if enabling the 'consent' resource under | ||||
| # 'listeners', in particular 'template_dir' and 'version'. | ||||
| # | ||||
| # 'template_dir' gives the location of the templates for the HTML forms. | ||||
| # This directory should contain one subdirectory per language (eg, 'en', 'fr'), | ||||
| # and each language directory should contain the policy document (named as | ||||
| # '<version>.html') and a success page (success.html). | ||||
| # | ||||
| # 'version' specifies the 'current' version of the policy document. It defines | ||||
| # the version to be served by the consent resource if there is no 'v' | ||||
| # parameter. | ||||
| # | ||||
| # 'server_notice_content', if enabled, will send a user a "Server Notice" | ||||
| # asking them to consent to the privacy policy. The 'server_notices' section | ||||
| # must also be configured for this to work. | ||||
| # | ||||
| # 'block_events_error', if set, will block any attempts to send events | ||||
| # until the user consents to the privacy policy. The value of the setting is | ||||
| # used as the text of the error. | ||||
| # | ||||
| # user_consent: | ||||
| #   template_dir: res/templates/privacy | ||||
| #   version: 1.0 | ||||
| #   server_notice_content: | ||||
| #     msgtype: m.text | ||||
| #     body: | | ||||
| #       Pls do consent kthx | ||||
| #   block_events_error: | | ||||
| #     You can't send any messages until you consent to the privacy policy. | ||||
| """ | ||||
| 
 | ||||
| 
 | ||||
| class ConsentConfig(Config): | ||||
|     def __init__(self): | ||||
|         super(ConsentConfig, self).__init__() | ||||
| 
 | ||||
|         self.user_consent_version = None | ||||
|         self.user_consent_template_dir = None | ||||
|         self.user_consent_server_notice_content = None | ||||
|         self.block_events_without_consent_error = None | ||||
| 
 | ||||
|     def read_config(self, config): | ||||
|         consent_config = config.get("user_consent") | ||||
|         if consent_config is None: | ||||
|             return | ||||
|         self.user_consent_version = str(consent_config["version"]) | ||||
|         self.user_consent_template_dir = consent_config["template_dir"] | ||||
|         self.user_consent_server_notice_content = consent_config.get( | ||||
|             "server_notice_content", | ||||
|         ) | ||||
|         self.block_events_without_consent_error = consent_config.get( | ||||
|             "block_events_error", | ||||
|         ) | ||||
| 
 | ||||
|     def default_config(self, **kwargs): | ||||
|         return DEFAULT_CONFIG | ||||
|  | @ -1,5 +1,6 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2014-2016 OpenMarket Ltd | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
|  | @ -12,7 +13,6 @@ | |||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from .tls import TlsConfig | ||||
| from .server import ServerConfig | ||||
| from .logger import LoggingConfig | ||||
|  | @ -37,6 +37,8 @@ from .push import PushConfig | |||
| from .spam_checker import SpamCheckerConfig | ||||
| from .groups import GroupsConfig | ||||
| from .user_directory import UserDirectoryConfig | ||||
| from .consent_config import ConsentConfig | ||||
| from .server_notices_config import ServerNoticesConfig | ||||
| 
 | ||||
| 
 | ||||
| class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, | ||||
|  | @ -45,12 +47,15 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, | |||
|                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig, | ||||
|                        JWTConfig, PasswordConfig, EmailConfig, | ||||
|                        WorkerConfig, PasswordAuthProviderConfig, PushConfig, | ||||
|                        SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,): | ||||
|                        SpamCheckerConfig, GroupsConfig, UserDirectoryConfig, | ||||
|                        ConsentConfig, | ||||
|                        ServerNoticesConfig, | ||||
|                        ): | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     import sys | ||||
|     sys.stdout.write( | ||||
|         HomeServerConfig().generate_config(sys.argv[1], sys.argv[2])[0] | ||||
|         HomeServerConfig().generate_config(sys.argv[1], sys.argv[2], True)[0] | ||||
|     ) | ||||
|  |  | |||
|  | @ -59,14 +59,20 @@ class KeyConfig(Config): | |||
| 
 | ||||
|         self.expire_access_token = config.get("expire_access_token", False) | ||||
| 
 | ||||
|         # a secret which is used to calculate HMACs for form values, to stop | ||||
|         # falsification of values | ||||
|         self.form_secret = config.get("form_secret", None) | ||||
| 
 | ||||
|     def default_config(self, config_dir_path, server_name, is_generating_file=False, | ||||
|                        **kwargs): | ||||
|         base_key_name = os.path.join(config_dir_path, server_name) | ||||
| 
 | ||||
|         if is_generating_file: | ||||
|             macaroon_secret_key = random_string_with_symbols(50) | ||||
|             form_secret = '"%s"' % random_string_with_symbols(50) | ||||
|         else: | ||||
|             macaroon_secret_key = None | ||||
|             form_secret = 'null' | ||||
| 
 | ||||
|         return """\ | ||||
|         macaroon_secret_key: "%(macaroon_secret_key)s" | ||||
|  | @ -74,6 +80,10 @@ class KeyConfig(Config): | |||
|         # Used to enable access token expiration. | ||||
|         expire_access_token: False | ||||
| 
 | ||||
|         # a secret which is used to calculate HMACs for form values, to stop | ||||
|         # falsification of values | ||||
|         form_secret: %(form_secret)s | ||||
| 
 | ||||
|         ## Signing Keys ## | ||||
| 
 | ||||
|         # Path to the signing key to sign messages with | ||||
|  |  | |||
|  | @ -0,0 +1,77 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| from ._base import Config | ||||
| from synapse.types import UserID | ||||
| 
 | ||||
| DEFAULT_CONFIG = """\ | ||||
| # Server Notices room configuration | ||||
| # | ||||
| # Uncomment this section to enable a room which can be used to send notices | ||||
| # from the server to users. It is a special room which cannot be left; notices | ||||
| # come from a special "notices" user id. | ||||
| # | ||||
| # If you uncomment this section, you *must* define the system_mxid_localpart | ||||
| # setting, which defines the id of the user which will be used to send the | ||||
| # notices. | ||||
| # | ||||
| # It's also possible to override the room name, or the display name of the | ||||
| # "notices" user. | ||||
| # | ||||
| # server_notices: | ||||
| #   system_mxid_localpart: notices | ||||
| #   system_mxid_display_name: "Server Notices" | ||||
| #   room_name: "Server Notices" | ||||
| """ | ||||
| 
 | ||||
| 
 | ||||
| class ServerNoticesConfig(Config): | ||||
|     """Configuration for the server notices room. | ||||
| 
 | ||||
|     Attributes: | ||||
|         server_notices_mxid (str|None): | ||||
|             The MXID to use for server notices. | ||||
|             None if server notices are not enabled. | ||||
| 
 | ||||
|         server_notices_mxid_display_name (str|None): | ||||
|             The display name to use for the server notices user. | ||||
|             None if server notices are not enabled. | ||||
| 
 | ||||
|         server_notices_room_name (str|None): | ||||
|             The name to use for the server notices room. | ||||
|             None if server notices are not enabled. | ||||
|     """ | ||||
|     def __init__(self): | ||||
|         super(ServerNoticesConfig, self).__init__() | ||||
|         self.server_notices_mxid = None | ||||
|         self.server_notices_mxid_display_name = None | ||||
|         self.server_notices_room_name = None | ||||
| 
 | ||||
|     def read_config(self, config): | ||||
|         c = config.get("server_notices") | ||||
|         if c is None: | ||||
|             return | ||||
| 
 | ||||
|         mxid_localpart = c['system_mxid_localpart'] | ||||
|         self.server_notices_mxid = UserID( | ||||
|             mxid_localpart, self.server_name, | ||||
|         ).to_string() | ||||
|         self.server_notices_mxid_display_name = c.get( | ||||
|             'system_mxid_display_name', 'Server Notices', | ||||
|         ) | ||||
|         # todo: i18n | ||||
|         self.server_notices_room_name = c.get('room_name', "Server Notices") | ||||
| 
 | ||||
|     def default_config(self, **kwargs): | ||||
|         return DEFAULT_CONFIG | ||||
|  | @ -14,9 +14,7 @@ | |||
| # limitations under the License. | ||||
| 
 | ||||
| from .register import RegistrationHandler | ||||
| from .room import ( | ||||
|     RoomCreationHandler, RoomContextHandler, | ||||
| ) | ||||
| from .room import RoomContextHandler | ||||
| from .message import MessageHandler | ||||
| from .federation import FederationHandler | ||||
| from .directory import DirectoryHandler | ||||
|  | @ -47,7 +45,6 @@ class Handlers(object): | |||
|     def __init__(self, hs): | ||||
|         self.registration_handler = RegistrationHandler(hs) | ||||
|         self.message_handler = MessageHandler(hs) | ||||
|         self.room_creation_handler = RoomCreationHandler(hs) | ||||
|         self.federation_handler = FederationHandler(hs) | ||||
|         self.directory_handler = DirectoryHandler(hs) | ||||
|         self.admin_handler = AdminHandler(hs) | ||||
|  |  | |||
|  | @ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler): | |||
| 
 | ||||
|         self.notifier = hs.get_notifier() | ||||
|         self.state = hs.get_state_handler() | ||||
|         self._server_notices_sender = hs.get_server_notices_sender() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     @log_function | ||||
|  | @ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler): | |||
| 
 | ||||
|         If `only_keys` is not None, events from keys will be sent down. | ||||
|         """ | ||||
| 
 | ||||
|         # send any outstanding server notices to the user. | ||||
|         yield self._server_notices_sender.on_user_syncing(auth_user_id) | ||||
| 
 | ||||
|         auth_user = UserID.from_string(auth_user_id) | ||||
|         presence_handler = self.hs.get_presence_handler() | ||||
| 
 | ||||
|  |  | |||
|  | @ -81,6 +81,7 @@ class FederationHandler(BaseHandler): | |||
|         self.pusher_pool = hs.get_pusherpool() | ||||
|         self.spam_checker = hs.get_spam_checker() | ||||
|         self.event_creation_handler = hs.get_event_creation_handler() | ||||
|         self._server_notices_mxid = hs.config.server_notices_mxid | ||||
| 
 | ||||
|         # When joining a room we need to queue any events for that room up | ||||
|         self.room_queues = {} | ||||
|  | @ -1180,6 +1181,13 @@ class FederationHandler(BaseHandler): | |||
|         if not self.is_mine_id(event.state_key): | ||||
|             raise SynapseError(400, "The invite event must be for this server") | ||||
| 
 | ||||
|         # block any attempts to invite the server notices mxid | ||||
|         if event.state_key == self._server_notices_mxid: | ||||
|             raise SynapseError( | ||||
|                 http_client.FORBIDDEN, | ||||
|                 "Cannot invite this user", | ||||
|             ) | ||||
| 
 | ||||
|         event.internal_metadata.outlier = True | ||||
|         event.internal_metadata.invite_from_remote = True | ||||
| 
 | ||||
|  |  | |||
|  | @ -20,10 +20,15 @@ import sys | |||
| from canonicaljson import encode_canonical_json | ||||
| import six | ||||
| from twisted.internet import defer, reactor | ||||
| from twisted.internet.defer import succeed | ||||
| from twisted.python.failure import Failure | ||||
| 
 | ||||
| from synapse.api.constants import EventTypes, Membership, MAX_DEPTH | ||||
| from synapse.api.errors import AuthError, Codes, SynapseError | ||||
| from synapse.api.errors import ( | ||||
|     AuthError, Codes, SynapseError, | ||||
|     ConsentNotGivenError, | ||||
| ) | ||||
| from synapse.api.urls import ConsentURIBuilder | ||||
| from synapse.crypto.event_signing import add_hashes_and_signatures | ||||
| from synapse.events.utils import serialize_event | ||||
| from synapse.events.validator import EventValidator | ||||
|  | @ -86,14 +91,14 @@ class MessageHandler(BaseHandler): | |||
|         # map from purge id to PurgeStatus | ||||
|         self._purges_by_id = {} | ||||
| 
 | ||||
|     def start_purge_history(self, room_id, topological_ordering, | ||||
|     def start_purge_history(self, room_id, token, | ||||
|                             delete_local_events=False): | ||||
|         """Start off a history purge on a room. | ||||
| 
 | ||||
|         Args: | ||||
|             room_id (str): The room to purge from | ||||
| 
 | ||||
|             topological_ordering (int): minimum topo ordering to preserve | ||||
|             token (str): topological token to delete events before | ||||
|             delete_local_events (bool): True to delete local events as well as | ||||
|                 remote ones | ||||
| 
 | ||||
|  | @ -115,19 +120,19 @@ class MessageHandler(BaseHandler): | |||
|         self._purges_by_id[purge_id] = PurgeStatus() | ||||
|         run_in_background( | ||||
|             self._purge_history, | ||||
|             purge_id, room_id, topological_ordering, delete_local_events, | ||||
|             purge_id, room_id, token, delete_local_events, | ||||
|         ) | ||||
|         return purge_id | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _purge_history(self, purge_id, room_id, topological_ordering, | ||||
|     def _purge_history(self, purge_id, room_id, token, | ||||
|                        delete_local_events): | ||||
|         """Carry out a history purge on a room. | ||||
| 
 | ||||
|         Args: | ||||
|             purge_id (str): The id for this purge | ||||
|             room_id (str): The room to purge from | ||||
|             topological_ordering (int): minimum topo ordering to preserve | ||||
|             token (str): topological token to delete events before | ||||
|             delete_local_events (bool): True to delete local events as well as | ||||
|                 remote ones | ||||
| 
 | ||||
|  | @ -138,7 +143,7 @@ class MessageHandler(BaseHandler): | |||
|         try: | ||||
|             with (yield self.pagination_lock.write(room_id)): | ||||
|                 yield self.store.purge_history( | ||||
|                     room_id, topological_ordering, delete_local_events, | ||||
|                     room_id, token, delete_local_events, | ||||
|                 ) | ||||
|             logger.info("[purge] complete") | ||||
|             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE | ||||
|  | @ -431,6 +436,9 @@ class EventCreationHandler(object): | |||
| 
 | ||||
|         self.spam_checker = hs.get_spam_checker() | ||||
| 
 | ||||
|         if self.config.block_events_without_consent_error is not None: | ||||
|             self._consent_uri_builder = ConsentURIBuilder(self.config) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def create_event(self, requester, event_dict, token_id=None, txn_id=None, | ||||
|                      prev_events_and_hashes=None): | ||||
|  | @ -482,6 +490,10 @@ class EventCreationHandler(object): | |||
|                         target, e | ||||
|                     ) | ||||
| 
 | ||||
|         is_exempt = yield self._is_exempt_from_privacy_policy(builder) | ||||
|         if not is_exempt: | ||||
|             yield self.assert_accepted_privacy_policy(requester) | ||||
| 
 | ||||
|         if token_id is not None: | ||||
|             builder.internal_metadata.token_id = token_id | ||||
| 
 | ||||
|  | @ -496,6 +508,78 @@ class EventCreationHandler(object): | |||
| 
 | ||||
|         defer.returnValue((event, context)) | ||||
| 
 | ||||
|     def _is_exempt_from_privacy_policy(self, builder): | ||||
|         """"Determine if an event to be sent is exempt from having to consent | ||||
|         to the privacy policy | ||||
| 
 | ||||
|         Args: | ||||
|             builder (synapse.events.builder.EventBuilder): event being created | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred[bool]: true if the event can be sent without the user | ||||
|                 consenting | ||||
|         """ | ||||
|         # the only thing the user can do is join the server notices room. | ||||
|         if builder.type == EventTypes.Member: | ||||
|             membership = builder.content.get("membership", None) | ||||
|             if membership == Membership.JOIN: | ||||
|                 return self._is_server_notices_room(builder.room_id) | ||||
|         return succeed(False) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _is_server_notices_room(self, room_id): | ||||
|         if self.config.server_notices_mxid is None: | ||||
|             defer.returnValue(False) | ||||
|         user_ids = yield self.store.get_users_in_room(room_id) | ||||
|         defer.returnValue(self.config.server_notices_mxid in user_ids) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def assert_accepted_privacy_policy(self, requester): | ||||
|         """Check if a user has accepted the privacy policy | ||||
| 
 | ||||
|         Called when the given user is about to do something that requires | ||||
|         privacy consent. We see if the user is exempt and otherwise check that | ||||
|         they have given consent. If they have not, a ConsentNotGiven error is | ||||
|         raised. | ||||
| 
 | ||||
|         Args: | ||||
|             requester (synapse.types.Requester): | ||||
|                 The user making the request | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred[None]: returns normally if the user has consented or is | ||||
|                 exempt | ||||
| 
 | ||||
|         Raises: | ||||
|             ConsentNotGivenError: if the user has not given consent yet | ||||
|         """ | ||||
|         if self.config.block_events_without_consent_error is None: | ||||
|             return | ||||
| 
 | ||||
|         # exempt AS users from needing consent | ||||
|         if requester.app_service is not None: | ||||
|             return | ||||
| 
 | ||||
|         user_id = requester.user.to_string() | ||||
| 
 | ||||
|         # exempt the system notices user | ||||
|         if ( | ||||
|             self.config.server_notices_mxid is not None and | ||||
|             user_id == self.config.server_notices_mxid | ||||
|         ): | ||||
|             return | ||||
| 
 | ||||
|         u = yield self.store.get_user_by_id(user_id) | ||||
|         assert u is not None | ||||
|         if u["consent_version"] == self.config.user_consent_version: | ||||
|             return | ||||
| 
 | ||||
|         consent_uri = self._consent_uri_builder.build_user_consent_uri(user_id) | ||||
|         raise ConsentNotGivenError( | ||||
|             msg=self.config.block_events_without_consent_error, | ||||
|             consent_uri=consent_uri, | ||||
|         ) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def send_nonmember_event(self, requester, event, context, ratelimit=True): | ||||
|         """ | ||||
|  |  | |||
|  | @ -87,6 +87,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER | |||
| class PresenceHandler(object): | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
| 
 | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
|         self.is_mine = hs.is_mine | ||||
|         self.is_mine_id = hs.is_mine_id | ||||
|         self.clock = hs.get_clock() | ||||
|  | @ -94,7 +99,6 @@ class PresenceHandler(object): | |||
|         self.wheel_timer = WheelTimer() | ||||
|         self.notifier = hs.get_notifier() | ||||
|         self.federation = hs.get_federation_sender() | ||||
| 
 | ||||
|         self.state = hs.get_state_handler() | ||||
| 
 | ||||
|         federation_registry = hs.get_federation_registry() | ||||
|  | @ -463,61 +467,6 @@ class PresenceHandler(object): | |||
|             syncing_user_ids.update(user_ids) | ||||
|         return syncing_user_ids | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def update_external_syncs(self, process_id, syncing_user_ids): | ||||
|         """Update the syncing users for an external process | ||||
| 
 | ||||
|         Args: | ||||
|             process_id(str): An identifier for the process the users are | ||||
|                 syncing against. This allows synapse to process updates | ||||
|                 as user start and stop syncing against a given process. | ||||
|             syncing_user_ids(set(str)): The set of user_ids that are | ||||
|                 currently syncing on that server. | ||||
|         """ | ||||
| 
 | ||||
|         # Grab the previous list of user_ids that were syncing on that process | ||||
|         prev_syncing_user_ids = ( | ||||
|             self.external_process_to_current_syncs.get(process_id, set()) | ||||
|         ) | ||||
|         # Grab the current presence state for both the users that are syncing | ||||
|         # now and the users that were syncing before this update. | ||||
|         prev_states = yield self.current_state_for_users( | ||||
|             syncing_user_ids | prev_syncing_user_ids | ||||
|         ) | ||||
|         updates = [] | ||||
|         time_now_ms = self.clock.time_msec() | ||||
| 
 | ||||
|         # For each new user that is syncing check if we need to mark them as | ||||
|         # being online. | ||||
|         for new_user_id in syncing_user_ids - prev_syncing_user_ids: | ||||
|             prev_state = prev_states[new_user_id] | ||||
|             if prev_state.state == PresenceState.OFFLINE: | ||||
|                 updates.append(prev_state.copy_and_replace( | ||||
|                     state=PresenceState.ONLINE, | ||||
|                     last_active_ts=time_now_ms, | ||||
|                     last_user_sync_ts=time_now_ms, | ||||
|                 )) | ||||
|             else: | ||||
|                 updates.append(prev_state.copy_and_replace( | ||||
|                     last_user_sync_ts=time_now_ms, | ||||
|                 )) | ||||
| 
 | ||||
|         # For each user that is still syncing or stopped syncing update the | ||||
|         # last sync time so that we will correctly apply the grace period when | ||||
|         # they stop syncing. | ||||
|         for old_user_id in prev_syncing_user_ids: | ||||
|             prev_state = prev_states[old_user_id] | ||||
|             updates.append(prev_state.copy_and_replace( | ||||
|                 last_user_sync_ts=time_now_ms, | ||||
|             )) | ||||
| 
 | ||||
|         yield self._update_states(updates) | ||||
| 
 | ||||
|         # Update the last updated time for the process. We expire the entries | ||||
|         # if we don't receive an update in the given timeframe. | ||||
|         self.external_process_last_updated_ms[process_id] = self.clock.time_msec() | ||||
|         self.external_process_to_current_syncs[process_id] = syncing_user_ids | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): | ||||
|         """Update the syncing users for an external process as a delta. | ||||
|  |  | |||
|  | @ -34,6 +34,11 @@ logger = logging.getLogger(__name__) | |||
| class RegistrationHandler(BaseHandler): | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
| 
 | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
|         super(RegistrationHandler, self).__init__(hs) | ||||
| 
 | ||||
|         self.auth = hs.get_auth() | ||||
|  | @ -49,6 +54,7 @@ class RegistrationHandler(BaseHandler): | |||
|         self._generate_user_id_linearizer = Linearizer( | ||||
|             name="_generate_user_id_linearizer", | ||||
|         ) | ||||
|         self._server_notices_mxid = hs.config.server_notices_mxid | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def check_username(self, localpart, guest_access_token=None, | ||||
|  | @ -338,6 +344,14 @@ class RegistrationHandler(BaseHandler): | |||
|             yield identity_handler.bind_threepid(c, user_id) | ||||
| 
 | ||||
|     def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None): | ||||
|         # don't allow people to register the server notices mxid | ||||
|         if self._server_notices_mxid is not None: | ||||
|             if user_id == self._server_notices_mxid: | ||||
|                 raise SynapseError( | ||||
|                     400, "This user ID is reserved.", | ||||
|                     errcode=Codes.EXCLUSIVE | ||||
|                 ) | ||||
| 
 | ||||
|         # valid user IDs must not clash with any user ID namespaces claimed by | ||||
|         # application services. | ||||
|         services = self.store.get_app_services() | ||||
|  |  | |||
|  | @ -68,14 +68,27 @@ class RoomCreationHandler(BaseHandler): | |||
|         self.event_creation_handler = hs.get_event_creation_handler() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def create_room(self, requester, config, ratelimit=True): | ||||
|     def create_room(self, requester, config, ratelimit=True, | ||||
|                     creator_join_profile=None): | ||||
|         """ Creates a new room. | ||||
| 
 | ||||
|         Args: | ||||
|             requester (Requester): The user who requested the room creation. | ||||
|             requester (synapse.types.Requester): | ||||
|                 The user who requested the room creation. | ||||
|             config (dict) : A dict of configuration options. | ||||
|             ratelimit (bool): set to False to disable the rate limiter | ||||
| 
 | ||||
|             creator_join_profile (dict|None): | ||||
|                 Set to override the displayname and avatar for the creating | ||||
|                 user in this room. If unset, displayname and avatar will be | ||||
|                 derived from the user's profile. If set, should contain the | ||||
|                 values to go in the body of the 'join' event (typically | ||||
|                 `avatar_url` and/or `displayname`. | ||||
| 
 | ||||
|         Returns: | ||||
|             The new room ID. | ||||
|             Deferred[dict]: | ||||
|                 a dict containing the keys `room_id` and, if an alias was | ||||
|                 requested, `room_alias`. | ||||
|         Raises: | ||||
|             SynapseError if the room ID couldn't be stored, or something went | ||||
|             horribly wrong. | ||||
|  | @ -113,6 +126,10 @@ class RoomCreationHandler(BaseHandler): | |||
|             except Exception: | ||||
|                 raise SynapseError(400, "Invalid user_id: %s" % (i,)) | ||||
| 
 | ||||
|         yield self.event_creation_handler.assert_accepted_privacy_policy( | ||||
|             requester, | ||||
|         ) | ||||
| 
 | ||||
|         invite_3pid_list = config.get("invite_3pid", []) | ||||
| 
 | ||||
|         visibility = config.get("visibility", None) | ||||
|  | @ -176,7 +193,8 @@ class RoomCreationHandler(BaseHandler): | |||
|             initial_state=initial_state, | ||||
|             creation_content=creation_content, | ||||
|             room_alias=room_alias, | ||||
|             power_level_content_override=config.get("power_level_content_override", {}) | ||||
|             power_level_content_override=config.get("power_level_content_override", {}), | ||||
|             creator_join_profile=creator_join_profile, | ||||
|         ) | ||||
| 
 | ||||
|         if "name" in config: | ||||
|  | @ -256,6 +274,7 @@ class RoomCreationHandler(BaseHandler): | |||
|             creation_content, | ||||
|             room_alias, | ||||
|             power_level_content_override, | ||||
|             creator_join_profile, | ||||
|     ): | ||||
|         def create(etype, content, **kwargs): | ||||
|             e = { | ||||
|  | @ -299,6 +318,7 @@ class RoomCreationHandler(BaseHandler): | |||
|             room_id, | ||||
|             "join", | ||||
|             ratelimit=False, | ||||
|             content=creator_join_profile, | ||||
|         ) | ||||
| 
 | ||||
|         # We treat the power levels override specially as this needs to be one | ||||
|  |  | |||
|  | @ -17,11 +17,14 @@ | |||
| import abc | ||||
| import logging | ||||
| 
 | ||||
| from six.moves import http_client | ||||
| 
 | ||||
| from signedjson.key import decode_verify_key_bytes | ||||
| from signedjson.sign import verify_signed_json | ||||
| from twisted.internet import defer | ||||
| from unpaddedbase64 import decode_base64 | ||||
| 
 | ||||
| import synapse.server | ||||
| import synapse.types | ||||
| from synapse.api.constants import ( | ||||
|     EventTypes, Membership, | ||||
|  | @ -46,6 +49,11 @@ class RoomMemberHandler(object): | |||
|     __metaclass__ = abc.ABCMeta | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
| 
 | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
|         self.hs = hs | ||||
|         self.store = hs.get_datastore() | ||||
|         self.auth = hs.get_auth() | ||||
|  | @ -63,6 +71,7 @@ class RoomMemberHandler(object): | |||
| 
 | ||||
|         self.clock = hs.get_clock() | ||||
|         self.spam_checker = hs.get_spam_checker() | ||||
|         self._server_notices_mxid = self.config.server_notices_mxid | ||||
| 
 | ||||
|     @abc.abstractmethod | ||||
|     def _remote_join(self, requester, remote_room_hosts, room_id, user, content): | ||||
|  | @ -289,12 +298,36 @@ class RoomMemberHandler(object): | |||
|             is_blocked = yield self.store.is_room_blocked(room_id) | ||||
|             if is_blocked: | ||||
|                 raise SynapseError(403, "This room has been blocked on this server") | ||||
|         else: | ||||
|             # we don't allow people to reject invites to, or leave, the | ||||
|             # server notice room. | ||||
|             is_blocked = yield self._is_server_notice_room(room_id) | ||||
|             if is_blocked: | ||||
|                 raise SynapseError( | ||||
|                     http_client.FORBIDDEN, | ||||
|                     "You cannot leave this room", | ||||
|                 ) | ||||
| 
 | ||||
|         if effective_membership_state == Membership.INVITE: | ||||
|             # block any attempts to invite the server notices mxid | ||||
|             if target.to_string() == self._server_notices_mxid: | ||||
|                 raise SynapseError( | ||||
|                     http_client.FORBIDDEN, | ||||
|                     "Cannot invite this user", | ||||
|                 ) | ||||
| 
 | ||||
|         if effective_membership_state == "invite": | ||||
|             block_invite = False | ||||
|             is_requester_admin = yield self.auth.is_server_admin( | ||||
|                 requester.user, | ||||
|             ) | ||||
| 
 | ||||
|             if (self._server_notices_mxid is not None and | ||||
|                     requester.user.to_string() == self._server_notices_mxid): | ||||
|                 # allow the server notices mxid to send invites | ||||
|                 is_requester_admin = True | ||||
| 
 | ||||
|             else: | ||||
|                 is_requester_admin = yield self.auth.is_server_admin( | ||||
|                     requester.user, | ||||
|                 ) | ||||
| 
 | ||||
|             if not is_requester_admin: | ||||
|                 if self.config.block_non_admin_invites: | ||||
|                     logger.info( | ||||
|  | @ -844,6 +877,13 @@ class RoomMemberHandler(object): | |||
| 
 | ||||
|         defer.returnValue(False) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _is_server_notice_room(self, room_id): | ||||
|         if self._server_notices_mxid is None: | ||||
|             defer.returnValue(False) | ||||
|         user_ids = yield self.store.get_users_in_room(room_id) | ||||
|         defer.returnValue(self._server_notices_mxid in user_ids) | ||||
| 
 | ||||
| 
 | ||||
| class RoomMemberMasterHandler(RoomMemberHandler): | ||||
|     def __init__(self, hs): | ||||
|  |  | |||
|  | @ -17,6 +17,7 @@ | |||
| import logging | ||||
| 
 | ||||
| from prometheus_client.core import Counter, Histogram | ||||
| from synapse.metrics import LaterGauge | ||||
| 
 | ||||
| from synapse.util.logcontext import LoggingContext | ||||
| 
 | ||||
|  | @ -50,14 +51,75 @@ response_db_sched_duration = Counter("synapse_http_request_response_db_sched_dur | |||
| response_size = Counter("synapse_http_request_response_size", "", ["method", "servlet", "tag"] | ||||
| ) | ||||
| 
 | ||||
| # In flight metrics are incremented while the requests are in flight, rather | ||||
| # than when the response was written. | ||||
| 
 | ||||
| in_flight_requests_ru_utime = Counter("synapse_http_request_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]) | ||||
| 
 | ||||
| in_flight_requests_ru_stime = Counter("synapse_http_request_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]) | ||||
| 
 | ||||
| in_flight_requests_db_txn_count = Counter("synapse_http_request_in_flight_requests_db_txn_count", "", ["method", "servlet"]) | ||||
| 
 | ||||
| # seconds spent waiting for db txns, excluding scheduling time, when processing | ||||
| # this request | ||||
| in_flight_requests_db_txn_duration = Counter("synapse_http_request_in_flight_requests_db_txn_duration_seconds", "", ["method", "servlet"]) | ||||
| 
 | ||||
| # seconds spent waiting for a db connection, when processing this request | ||||
| in_flight_requests_db_sched_duration = Counter("synapse_http_request_in_flight_requests_db_sched_duration_seconds", "", ["method", "servlet"]) | ||||
| 
 | ||||
| # The set of all in flight requests, set[RequestMetrics] | ||||
| _in_flight_requests = set() | ||||
| 
 | ||||
| 
 | ||||
| def _collect_in_flight(): | ||||
|     """Called just before metrics are collected, so we use it to update all | ||||
|     the in flight request metrics | ||||
|     """ | ||||
| 
 | ||||
|     for rm in _in_flight_requests: | ||||
|         rm.update_metrics() | ||||
| 
 | ||||
| 
 | ||||
| metrics.register_collector(_collect_in_flight) | ||||
| 
 | ||||
| 
 | ||||
| def _get_in_flight_counts(): | ||||
|     """Returns a count of all in flight requests by (method, server_name) | ||||
| 
 | ||||
|     Returns: | ||||
|         dict[tuple[str, str], int] | ||||
|     """ | ||||
| 
 | ||||
|     # Map from (method, name) -> int, the number of in flight requests of that | ||||
|     # type | ||||
|     counts = {} | ||||
|     for rm in _in_flight_requests: | ||||
|         key = (rm.method, rm.name,) | ||||
|         counts[key] = counts.get(key, 0) + 1 | ||||
| 
 | ||||
|     return counts | ||||
| 
 | ||||
| LaterGauge( | ||||
|     "synapse_http_request_metrics_in_flight_requests_count", "", | ||||
|     ["method", "servlet"], | ||||
|     _get_in_flight_counts | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| class RequestMetrics(object): | ||||
|     def start(self, time_msec, name): | ||||
|     def start(self, time_msec, name, method): | ||||
|         self.start = time_msec | ||||
|         self.start_context = LoggingContext.current_context() | ||||
|         self.name = name | ||||
|         self.method = method | ||||
| 
 | ||||
|         self._request_stats = _RequestStats.from_context(self.start_context) | ||||
| 
 | ||||
|         _in_flight_requests.add(self) | ||||
| 
 | ||||
|     def stop(self, time_msec, request): | ||||
|         _in_flight_requests.discard(self) | ||||
| 
 | ||||
|         context = LoggingContext.current_context() | ||||
| 
 | ||||
|         tag = "" | ||||
|  | @ -87,3 +149,76 @@ class RequestMetrics(object): | |||
|             context.db_sched_duration_ms / 1000.) | ||||
| 
 | ||||
|         response_size.labels(request.method, self.name, tag).inc(request.sentLength) | ||||
| 
 | ||||
|         # We always call this at the end to ensure that we update the metrics | ||||
|         # regardless of whether a call to /metrics while the request was in | ||||
|         # flight. | ||||
|         self.update_metrics() | ||||
| 
 | ||||
|     def update_metrics(self): | ||||
|         """Updates the in flight metrics with values from this request. | ||||
|         """ | ||||
|         diff = self._request_stats.update(self.start_context) | ||||
| 
 | ||||
|         in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) | ||||
|         in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) | ||||
| 
 | ||||
|         in_flight_requests_db_txn_count.labels(self.method, self.name).inc(diff.db_txn_count) | ||||
| 
 | ||||
|         in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(diff.db_txn_duration_ms / 1000.) | ||||
| 
 | ||||
|         in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(diff.db_sched_duration_ms / 1000.) | ||||
| 
 | ||||
| 
 | ||||
| class _RequestStats(object): | ||||
|     """Keeps tracks of various metrics for an in flight request. | ||||
|     """ | ||||
| 
 | ||||
|     __slots__ = [ | ||||
|         "ru_utime", "ru_stime", | ||||
|         "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", | ||||
|     ] | ||||
| 
 | ||||
|     def __init__(self, ru_utime, ru_stime, db_txn_count, | ||||
|                  db_txn_duration_ms, db_sched_duration_ms): | ||||
|         self.ru_utime = ru_utime | ||||
|         self.ru_stime = ru_stime | ||||
|         self.db_txn_count = db_txn_count | ||||
|         self.db_txn_duration_ms = db_txn_duration_ms | ||||
|         self.db_sched_duration_ms = db_sched_duration_ms | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def from_context(context): | ||||
|         ru_utime, ru_stime = context.get_resource_usage() | ||||
| 
 | ||||
|         return _RequestStats( | ||||
|             ru_utime, ru_stime, | ||||
|             context.db_txn_count, | ||||
|             context.db_txn_duration_ms, | ||||
|             context.db_sched_duration_ms, | ||||
|         ) | ||||
| 
 | ||||
|     def update(self, context): | ||||
|         """Updates the current values and returns the difference between the | ||||
|         old and new values. | ||||
| 
 | ||||
|         Returns: | ||||
|             _RequestStats: The difference between the old and new values | ||||
|         """ | ||||
|         new = _RequestStats.from_context(context) | ||||
| 
 | ||||
|         diff = _RequestStats( | ||||
|             new.ru_utime - self.ru_utime, | ||||
|             new.ru_stime - self.ru_stime, | ||||
|             new.db_txn_count - self.db_txn_count, | ||||
|             new.db_txn_duration_ms - self.db_txn_duration_ms, | ||||
|             new.db_sched_duration_ms - self.db_sched_duration_ms, | ||||
|         ) | ||||
| 
 | ||||
|         self.ru_utime = new.ru_utime | ||||
|         self.ru_stime = new.ru_stime | ||||
|         self.db_txn_count = new.db_txn_count | ||||
|         self.db_txn_duration_ms = new.db_txn_duration_ms | ||||
|         self.db_sched_duration_ms = new.db_sched_duration_ms | ||||
| 
 | ||||
|         return diff | ||||
|  |  | |||
|  | @ -13,7 +13,8 @@ | |||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| import cgi | ||||
| from six.moves import http_client | ||||
| 
 | ||||
| from synapse.api.errors import ( | ||||
|     cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes | ||||
|  | @ -44,6 +45,18 @@ import simplejson | |||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| HTML_ERROR_TEMPLATE = """<!DOCTYPE html> | ||||
| <html lang=en> | ||||
|   <head> | ||||
|     <meta charset="utf-8"> | ||||
|     <title>Error {code}</title> | ||||
|   </head> | ||||
|   <body> | ||||
|      <p>{msg}</p> | ||||
|   </body> | ||||
| </html> | ||||
| """ | ||||
| 
 | ||||
| 
 | ||||
| def wrap_json_request_handler(h): | ||||
|     """Wraps a request handler method with exception handling. | ||||
|  | @ -102,6 +115,65 @@ def wrap_json_request_handler(h): | |||
|     return wrap_request_handler_with_logging(wrapped_request_handler) | ||||
| 
 | ||||
| 
 | ||||
| def wrap_html_request_handler(h): | ||||
|     """Wraps a request handler method with exception handling. | ||||
| 
 | ||||
|     Also adds logging as per wrap_request_handler_with_logging. | ||||
| 
 | ||||
|     The handler method must have a signature of "handle_foo(self, request)", | ||||
|     where "self" must have a "clock" attribute (and "request" must be a | ||||
|     SynapseRequest). | ||||
|     """ | ||||
|     def wrapped_request_handler(self, request): | ||||
|         d = defer.maybeDeferred(h, self, request) | ||||
|         d.addErrback(_return_html_error, request) | ||||
|         return d | ||||
| 
 | ||||
|     return wrap_request_handler_with_logging(wrapped_request_handler) | ||||
| 
 | ||||
| 
 | ||||
| def _return_html_error(f, request): | ||||
|     """Sends an HTML error page corresponding to the given failure | ||||
| 
 | ||||
|     Args: | ||||
|         f (twisted.python.failure.Failure): | ||||
|         request (twisted.web.iweb.IRequest): | ||||
|     """ | ||||
|     if f.check(CodeMessageException): | ||||
|         cme = f.value | ||||
|         code = cme.code | ||||
|         msg = cme.msg | ||||
| 
 | ||||
|         if isinstance(cme, SynapseError): | ||||
|             logger.info( | ||||
|                 "%s SynapseError: %s - %s", request, code, msg | ||||
|             ) | ||||
|         else: | ||||
|             logger.error( | ||||
|                 "Failed handle request %r: %s", | ||||
|                 request, | ||||
|                 f.getTraceback().rstrip(), | ||||
|             ) | ||||
|     else: | ||||
|         code = http_client.INTERNAL_SERVER_ERROR | ||||
|         msg = "Internal server error" | ||||
| 
 | ||||
|         logger.error( | ||||
|             "Failed handle request %r: %s", | ||||
|             request, | ||||
|             f.getTraceback().rstrip(), | ||||
|         ) | ||||
| 
 | ||||
|     body = HTML_ERROR_TEMPLATE.format( | ||||
|         code=code, msg=cgi.escape(msg), | ||||
|     ).encode("utf-8") | ||||
|     request.setResponseCode(code) | ||||
|     request.setHeader(b"Content-Type", b"text/html; charset=utf-8") | ||||
|     request.setHeader(b"Content-Length", b"%i" % (len(body),)) | ||||
|     request.write(body) | ||||
|     finish_request(request) | ||||
| 
 | ||||
| 
 | ||||
| def wrap_request_handler_with_logging(h): | ||||
|     """Wraps a request handler to provide logging and metrics | ||||
| 
 | ||||
|  | @ -132,7 +204,7 @@ def wrap_request_handler_with_logging(h): | |||
|                 servlet_name = self.__class__.__name__ | ||||
|                 with request.processing(servlet_name): | ||||
|                     with PreserveLoggingContext(request_context): | ||||
|                         d = h(self, request) | ||||
|                         d = defer.maybeDeferred(h, self, request) | ||||
| 
 | ||||
|                         # record the arrival of the request *after* | ||||
|                         # dispatching to the handler, so that the handler | ||||
|  |  | |||
|  | @ -85,7 +85,9 @@ class SynapseRequest(Request): | |||
|     def _started_processing(self, servlet_name): | ||||
|         self.start_time = int(time.time() * 1000) | ||||
|         self.request_metrics = RequestMetrics() | ||||
|         self.request_metrics.start(self.start_time, name=servlet_name) | ||||
|         self.request_metrics.start( | ||||
|             self.start_time, name=servlet_name, method=self.method, | ||||
|         ) | ||||
| 
 | ||||
|         self.site.access_logger.info( | ||||
|             "%s - %s - Received request: %s %s", | ||||
|  |  | |||
|  | @ -62,8 +62,12 @@ class LaterGauge(object): | |||
| 
 | ||||
|         yield g | ||||
| 
 | ||||
|     def register(self): | ||||
|     def __attrs_post_init__(self): | ||||
|         self._register() | ||||
| 
 | ||||
|     def _register(self): | ||||
|         if self.name in all_gauges.keys(): | ||||
|             logger.warning("%s already registered, reregistering" % (self.name,)) | ||||
|             REGISTRY.unregister(all_gauges.pop(self.name)) | ||||
| 
 | ||||
|         REGISTRY.register(self) | ||||
|  |  | |||
|  | @ -68,6 +68,7 @@ class ReplicationStreamer(object): | |||
|         self.presence_handler = hs.get_presence_handler() | ||||
|         self.clock = hs.get_clock() | ||||
|         self.notifier = hs.get_notifier() | ||||
|         self._server_notices_sender = hs.get_server_notices_sender() | ||||
| 
 | ||||
|         # Current connections. | ||||
|         self.connections = [] | ||||
|  | @ -251,6 +252,7 @@ class ReplicationStreamer(object): | |||
|         yield self.store.insert_client_ip( | ||||
|             user_id, access_token, ip, user_agent, device_id, last_seen, | ||||
|         ) | ||||
|         yield self._server_notices_sender.on_user_ip(user_id) | ||||
| 
 | ||||
|     def send_sync_to_all_connections(self, data): | ||||
|         """Sends a SYNC command to all clients. | ||||
|  |  | |||
|  | @ -19,6 +19,7 @@ import logging | |||
| 
 | ||||
| from synapse.api.auth import get_access_token_from_request | ||||
| from synapse.util.async import ObservableDeferred | ||||
| from synapse.util.logcontext import make_deferred_yieldable, run_in_background | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
|  | @ -80,27 +81,26 @@ class HttpTransactionCache(object): | |||
|         Returns: | ||||
|             Deferred which resolves to a tuple of (response_code, response_dict). | ||||
|         """ | ||||
|         try: | ||||
|             return self.transactions[txn_key][0].observe() | ||||
|         except (KeyError, IndexError): | ||||
|             pass  # execute the function instead. | ||||
|         if txn_key in self.transactions: | ||||
|             observable = self.transactions[txn_key][0] | ||||
|         else: | ||||
|             # execute the function instead. | ||||
|             deferred = run_in_background(fn, *args, **kwargs) | ||||
| 
 | ||||
|         deferred = fn(*args, **kwargs) | ||||
|             observable = ObservableDeferred(deferred) | ||||
|             self.transactions[txn_key] = (observable, self.clock.time_msec()) | ||||
| 
 | ||||
|         # if the request fails with a Twisted failure, remove it | ||||
|         # from the transaction map. This is done to ensure that we don't | ||||
|         # cache transient errors like rate-limiting errors, etc. | ||||
|         def remove_from_map(err): | ||||
|             self.transactions.pop(txn_key, None) | ||||
|             return err | ||||
|         deferred.addErrback(remove_from_map) | ||||
|             # if the request fails with an exception, remove it | ||||
|             # from the transaction map. This is done to ensure that we don't | ||||
|             # cache transient errors like rate-limiting errors, etc. | ||||
|             def remove_from_map(err): | ||||
|                 self.transactions.pop(txn_key, None) | ||||
|                 # we deliberately do not propagate the error any further, as we | ||||
|                 # expect the observers to have reported it. | ||||
| 
 | ||||
|         # We don't add any other errbacks to the raw deferred, so we ask | ||||
|         # ObservableDeferred to swallow the error. This is fine as the error will | ||||
|         # still be reported to the observers. | ||||
|         observable = ObservableDeferred(deferred, consumeErrors=True) | ||||
|         self.transactions[txn_key] = (observable, self.clock.time_msec()) | ||||
|         return observable.observe() | ||||
|             deferred.addErrback(remove_from_map) | ||||
| 
 | ||||
|         return make_deferred_yieldable(observable.observe()) | ||||
| 
 | ||||
|     def _cleanup(self): | ||||
|         now = self.clock.time_msec() | ||||
|  |  | |||
|  | @ -151,10 +151,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): | |||
|             if event.room_id != room_id: | ||||
|                 raise SynapseError(400, "Event is for wrong room.") | ||||
| 
 | ||||
|             depth = event.depth | ||||
|             token = yield self.store.get_topological_token_for_event(event_id) | ||||
| 
 | ||||
|             logger.info( | ||||
|                 "[purge] purging up to depth %i (event_id %s)", | ||||
|                 depth, event_id, | ||||
|                 "[purge] purging up to token %s (event_id %s)", | ||||
|                 token, event_id, | ||||
|             ) | ||||
|         elif 'purge_up_to_ts' in body: | ||||
|             ts = body['purge_up_to_ts'] | ||||
|  | @ -174,7 +175,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): | |||
|                 ) | ||||
|             ) | ||||
|             if room_event_after_stream_ordering: | ||||
|                 (_, depth, _) = room_event_after_stream_ordering | ||||
|                 token = yield self.store.get_topological_token_for_event( | ||||
|                     room_event_after_stream_ordering, | ||||
|                 ) | ||||
|             else: | ||||
|                 logger.warn( | ||||
|                     "[purge] purging events not possible: No event found " | ||||
|  | @ -187,9 +190,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): | |||
|                     errcode=Codes.NOT_FOUND, | ||||
|                 ) | ||||
|             logger.info( | ||||
|                 "[purge] purging up to depth %i (received_ts %i => " | ||||
|                 "[purge] purging up to token %d (received_ts %i => " | ||||
|                 "stream_ordering %i)", | ||||
|                 depth, ts, stream_ordering, | ||||
|                 token, ts, stream_ordering, | ||||
|             ) | ||||
|         else: | ||||
|             raise SynapseError( | ||||
|  | @ -199,7 +202,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): | |||
|             ) | ||||
| 
 | ||||
|         purge_id = yield self.handlers.message_handler.start_purge_history( | ||||
|             room_id, depth, | ||||
|             room_id, token, | ||||
|             delete_local_events=delete_local_events, | ||||
|         ) | ||||
| 
 | ||||
|  | @ -273,8 +276,8 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): | |||
|     def __init__(self, hs): | ||||
|         super(ShutdownRoomRestServlet, self).__init__(hs) | ||||
|         self.store = hs.get_datastore() | ||||
|         self.handlers = hs.get_handlers() | ||||
|         self.state = hs.get_state_handler() | ||||
|         self._room_creation_handler = hs.get_room_creation_handler() | ||||
|         self.event_creation_handler = hs.get_event_creation_handler() | ||||
|         self.room_member_handler = hs.get_room_member_handler() | ||||
| 
 | ||||
|  | @ -296,7 +299,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): | |||
|         message = content.get("message", self.DEFAULT_MESSAGE) | ||||
|         room_name = content.get("room_name", "Content Violation Notification") | ||||
| 
 | ||||
|         info = yield self.handlers.room_creation_handler.create_room( | ||||
|         info = yield self._room_creation_handler.create_room( | ||||
|             room_creator_requester, | ||||
|             config={ | ||||
|                 "preset": "public_chat", | ||||
|  |  | |||
|  | @ -41,7 +41,7 @@ class RoomCreateRestServlet(ClientV1RestServlet): | |||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         super(RoomCreateRestServlet, self).__init__(hs) | ||||
|         self.handlers = hs.get_handlers() | ||||
|         self._room_creation_handler = hs.get_room_creation_handler() | ||||
| 
 | ||||
|     def register(self, http_server): | ||||
|         PATTERNS = "/createRoom" | ||||
|  | @ -64,8 +64,7 @@ class RoomCreateRestServlet(ClientV1RestServlet): | |||
|     def on_POST(self, request): | ||||
|         requester = yield self.auth.get_user_by_req(request) | ||||
| 
 | ||||
|         handler = self.handlers.room_creation_handler | ||||
|         info = yield handler.create_room( | ||||
|         info = yield self._room_creation_handler.create_room( | ||||
|             requester, self.get_room_config(request) | ||||
|         ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -85,6 +85,7 @@ class SyncRestServlet(RestServlet): | |||
|         self.clock = hs.get_clock() | ||||
|         self.filtering = hs.get_filtering() | ||||
|         self.presence_handler = hs.get_presence_handler() | ||||
|         self._server_notices_sender = hs.get_server_notices_sender() | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def on_GET(self, request): | ||||
|  | @ -149,6 +150,9 @@ class SyncRestServlet(RestServlet): | |||
|         else: | ||||
|             since_token = None | ||||
| 
 | ||||
|         # send any outstanding server notices to the user. | ||||
|         yield self._server_notices_sender.on_user_syncing(user.to_string()) | ||||
| 
 | ||||
|         affect_presence = set_presence != PresenceState.OFFLINE | ||||
| 
 | ||||
|         if affect_presence: | ||||
|  |  | |||
|  | @ -0,0 +1,222 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from hashlib import sha256 | ||||
| import hmac | ||||
| import logging | ||||
| from os import path | ||||
| from six.moves import http_client | ||||
| 
 | ||||
| import jinja2 | ||||
| from jinja2 import TemplateNotFound | ||||
| from twisted.internet import defer | ||||
| from twisted.web.resource import Resource | ||||
| from twisted.web.server import NOT_DONE_YET | ||||
| 
 | ||||
| from synapse.api.errors import NotFoundError, SynapseError, StoreError | ||||
| from synapse.config import ConfigError | ||||
| from synapse.http.server import ( | ||||
|     finish_request, | ||||
|     wrap_html_request_handler, | ||||
| ) | ||||
| from synapse.http.servlet import parse_string | ||||
| from synapse.types import UserID | ||||
| 
 | ||||
| 
 | ||||
| # language to use for the templates. TODO: figure this out from Accept-Language | ||||
| TEMPLATE_LANGUAGE = "en" | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| # use hmac.compare_digest if we have it (python 2.7.7), else just use equality | ||||
| if hasattr(hmac, "compare_digest"): | ||||
|     compare_digest = hmac.compare_digest | ||||
| else: | ||||
|     def compare_digest(a, b): | ||||
|         return a == b | ||||
| 
 | ||||
| 
 | ||||
| class ConsentResource(Resource): | ||||
|     """A twisted Resource to display a privacy policy and gather consent to it | ||||
| 
 | ||||
|     When accessed via GET, returns the privacy policy via a template. | ||||
| 
 | ||||
|     When accessed via POST, records the user's consent in the database and | ||||
|     displays a success page. | ||||
| 
 | ||||
|     The config should include a template_dir setting which contains templates | ||||
|     for the HTML. The directory should contain one subdirectory per language | ||||
|     (eg, 'en', 'fr'), and each language directory should contain the policy | ||||
|     document (named as '<version>.html') and a success page (success.html). | ||||
| 
 | ||||
|     Both forms take a set of parameters from the browser. For the POST form, | ||||
|     these are normally sent as form parameters (but may be query-params); for | ||||
|     GET requests they must be query params. These are: | ||||
| 
 | ||||
|         u: the complete mxid, or the localpart of the user giving their | ||||
|            consent. Required for both GET (where it is used as an input to the | ||||
|            template) and for POST (where it is used to find the row in the db | ||||
|            to update). | ||||
| 
 | ||||
|         h: hmac_sha256(secret, u), where 'secret' is the privacy_secret in the | ||||
|            config file. If it doesn't match, the request is 403ed. | ||||
| 
 | ||||
|         v: the version of the privacy policy being agreed to. | ||||
| 
 | ||||
|            For GET: optional, and defaults to whatever was set in the config | ||||
|            file. Used to choose the version of the policy to pick from the | ||||
|            templates directory. | ||||
| 
 | ||||
|            For POST: required; gives the value to be recorded in the database | ||||
|            against the user. | ||||
|     """ | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): homeserver | ||||
|         """ | ||||
|         Resource.__init__(self) | ||||
| 
 | ||||
|         self.hs = hs | ||||
|         self.store = hs.get_datastore() | ||||
| 
 | ||||
|         # this is required by the request_handler wrapper | ||||
|         self.clock = hs.get_clock() | ||||
| 
 | ||||
|         self._default_consent_version = hs.config.user_consent_version | ||||
|         if self._default_consent_version is None: | ||||
|             raise ConfigError( | ||||
|                 "Consent resource is enabled but user_consent section is " | ||||
|                 "missing in config file.", | ||||
|             ) | ||||
| 
 | ||||
|         # daemonize changes the cwd to /, so make the path absolute now. | ||||
|         consent_template_directory = path.abspath( | ||||
|             hs.config.user_consent_template_dir, | ||||
|         ) | ||||
|         if not path.isdir(consent_template_directory): | ||||
|             raise ConfigError( | ||||
|                 "Could not find template directory '%s'" % ( | ||||
|                     consent_template_directory, | ||||
|                 ), | ||||
|             ) | ||||
| 
 | ||||
|         loader = jinja2.FileSystemLoader(consent_template_directory) | ||||
|         self._jinja_env = jinja2.Environment( | ||||
|             loader=loader, | ||||
|             autoescape=jinja2.select_autoescape(['html', 'htm', 'xml']), | ||||
|         ) | ||||
| 
 | ||||
|         if hs.config.form_secret is None: | ||||
|             raise ConfigError( | ||||
|                 "Consent resource is enabled but form_secret is not set in " | ||||
|                 "config file. It should be set to an arbitrary secret string.", | ||||
|             ) | ||||
| 
 | ||||
|         self._hmac_secret = hs.config.form_secret.encode("utf-8") | ||||
| 
 | ||||
|     def render_GET(self, request): | ||||
|         self._async_render_GET(request) | ||||
|         return NOT_DONE_YET | ||||
| 
 | ||||
|     @wrap_html_request_handler | ||||
|     @defer.inlineCallbacks | ||||
|     def _async_render_GET(self, request): | ||||
|         """ | ||||
|         Args: | ||||
|             request (twisted.web.http.Request): | ||||
|         """ | ||||
| 
 | ||||
|         version = parse_string(request, "v", | ||||
|                                default=self._default_consent_version) | ||||
|         username = parse_string(request, "u", required=True) | ||||
|         userhmac = parse_string(request, "h", required=True) | ||||
| 
 | ||||
|         self._check_hash(username, userhmac) | ||||
| 
 | ||||
|         if username.startswith('@'): | ||||
|             qualified_user_id = username | ||||
|         else: | ||||
|             qualified_user_id = UserID(username, self.hs.hostname).to_string() | ||||
| 
 | ||||
|         u = yield self.store.get_user_by_id(qualified_user_id) | ||||
|         if u is None: | ||||
|             raise NotFoundError("Unknown user") | ||||
| 
 | ||||
|         try: | ||||
|             self._render_template( | ||||
|                 request, "%s.html" % (version,), | ||||
|                 user=username, userhmac=userhmac, version=version, | ||||
|                 has_consented=(u["consent_version"] == version), | ||||
|             ) | ||||
|         except TemplateNotFound: | ||||
|             raise NotFoundError("Unknown policy version") | ||||
| 
 | ||||
|     def render_POST(self, request): | ||||
|         self._async_render_POST(request) | ||||
|         return NOT_DONE_YET | ||||
| 
 | ||||
|     @wrap_html_request_handler | ||||
|     @defer.inlineCallbacks | ||||
|     def _async_render_POST(self, request): | ||||
|         """ | ||||
|         Args: | ||||
|             request (twisted.web.http.Request): | ||||
|         """ | ||||
|         version = parse_string(request, "v", required=True) | ||||
|         username = parse_string(request, "u", required=True) | ||||
|         userhmac = parse_string(request, "h", required=True) | ||||
| 
 | ||||
|         self._check_hash(username, userhmac) | ||||
| 
 | ||||
|         if username.startswith('@'): | ||||
|             qualified_user_id = username | ||||
|         else: | ||||
|             qualified_user_id = UserID(username, self.hs.hostname).to_string() | ||||
| 
 | ||||
|         try: | ||||
|             yield self.store.user_set_consent_version(qualified_user_id, version) | ||||
|         except StoreError as e: | ||||
|             if e.code != 404: | ||||
|                 raise | ||||
|             raise NotFoundError("Unknown user") | ||||
| 
 | ||||
|         try: | ||||
|             self._render_template(request, "success.html") | ||||
|         except TemplateNotFound: | ||||
|             raise NotFoundError("success.html not found") | ||||
| 
 | ||||
|     def _render_template(self, request, template_name, **template_args): | ||||
|         # get_template checks for ".." so we don't need to worry too much | ||||
|         # about path traversal here. | ||||
|         template_html = self._jinja_env.get_template( | ||||
|             path.join(TEMPLATE_LANGUAGE, template_name) | ||||
|         ) | ||||
|         html_bytes = template_html.render(**template_args).encode("utf8") | ||||
| 
 | ||||
|         request.setHeader(b"Content-Type", b"text/html; charset=utf-8") | ||||
|         request.setHeader(b"Content-Length", b"%i" % len(html_bytes)) | ||||
|         request.write(html_bytes) | ||||
|         finish_request(request) | ||||
| 
 | ||||
|     def _check_hash(self, userid, userhmac): | ||||
|         want_mac = hmac.new( | ||||
|             key=self._hmac_secret, | ||||
|             msg=userid, | ||||
|             digestmod=sha256, | ||||
|         ).hexdigest() | ||||
| 
 | ||||
|         if not compare_digest(want_mac, userhmac): | ||||
|             raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect") | ||||
|  | @ -46,6 +46,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler | |||
| from synapse.handlers.device import DeviceHandler | ||||
| from synapse.handlers.e2e_keys import E2eKeysHandler | ||||
| from synapse.handlers.presence import PresenceHandler | ||||
| from synapse.handlers.room import RoomCreationHandler | ||||
| from synapse.handlers.room_list import RoomListHandler | ||||
| from synapse.handlers.room_member import RoomMemberMasterHandler | ||||
| from synapse.handlers.room_member_worker import RoomMemberWorkerHandler | ||||
|  | @ -71,6 +72,11 @@ from synapse.rest.media.v1.media_repository import ( | |||
|     MediaRepository, | ||||
|     MediaRepositoryResource, | ||||
| ) | ||||
| from synapse.server_notices.server_notices_manager import ServerNoticesManager | ||||
| from synapse.server_notices.server_notices_sender import ServerNoticesSender | ||||
| from synapse.server_notices.worker_server_notices_sender import ( | ||||
|     WorkerServerNoticesSender, | ||||
| ) | ||||
| from synapse.state import StateHandler, StateResolutionHandler | ||||
| from synapse.storage import DataStore | ||||
| from synapse.streams.events import EventSources | ||||
|  | @ -97,6 +103,9 @@ class HomeServer(object): | |||
|     which must be implemented by the subclass. This code may call any of the | ||||
|     required "get" methods on the instance to obtain the sub-dependencies that | ||||
|     one requires. | ||||
| 
 | ||||
|     Attributes: | ||||
|         config (synapse.config.homeserver.HomeserverConfig): | ||||
|     """ | ||||
| 
 | ||||
|     DEPENDENCIES = [ | ||||
|  | @ -106,6 +115,7 @@ class HomeServer(object): | |||
|         'federation_server', | ||||
|         'handlers', | ||||
|         'auth', | ||||
|         'room_creation_handler', | ||||
|         'state_handler', | ||||
|         'state_resolution_handler', | ||||
|         'presence_handler', | ||||
|  | @ -151,6 +161,8 @@ class HomeServer(object): | |||
|         'spam_checker', | ||||
|         'room_member_handler', | ||||
|         'federation_registry', | ||||
|         'server_notices_manager', | ||||
|         'server_notices_sender', | ||||
|     ] | ||||
| 
 | ||||
|     def __init__(self, hostname, **kwargs): | ||||
|  | @ -224,6 +236,9 @@ class HomeServer(object): | |||
|     def build_simple_http_client(self): | ||||
|         return SimpleHttpClient(self) | ||||
| 
 | ||||
|     def build_room_creation_handler(self): | ||||
|         return RoomCreationHandler(self) | ||||
| 
 | ||||
|     def build_state_handler(self): | ||||
|         return StateHandler(self) | ||||
| 
 | ||||
|  | @ -390,6 +405,16 @@ class HomeServer(object): | |||
|     def build_federation_registry(self): | ||||
|         return FederationHandlerRegistry() | ||||
| 
 | ||||
|     def build_server_notices_manager(self): | ||||
|         if self.config.worker_app: | ||||
|             raise Exception("Workers cannot send server notices") | ||||
|         return ServerNoticesManager(self) | ||||
| 
 | ||||
|     def build_server_notices_sender(self): | ||||
|         if self.config.worker_app: | ||||
|             return WorkerServerNoticesSender(self) | ||||
|         return ServerNoticesSender(self) | ||||
| 
 | ||||
|     def remove_pusher(self, app_id, push_key, user_id): | ||||
|         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,4 +1,5 @@ | |||
| import synapse.api.auth | ||||
| import synapse.config.homeserver | ||||
| import synapse.federation.transaction_queue | ||||
| import synapse.federation.transport.client | ||||
| import synapse.handlers | ||||
|  | @ -8,11 +9,17 @@ import synapse.handlers.device | |||
| import synapse.handlers.e2e_keys | ||||
| import synapse.handlers.set_password | ||||
| import synapse.rest.media.v1.media_repository | ||||
| import synapse.server_notices.server_notices_manager | ||||
| import synapse.server_notices.server_notices_sender | ||||
| import synapse.state | ||||
| import synapse.storage | ||||
| 
 | ||||
| 
 | ||||
| class HomeServer(object): | ||||
|     @property | ||||
|     def config(self) -> synapse.config.homeserver.HomeServerConfig: | ||||
|         pass | ||||
| 
 | ||||
|     def get_auth(self) -> synapse.api.auth.Auth: | ||||
|         pass | ||||
| 
 | ||||
|  | @ -40,6 +47,12 @@ class HomeServer(object): | |||
|     def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler: | ||||
|         pass | ||||
| 
 | ||||
|     def get_room_creation_handler(self) -> synapse.handlers.room.RoomCreationHandler: | ||||
|         pass | ||||
| 
 | ||||
|     def get_event_creation_handler(self) -> synapse.handlers.message.EventCreationHandler: | ||||
|         pass | ||||
| 
 | ||||
|     def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler: | ||||
|         pass | ||||
| 
 | ||||
|  | @ -54,3 +67,9 @@ class HomeServer(object): | |||
| 
 | ||||
|     def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository: | ||||
|         pass | ||||
| 
 | ||||
|     def get_server_notices_manager(self) -> synapse.server_notices.server_notices_manager.ServerNoticesManager: | ||||
|         pass | ||||
| 
 | ||||
|     def get_server_notices_sender(self) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: | ||||
|         pass | ||||
|  |  | |||
|  | @ -0,0 +1,95 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| import logging | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.api.errors import SynapseError | ||||
| from synapse.config import ConfigError | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class ConsentServerNotices(object): | ||||
|     """Keeps track of whether we need to send users server_notices about | ||||
|     privacy policy consent, and sends one if we do. | ||||
|     """ | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
| 
 | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
|         self._server_notices_manager = hs.get_server_notices_manager() | ||||
|         self._store = hs.get_datastore() | ||||
| 
 | ||||
|         self._users_in_progress = set() | ||||
| 
 | ||||
|         self._current_consent_version = hs.config.user_consent_version | ||||
|         self._server_notice_content = hs.config.user_consent_server_notice_content | ||||
| 
 | ||||
|         if self._server_notice_content is not None: | ||||
|             if not self._server_notices_manager.is_enabled(): | ||||
|                 raise ConfigError( | ||||
|                     "user_consent configuration requires server notices, but " | ||||
|                     "server notices are not enabled.", | ||||
|                 ) | ||||
|             if 'body' not in self._server_notice_content: | ||||
|                 raise ConfigError( | ||||
|                     "user_consent server_notice_consent must contain a 'body' " | ||||
|                     "key.", | ||||
|                 ) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def maybe_send_server_notice_to_user(self, user_id): | ||||
|         """Check if we need to send a notice to this user, and does so if so | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): user to check | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred | ||||
|         """ | ||||
|         if self._server_notice_content is None: | ||||
|             # not enabled | ||||
|             return | ||||
| 
 | ||||
|         # make sure we don't send two messages to the same user at once | ||||
|         if user_id in self._users_in_progress: | ||||
|             return | ||||
|         self._users_in_progress.add(user_id) | ||||
|         try: | ||||
|             u = yield self._store.get_user_by_id(user_id) | ||||
| 
 | ||||
|             if u["consent_version"] == self._current_consent_version: | ||||
|                 # user has already consented | ||||
|                 return | ||||
| 
 | ||||
|             if u["consent_server_notice_sent"] == self._current_consent_version: | ||||
|                 # we've already sent a notice to the user | ||||
|                 return | ||||
| 
 | ||||
|             # need to send a message | ||||
|             try: | ||||
|                 yield self._server_notices_manager.send_notice( | ||||
|                     user_id, self._server_notice_content, | ||||
|                 ) | ||||
|                 yield self._store.user_set_consent_server_notice_sent( | ||||
|                     user_id, self._current_consent_version, | ||||
|                 ) | ||||
|             except SynapseError as e: | ||||
|                 logger.error("Error sending server notice about user consent: %s", e) | ||||
|         finally: | ||||
|             self._users_in_progress.remove(user_id) | ||||
|  | @ -0,0 +1,131 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| import logging | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.api.constants import EventTypes, Membership, RoomCreationPreset | ||||
| from synapse.types import create_requester | ||||
| from synapse.util.caches.descriptors import cachedInlineCallbacks | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class ServerNoticesManager(object): | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
| 
 | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
| 
 | ||||
|         self._store = hs.get_datastore() | ||||
|         self._config = hs.config | ||||
|         self._room_creation_handler = hs.get_room_creation_handler() | ||||
|         self._event_creation_handler = hs.get_event_creation_handler() | ||||
| 
 | ||||
|     def is_enabled(self): | ||||
|         """Checks if server notices are enabled on this server. | ||||
| 
 | ||||
|         Returns: | ||||
|             bool | ||||
|         """ | ||||
|         return self._config.server_notices_mxid is not None | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def send_notice(self, user_id, event_content): | ||||
|         """Send a notice to the given user | ||||
| 
 | ||||
|         Creates the server notices room, if none exists. | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): mxid of user to send event to. | ||||
|             event_content (dict): content of event to send | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferrred[None] | ||||
|         """ | ||||
|         room_id = yield self.get_notice_room_for_user(user_id) | ||||
| 
 | ||||
|         system_mxid = self._config.server_notices_mxid | ||||
|         requester = create_requester(system_mxid) | ||||
| 
 | ||||
|         logger.info("Sending server notice to %s", user_id) | ||||
| 
 | ||||
|         yield self._event_creation_handler.create_and_send_nonmember_event( | ||||
|             requester, { | ||||
|                 "type": EventTypes.Message, | ||||
|                 "room_id": room_id, | ||||
|                 "sender": system_mxid, | ||||
|                 "content": event_content, | ||||
|             }, | ||||
|             ratelimit=False, | ||||
|         ) | ||||
| 
 | ||||
|     @cachedInlineCallbacks() | ||||
|     def get_notice_room_for_user(self, user_id): | ||||
|         """Get the room for notices for a given user | ||||
| 
 | ||||
|         If we have not yet created a notice room for this user, create it | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): complete user id for the user we want a room for | ||||
| 
 | ||||
|         Returns: | ||||
|             str: room id of notice room. | ||||
|         """ | ||||
|         if not self.is_enabled(): | ||||
|             raise Exception("Server notices not enabled") | ||||
| 
 | ||||
|         rooms = yield self._store.get_rooms_for_user_where_membership_is( | ||||
|             user_id, [Membership.INVITE, Membership.JOIN], | ||||
|         ) | ||||
|         system_mxid = self._config.server_notices_mxid | ||||
|         for room in rooms: | ||||
|             # it's worth noting that there is an asymmetry here in that we | ||||
|             # expect the user to be invited or joined, but the system user must | ||||
|             # be joined. This is kinda deliberate, in that if somebody somehow | ||||
|             # manages to invite the system user to a room, that doesn't make it | ||||
|             # the server notices room. | ||||
|             user_ids = yield self._store.get_users_in_room(room.room_id) | ||||
|             if system_mxid in user_ids: | ||||
|                 # we found a room which our user shares with the system notice | ||||
|                 # user | ||||
|                 logger.info("Using room %s", room.room_id) | ||||
|                 defer.returnValue(room.room_id) | ||||
| 
 | ||||
|         # apparently no existing notice room: create a new one | ||||
|         logger.info("Creating server notices room for %s", user_id) | ||||
| 
 | ||||
|         requester = create_requester(system_mxid) | ||||
|         info = yield self._room_creation_handler.create_room( | ||||
|             requester, | ||||
|             config={ | ||||
|                 "preset": RoomCreationPreset.PRIVATE_CHAT, | ||||
|                 "name": self._config.server_notices_room_name, | ||||
|                 "power_level_content_override": { | ||||
|                     "users_default": -10, | ||||
|                 }, | ||||
|                 "invite": (user_id,) | ||||
|             }, | ||||
|             ratelimit=False, | ||||
|             creator_join_profile={ | ||||
|                 "displayname": self._config.server_notices_mxid_display_name, | ||||
|             }, | ||||
|         ) | ||||
|         room_id = info['room_id'] | ||||
| 
 | ||||
|         logger.info("Created server notices room %s for %s", room_id, user_id) | ||||
|         defer.returnValue(room_id) | ||||
|  | @ -0,0 +1,58 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| from synapse.server_notices.consent_server_notices import ConsentServerNotices | ||||
| 
 | ||||
| 
 | ||||
| class ServerNoticesSender(object): | ||||
|     """A centralised place which sends server notices automatically when | ||||
|     Certain Events take place | ||||
|     """ | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
| 
 | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
|         # todo: it would be nice to make this more dynamic | ||||
|         self._consent_server_notices = ConsentServerNotices(hs) | ||||
| 
 | ||||
|     def on_user_syncing(self, user_id): | ||||
|         """Called when the user performs a sync operation. | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): mxid of user who synced | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred | ||||
|         """ | ||||
|         return self._consent_server_notices.maybe_send_server_notice_to_user( | ||||
|             user_id, | ||||
|         ) | ||||
| 
 | ||||
|     def on_user_ip(self, user_id): | ||||
|         """Called on the master when a worker process saw a client request. | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): mxid | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred | ||||
|         """ | ||||
|         # The synchrotrons use a stubbed version of ServerNoticesSender, so | ||||
|         # we check for notices to send to the user in on_user_ip as well as | ||||
|         # in on_user_syncing | ||||
|         return self._consent_server_notices.maybe_send_server_notice_to_user( | ||||
|             user_id, | ||||
|         ) | ||||
|  | @ -0,0 +1,46 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2018 New Vector Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| 
 | ||||
| class WorkerServerNoticesSender(object): | ||||
|     """Stub impl of ServerNoticesSender which does nothing""" | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): | ||||
|         """ | ||||
| 
 | ||||
|     def on_user_syncing(self, user_id): | ||||
|         """Called when the user performs a sync operation. | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): mxid of user who synced | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred | ||||
|         """ | ||||
|         return defer.succeed() | ||||
| 
 | ||||
|     def on_user_ip(self, user_id): | ||||
|         """Called on the master when a worker process saw a client request. | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): mxid | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred | ||||
|         """ | ||||
|         raise AssertionError("on_user_ip unexpectedly called on worker") | ||||
|  | @ -14,6 +14,11 @@ | |||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| import datetime | ||||
| from dateutil import tz | ||||
| import time | ||||
| import logging | ||||
| 
 | ||||
| from synapse.storage.devices import DeviceStore | ||||
| from .appservice import ( | ||||
|     ApplicationServiceStore, ApplicationServiceTransactionStore | ||||
|  | @ -55,10 +60,6 @@ from .engines import PostgresEngine | |||
| from synapse.api.constants import PresenceState | ||||
| from synapse.util.caches.stream_change_cache import StreamChangeCache | ||||
| 
 | ||||
| 
 | ||||
| import logging | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -213,6 +214,9 @@ class DataStore(RoomMemberStore, RoomStore, | |||
|         self._stream_order_on_start = self.get_room_max_stream_ordering() | ||||
|         self._min_stream_order_on_start = self.get_room_min_stream_ordering() | ||||
| 
 | ||||
|         # Used in _generate_user_daily_visits to keep track of progress | ||||
|         self._last_user_visit_update = self._get_start_of_day() | ||||
| 
 | ||||
|         super(DataStore, self).__init__(db_conn, hs) | ||||
| 
 | ||||
|     def take_presence_startup_info(self): | ||||
|  | @ -347,6 +351,69 @@ class DataStore(RoomMemberStore, RoomStore, | |||
| 
 | ||||
|         return self.runInteraction("count_r30_users", _count_r30_users) | ||||
| 
 | ||||
|     def _get_start_of_day(self): | ||||
|         """ | ||||
|         Returns millisecond unixtime for start of UTC day. | ||||
|         """ | ||||
|         now = datetime.datetime.utcnow() | ||||
|         today_start = datetime.datetime(now.year, now.month, | ||||
|                                         now.day, tzinfo=tz.tzutc()) | ||||
|         return int(time.mktime(today_start.timetuple())) * 1000 | ||||
| 
 | ||||
|     def generate_user_daily_visits(self): | ||||
|         """ | ||||
|         Generates daily visit data for use in cohort/ retention analysis | ||||
|         """ | ||||
|         def _generate_user_daily_visits(txn): | ||||
|             logger.info("Calling _generate_user_daily_visits") | ||||
|             today_start = self._get_start_of_day() | ||||
|             a_day_in_milliseconds = 24 * 60 * 60 * 1000 | ||||
|             now = self.clock.time_msec() | ||||
| 
 | ||||
|             sql = """ | ||||
|                 INSERT INTO user_daily_visits (user_id, device_id, timestamp) | ||||
|                     SELECT u.user_id, u.device_id, ? | ||||
|                     FROM user_ips AS u | ||||
|                     LEFT JOIN ( | ||||
|                       SELECT user_id, device_id, timestamp FROM user_daily_visits | ||||
|                       WHERE timestamp = ? | ||||
|                     ) udv | ||||
|                     ON u.user_id = udv.user_id AND u.device_id=udv.device_id | ||||
|                     INNER JOIN users ON users.name=u.user_id | ||||
|                     WHERE last_seen > ? AND last_seen <= ? | ||||
|                     AND udv.timestamp IS NULL AND users.is_guest=0 | ||||
|                     AND users.appservice_id IS NULL | ||||
|                     GROUP BY u.user_id, u.device_id | ||||
|             """ | ||||
| 
 | ||||
|             # This means that the day has rolled over but there could still | ||||
|             # be entries from the previous day. There is an edge case | ||||
|             # where if the user logs in at 23:59 and overwrites their | ||||
|             # last_seen at 00:01 then they will not be counted in the | ||||
|             # previous day's stats - it is important that the query is run | ||||
|             # often to minimise this case. | ||||
|             if today_start > self._last_user_visit_update: | ||||
|                 yesterday_start = today_start - a_day_in_milliseconds | ||||
|                 txn.execute(sql, ( | ||||
|                     yesterday_start, yesterday_start, | ||||
|                     self._last_user_visit_update, today_start | ||||
|                 )) | ||||
|                 self._last_user_visit_update = today_start | ||||
| 
 | ||||
|             txn.execute(sql, ( | ||||
|                 today_start, today_start, | ||||
|                 self._last_user_visit_update, | ||||
|                 now | ||||
|             )) | ||||
|             # Update _last_user_visit_update to now. The reason to do this | ||||
|             # rather just clamping to the beginning of the day is to limit | ||||
|             # the size of the join - meaning that the query can be run more | ||||
|             # frequently | ||||
|             self._last_user_visit_update = now | ||||
| 
 | ||||
|         return self.runInteraction("generate_user_daily_visits", | ||||
|                                    _generate_user_daily_visits) | ||||
| 
 | ||||
|     def get_users(self): | ||||
|         """Function to reterive a list of users in users table. | ||||
| 
 | ||||
|  |  | |||
|  | @ -55,6 +55,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): | |||
|             columns=["user_id", "last_seen"], | ||||
|         ) | ||||
| 
 | ||||
|         self.register_background_index_update( | ||||
|             "user_ips_last_seen_only_index", | ||||
|             index_name="user_ips_last_seen_only", | ||||
|             table="user_ips", | ||||
|             columns=["last_seen"], | ||||
|         ) | ||||
| 
 | ||||
|         # (user_id, access_token, ip) -> (user_agent, device_id, last_seen) | ||||
|         self._batch_row_update = {} | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,8 +18,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction | |||
| from twisted.internet import defer | ||||
| from synapse.util.async import sleep | ||||
| from synapse.util.caches.descriptors import cachedInlineCallbacks | ||||
| from synapse.types import RoomStreamToken | ||||
| from .stream import lower_bound | ||||
| 
 | ||||
| import logging | ||||
| import simplejson as json | ||||
|  | @ -99,7 +97,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|     def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id, | ||||
|                                           last_read_event_id): | ||||
|         sql = ( | ||||
|             "SELECT stream_ordering, topological_ordering" | ||||
|             "SELECT stream_ordering" | ||||
|             " FROM events" | ||||
|             " WHERE room_id = ? AND event_id = ?" | ||||
|         ) | ||||
|  | @ -111,17 +109,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|             return {"notify_count": 0, "highlight_count": 0} | ||||
| 
 | ||||
|         stream_ordering = results[0][0] | ||||
|         topological_ordering = results[0][1] | ||||
| 
 | ||||
|         return self._get_unread_counts_by_pos_txn( | ||||
|             txn, room_id, user_id, topological_ordering, stream_ordering | ||||
|             txn, room_id, user_id, stream_ordering | ||||
|         ) | ||||
| 
 | ||||
|     def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering, | ||||
|                                       stream_ordering): | ||||
|         token = RoomStreamToken( | ||||
|             topological_ordering, stream_ordering | ||||
|         ) | ||||
|     def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): | ||||
| 
 | ||||
|         # First get number of notifications. | ||||
|         # We don't need to put a notif=1 clause as all rows always have | ||||
|  | @ -132,10 +125,10 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|             " WHERE" | ||||
|             " user_id = ?" | ||||
|             " AND room_id = ?" | ||||
|             " AND %s" | ||||
|         ) % (lower_bound(token, self.database_engine, inclusive=False),) | ||||
|             " AND stream_ordering > ?" | ||||
|         ) | ||||
| 
 | ||||
|         txn.execute(sql, (user_id, room_id)) | ||||
|         txn.execute(sql, (user_id, room_id, stream_ordering)) | ||||
|         row = txn.fetchone() | ||||
|         notify_count = row[0] if row else 0 | ||||
| 
 | ||||
|  | @ -155,10 +148,10 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|             " highlight = 1" | ||||
|             " AND user_id = ?" | ||||
|             " AND room_id = ?" | ||||
|             " AND %s" | ||||
|         ) % (lower_bound(token, self.database_engine, inclusive=False),) | ||||
|             " AND stream_ordering > ?" | ||||
|         ) | ||||
| 
 | ||||
|         txn.execute(sql, (user_id, room_id)) | ||||
|         txn.execute(sql, (user_id, room_id, stream_ordering)) | ||||
|         row = txn.fetchone() | ||||
|         highlight_count = row[0] if row else 0 | ||||
| 
 | ||||
|  | @ -209,7 +202,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|                 "   ep.highlight " | ||||
|                 " FROM (" | ||||
|                 "   SELECT room_id," | ||||
|                 "       MAX(topological_ordering) as topological_ordering," | ||||
|                 "       MAX(stream_ordering) as stream_ordering" | ||||
|                 "   FROM events" | ||||
|                 "   INNER JOIN receipts_linearized USING (room_id, event_id)" | ||||
|  | @ -219,13 +211,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|                 " event_push_actions AS ep" | ||||
|                 " WHERE" | ||||
|                 "   ep.room_id = rl.room_id" | ||||
|                 "   AND (" | ||||
|                 "       ep.topological_ordering > rl.topological_ordering" | ||||
|                 "       OR (" | ||||
|                 "           ep.topological_ordering = rl.topological_ordering" | ||||
|                 "           AND ep.stream_ordering > rl.stream_ordering" | ||||
|                 "       )" | ||||
|                 "   )" | ||||
|                 "   AND ep.stream_ordering > rl.stream_ordering" | ||||
|                 "   AND ep.user_id = ?" | ||||
|                 "   AND ep.stream_ordering > ?" | ||||
|                 "   AND ep.stream_ordering <= ?" | ||||
|  | @ -318,7 +304,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|                 "  ep.highlight, e.received_ts" | ||||
|                 " FROM (" | ||||
|                 "   SELECT room_id," | ||||
|                 "       MAX(topological_ordering) as topological_ordering," | ||||
|                 "       MAX(stream_ordering) as stream_ordering" | ||||
|                 "   FROM events" | ||||
|                 "   INNER JOIN receipts_linearized USING (room_id, event_id)" | ||||
|  | @ -329,13 +314,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): | |||
|                 " INNER JOIN events AS e USING (room_id, event_id)" | ||||
|                 " WHERE" | ||||
|                 "   ep.room_id = rl.room_id" | ||||
|                 "   AND (" | ||||
|                 "       ep.topological_ordering > rl.topological_ordering" | ||||
|                 "       OR (" | ||||
|                 "           ep.topological_ordering = rl.topological_ordering" | ||||
|                 "           AND ep.stream_ordering > rl.stream_ordering" | ||||
|                 "       )" | ||||
|                 "   )" | ||||
|                 "   AND ep.stream_ordering > rl.stream_ordering" | ||||
|                 "   AND ep.user_id = ?" | ||||
|                 "   AND ep.stream_ordering > ?" | ||||
|                 "   AND ep.stream_ordering <= ?" | ||||
|  | @ -762,10 +741,10 @@ class EventPushActionsStore(EventPushActionsWorkerStore): | |||
|         ) | ||||
| 
 | ||||
|     def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, | ||||
|                                             topological_ordering, stream_ordering): | ||||
|                                             stream_ordering): | ||||
|         """ | ||||
|         Purges old push actions for a user and room before a given | ||||
|         topological_ordering. | ||||
|         stream_ordering. | ||||
| 
 | ||||
|         We however keep a months worth of highlighted notifications, so that | ||||
|         users can still get a list of recent highlights. | ||||
|  | @ -774,7 +753,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): | |||
|             txn: The transcation | ||||
|             room_id: Room ID to delete from | ||||
|             user_id: user ID to delete for | ||||
|             topological_ordering: The lowest topological ordering which will | ||||
|             stream_ordering: The lowest stream ordering which will | ||||
|                                   not be deleted. | ||||
|         """ | ||||
|         txn.call_after( | ||||
|  | @ -793,9 +772,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore): | |||
|         txn.execute( | ||||
|             "DELETE FROM event_push_actions " | ||||
|             " WHERE user_id = ? AND room_id = ? AND " | ||||
|             " topological_ordering <= ?" | ||||
|             " stream_ordering <= ?" | ||||
|             " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", | ||||
|             (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) | ||||
|             (user_id, room_id, stream_ordering, self.stream_ordering_month_ago) | ||||
|         ) | ||||
| 
 | ||||
|         txn.execute(""" | ||||
|  |  | |||
|  | @ -33,7 +33,7 @@ from synapse.util.metrics import Measure | |||
| from synapse.api.constants import EventTypes | ||||
| from synapse.api.errors import SynapseError | ||||
| from synapse.util.caches.descriptors import cached, cachedInlineCallbacks | ||||
| from synapse.types import get_domain_from_id | ||||
| from synapse.types import get_domain_from_id, RoomStreamToken | ||||
| import synapse.metrics | ||||
| 
 | ||||
| # these are only included to make the type annotations work | ||||
|  | @ -1797,15 +1797,14 @@ class EventsStore(EventsWorkerStore): | |||
|         return self.runInteraction("get_all_new_events", get_all_new_events_txn) | ||||
| 
 | ||||
|     def purge_history( | ||||
|         self, room_id, topological_ordering, delete_local_events, | ||||
|         self, room_id, token, delete_local_events, | ||||
|     ): | ||||
|         """Deletes room history before a certain point | ||||
| 
 | ||||
|         Args: | ||||
|             room_id (str): | ||||
| 
 | ||||
|             topological_ordering (int): | ||||
|                 minimum topo ordering to preserve | ||||
|             token (str): A topological token to delete events before | ||||
| 
 | ||||
|             delete_local_events (bool): | ||||
|                 if True, we will delete local events as well as remote ones | ||||
|  | @ -1815,13 +1814,15 @@ class EventsStore(EventsWorkerStore): | |||
| 
 | ||||
|         return self.runInteraction( | ||||
|             "purge_history", | ||||
|             self._purge_history_txn, room_id, topological_ordering, | ||||
|             self._purge_history_txn, room_id, token, | ||||
|             delete_local_events, | ||||
|         ) | ||||
| 
 | ||||
|     def _purge_history_txn( | ||||
|         self, txn, room_id, topological_ordering, delete_local_events, | ||||
|         self, txn, room_id, token_str, delete_local_events, | ||||
|     ): | ||||
|         token = RoomStreamToken.parse(token_str) | ||||
| 
 | ||||
|         # Tables that should be pruned: | ||||
|         #     event_auth | ||||
|         #     event_backward_extremities | ||||
|  | @ -1866,6 +1867,13 @@ class EventsStore(EventsWorkerStore): | |||
|             " ON events_to_purge(should_delete)", | ||||
|         ) | ||||
| 
 | ||||
|         # We do joins against events_to_purge for e.g. calculating state | ||||
|         # groups to purge, etc., so lets make an index. | ||||
|         txn.execute( | ||||
|             "CREATE INDEX events_to_purge_id" | ||||
|             " ON events_to_purge(event_id)", | ||||
|         ) | ||||
| 
 | ||||
|         # First ensure that we're not about to delete all the forward extremeties | ||||
|         txn.execute( | ||||
|             "SELECT e.event_id, e.depth FROM events as e " | ||||
|  | @ -1878,7 +1886,7 @@ class EventsStore(EventsWorkerStore): | |||
|         rows = txn.fetchall() | ||||
|         max_depth = max(row[0] for row in rows) | ||||
| 
 | ||||
|         if max_depth <= topological_ordering: | ||||
|         if max_depth <= token.topological: | ||||
|             # We need to ensure we don't delete all the events from the datanase | ||||
|             # otherwise we wouldn't be able to send any events (due to not | ||||
|             # having any backwards extremeties) | ||||
|  | @ -1894,7 +1902,7 @@ class EventsStore(EventsWorkerStore): | |||
|             should_delete_expr += " AND event_id NOT LIKE ?" | ||||
|             should_delete_params += ("%:" + self.hs.hostname, ) | ||||
| 
 | ||||
|         should_delete_params += (room_id, topological_ordering) | ||||
|         should_delete_params += (room_id, token.topological) | ||||
| 
 | ||||
|         txn.execute( | ||||
|             "INSERT INTO events_to_purge" | ||||
|  | @ -1917,13 +1925,13 @@ class EventsStore(EventsWorkerStore): | |||
|         logger.info("[purge] Finding new backward extremities") | ||||
| 
 | ||||
|         # We calculate the new entries for the backward extremeties by finding | ||||
|         # all events that point to events that are to be purged | ||||
|         # events to be purged that are pointed to by events we're not going to | ||||
|         # purge. | ||||
|         txn.execute( | ||||
|             "SELECT DISTINCT e.event_id FROM events_to_purge AS e" | ||||
|             " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" | ||||
|             " INNER JOIN events AS e2 ON e2.event_id = ed.event_id" | ||||
|             " WHERE e2.topological_ordering >= ?", | ||||
|             (topological_ordering, ) | ||||
|             " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id" | ||||
|             " WHERE ep2.event_id IS NULL", | ||||
|         ) | ||||
|         new_backwards_extrems = txn.fetchall() | ||||
| 
 | ||||
|  | @ -1947,16 +1955,22 @@ class EventsStore(EventsWorkerStore): | |||
| 
 | ||||
|         # Get all state groups that are only referenced by events that are | ||||
|         # to be deleted. | ||||
|         txn.execute( | ||||
|             "SELECT state_group FROM event_to_state_groups" | ||||
|             " INNER JOIN events USING (event_id)" | ||||
|             " WHERE state_group IN (" | ||||
|             "   SELECT DISTINCT state_group FROM events_to_purge" | ||||
|             "   INNER JOIN event_to_state_groups USING (event_id)" | ||||
|             " )" | ||||
|             " GROUP BY state_group HAVING MAX(topological_ordering) < ?", | ||||
|             (topological_ordering, ) | ||||
|         ) | ||||
|         # This works by first getting state groups that we may want to delete, | ||||
|         # joining against event_to_state_groups to get events that use that | ||||
|         # state group, then left joining against events_to_purge again. Any | ||||
|         # state group where the left join produce *no nulls* are referenced | ||||
|         # only by events that are going to be purged. | ||||
|         txn.execute(""" | ||||
|             SELECT state_group FROM | ||||
|             ( | ||||
|                 SELECT DISTINCT state_group FROM events_to_purge | ||||
|                 INNER JOIN event_to_state_groups USING (event_id) | ||||
|             ) AS sp | ||||
|             INNER JOIN event_to_state_groups USING (state_group) | ||||
|             LEFT JOIN events_to_purge AS ep USING (event_id) | ||||
|             GROUP BY state_group | ||||
|             HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0 | ||||
|         """) | ||||
| 
 | ||||
|         state_rows = txn.fetchall() | ||||
|         logger.info("[purge] found %i redundant state groups", len(state_rows)) | ||||
|  | @ -2103,10 +2117,25 @@ class EventsStore(EventsWorkerStore): | |||
|         # | ||||
|         # So, let's stick it at the end so that we don't block event | ||||
|         # persistence. | ||||
|         logger.info("[purge] updating room_depth") | ||||
|         # | ||||
|         # We do this by calculating the minimum depth of the backwards | ||||
|         # extremities. However, the events in event_backward_extremities | ||||
|         # are ones we don't have yet so we need to look at the events that | ||||
|         # point to it via event_edges table. | ||||
|         txn.execute(""" | ||||
|             SELECT COALESCE(MIN(depth), 0) | ||||
|             FROM event_backward_extremities AS eb | ||||
|             INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id | ||||
|             INNER JOIN events AS e ON e.event_id = eg.event_id | ||||
|             WHERE eb.room_id = ? | ||||
|         """, (room_id,)) | ||||
|         min_depth, = txn.fetchone() | ||||
| 
 | ||||
|         logger.info("[purge] updating room_depth to %d", min_depth) | ||||
| 
 | ||||
|         txn.execute( | ||||
|             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", | ||||
|             (topological_ordering, room_id,) | ||||
|             (min_depth, room_id,) | ||||
|         ) | ||||
| 
 | ||||
|         # finally, drop the temp table. this will commit the txn in sqlite, | ||||
|  |  | |||
|  | @ -26,7 +26,7 @@ logger = logging.getLogger(__name__) | |||
| 
 | ||||
| # Remember to update this number every time a change is made to database | ||||
| # schema files, so the users will be informed on server restarts. | ||||
| SCHEMA_VERSION = 48 | ||||
| SCHEMA_VERSION = 49 | ||||
| 
 | ||||
| dir_path = os.path.abspath(os.path.dirname(__file__)) | ||||
| 
 | ||||
|  |  | |||
|  | @ -297,18 +297,22 @@ class ReceiptsWorkerStore(SQLBaseStore): | |||
|         if receipt_type != "m.read": | ||||
|             return | ||||
| 
 | ||||
|         # Returns an ObservableDeferred | ||||
|         # Returns either an ObservableDeferred or the raw result | ||||
|         res = self.get_users_with_read_receipts_in_room.cache.get( | ||||
|             room_id, None, update_metrics=False, | ||||
|         ) | ||||
| 
 | ||||
|         if res: | ||||
|             if isinstance(res, defer.Deferred) and res.called: | ||||
|         # first handle the Deferred case | ||||
|         if isinstance(res, defer.Deferred): | ||||
|             if res.called: | ||||
|                 res = res.result | ||||
|             if user_id in res: | ||||
|                 # We'd only be adding to the set, so no point invalidating if the | ||||
|                 # user is already there | ||||
|                 return | ||||
|             else: | ||||
|                 res = None | ||||
| 
 | ||||
|         if res and user_id in res: | ||||
|             # We'd only be adding to the set, so no point invalidating if the | ||||
|             # user is already there | ||||
|             return | ||||
| 
 | ||||
|         self.get_users_with_read_receipts_in_room.invalidate((room_id,)) | ||||
| 
 | ||||
|  | @ -407,7 +411,6 @@ class ReceiptsStore(ReceiptsWorkerStore): | |||
|                 txn, | ||||
|                 room_id=room_id, | ||||
|                 user_id=user_id, | ||||
|                 topological_ordering=topological_ordering, | ||||
|                 stream_ordering=stream_ordering, | ||||
|             ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -33,7 +33,10 @@ class RegistrationWorkerStore(SQLBaseStore): | |||
|             keyvalues={ | ||||
|                 "name": user_id, | ||||
|             }, | ||||
|             retcols=["name", "password_hash", "is_guest"], | ||||
|             retcols=[ | ||||
|                 "name", "password_hash", "is_guest", | ||||
|                 "consent_version", "consent_server_notice_sent", | ||||
|             ], | ||||
|             allow_none=True, | ||||
|             desc="get_user_by_id", | ||||
|         ) | ||||
|  | @ -286,6 +289,53 @@ class RegistrationStore(RegistrationWorkerStore, | |||
|             "user_set_password_hash", user_set_password_hash_txn | ||||
|         ) | ||||
| 
 | ||||
|     def user_set_consent_version(self, user_id, consent_version): | ||||
|         """Updates the user table to record privacy policy consent | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): full mxid of the user to update | ||||
|             consent_version (str): version of the policy the user has consented | ||||
|                 to | ||||
| 
 | ||||
|         Raises: | ||||
|             StoreError(404) if user not found | ||||
|         """ | ||||
|         def f(txn): | ||||
|             self._simple_update_one_txn( | ||||
|                 txn, | ||||
|                 table='users', | ||||
|                 keyvalues={'name': user_id, }, | ||||
|                 updatevalues={'consent_version': consent_version, }, | ||||
|             ) | ||||
|             self._invalidate_cache_and_stream( | ||||
|                 txn, self.get_user_by_id, (user_id,) | ||||
|             ) | ||||
|         return self.runInteraction("user_set_consent_version", f) | ||||
| 
 | ||||
|     def user_set_consent_server_notice_sent(self, user_id, consent_version): | ||||
|         """Updates the user table to record that we have sent the user a server | ||||
|         notice about privacy policy consent | ||||
| 
 | ||||
|         Args: | ||||
|             user_id (str): full mxid of the user to update | ||||
|             consent_version (str): version of the policy we have notified the | ||||
|                 user about | ||||
| 
 | ||||
|         Raises: | ||||
|             StoreError(404) if user not found | ||||
|         """ | ||||
|         def f(txn): | ||||
|             self._simple_update_one_txn( | ||||
|                 txn, | ||||
|                 table='users', | ||||
|                 keyvalues={'name': user_id, }, | ||||
|                 updatevalues={'consent_server_notice_sent': consent_version, }, | ||||
|             ) | ||||
|             self._invalidate_cache_and_stream( | ||||
|                 txn, self.get_user_by_id, (user_id,) | ||||
|             ) | ||||
|         return self.runInteraction("user_set_consent_server_notice_sent", f) | ||||
| 
 | ||||
|     def user_delete_access_tokens(self, user_id, except_token_id=None, | ||||
|                                   device_id=None): | ||||
|         """ | ||||
|  |  | |||
|  | @ -0,0 +1,18 @@ | |||
| /* Copyright 2018 New Vector Ltd | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| /* record the version of the privacy policy the user has consented to | ||||
|  */ | ||||
| ALTER TABLE users ADD COLUMN consent_version TEXT; | ||||
|  | @ -0,0 +1,20 @@ | |||
| /* Copyright 2018 New Vector Ltd | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| /* record whether we have sent a server notice about consenting to the | ||||
|  * privacy policy. Specifically records the version of the policy we sent | ||||
|  * a message about. | ||||
|  */ | ||||
| ALTER TABLE users ADD COLUMN consent_server_notice_sent TEXT; | ||||
|  | @ -0,0 +1,21 @@ | |||
| /* Copyright 2018 New Vector Ltd | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| 
 | ||||
| CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, | ||||
|                                  device_id TEXT, | ||||
|                                  timestamp BIGINT NOT NULL ); | ||||
| CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp); | ||||
| CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp); | ||||
|  | @ -0,0 +1,17 @@ | |||
| /* Copyright 2018 New Vector Ltd | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| INSERT into background_updates (update_name, progress_json) | ||||
|     VALUES ('user_ips_last_seen_only_index', '{}'); | ||||
|  | @ -60,7 +60,7 @@ class LoggingContext(object): | |||
|     __slots__ = [ | ||||
|         "previous_context", "name", "ru_stime", "ru_utime", | ||||
|         "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", | ||||
|         "usage_start", "usage_end", | ||||
|         "usage_start", | ||||
|         "main_thread", "alive", | ||||
|         "request", "tag", | ||||
|     ] | ||||
|  | @ -109,8 +109,10 @@ class LoggingContext(object): | |||
|         # ms spent waiting for db txns to be scheduled | ||||
|         self.db_sched_duration_ms = 0 | ||||
| 
 | ||||
|         # If alive has the thread resource usage when the logcontext last | ||||
|         # became active. | ||||
|         self.usage_start = None | ||||
|         self.usage_end = None | ||||
| 
 | ||||
|         self.main_thread = threading.current_thread() | ||||
|         self.request = None | ||||
|         self.tag = "" | ||||
|  | @ -159,7 +161,7 @@ class LoggingContext(object): | |||
|         """Restore the logging context in thread local storage to the state it | ||||
|         was before this context was entered. | ||||
|         Returns: | ||||
|             None to avoid suppressing any exeptions that were thrown. | ||||
|             None to avoid suppressing any exceptions that were thrown. | ||||
|         """ | ||||
|         current = self.set_current_context(self.previous_context) | ||||
|         if current is not self: | ||||
|  | @ -185,29 +187,43 @@ class LoggingContext(object): | |||
| 
 | ||||
|     def start(self): | ||||
|         if threading.current_thread() is not self.main_thread: | ||||
|             logger.warning("Started logcontext %s on different thread", self) | ||||
|             return | ||||
| 
 | ||||
|         if self.usage_start and self.usage_end: | ||||
|             self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime | ||||
|             self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime | ||||
|             self.usage_start = None | ||||
|             self.usage_end = None | ||||
| 
 | ||||
|         # If we haven't already started record the thread resource usage so | ||||
|         # far | ||||
|         if not self.usage_start: | ||||
|             self.usage_start = get_thread_resource_usage() | ||||
| 
 | ||||
|     def stop(self): | ||||
|         if threading.current_thread() is not self.main_thread: | ||||
|             logger.warning("Stopped logcontext %s on different thread", self) | ||||
|             return | ||||
| 
 | ||||
|         # When we stop, let's record the resource used since we started | ||||
|         if self.usage_start: | ||||
|             self.usage_end = get_thread_resource_usage() | ||||
|             usage_end = get_thread_resource_usage() | ||||
| 
 | ||||
|             self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime | ||||
|             self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime | ||||
| 
 | ||||
|             self.usage_start = None | ||||
|         else: | ||||
|             logger.warning("Called stop on logcontext %s without calling start", self) | ||||
| 
 | ||||
|     def get_resource_usage(self): | ||||
|         """Get CPU time used by this logcontext so far. | ||||
| 
 | ||||
|         Returns: | ||||
|             tuple[float, float]: The user and system CPU usage in seconds | ||||
|         """ | ||||
|         ru_utime = self.ru_utime | ||||
|         ru_stime = self.ru_stime | ||||
| 
 | ||||
|         if self.usage_start and threading.current_thread() is self.main_thread: | ||||
|         # If we are on the correct thread and we're currently running then we | ||||
|         # can include resource usage so far. | ||||
|         is_main_thread = threading.current_thread() is self.main_thread | ||||
|         if self.alive and self.usage_start and is_main_thread: | ||||
|             current = get_thread_resource_usage() | ||||
|             ru_utime += current.ru_utime - self.usage_start.ru_utime | ||||
|             ru_stime += current.ru_stime - self.usage_start.ru_stime | ||||
|  |  | |||
|  | @ -2,6 +2,9 @@ from synapse.rest.client.transactions import HttpTransactionCache | |||
| from synapse.rest.client.transactions import CLEANUP_PERIOD_MS | ||||
| from twisted.internet import defer | ||||
| from mock import Mock, call | ||||
| 
 | ||||
| from synapse.util import async | ||||
| from synapse.util.logcontext import LoggingContext | ||||
| from tests import unittest | ||||
| from tests.utils import MockClock | ||||
| 
 | ||||
|  | @ -39,6 +42,78 @@ class HttpTransactionCacheTestCase(unittest.TestCase): | |||
|         # expect only a single call to do the work | ||||
|         cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def test_logcontexts_with_async_result(self): | ||||
|         @defer.inlineCallbacks | ||||
|         def cb(): | ||||
|             yield async.sleep(0) | ||||
|             defer.returnValue("yay") | ||||
| 
 | ||||
|         @defer.inlineCallbacks | ||||
|         def test(): | ||||
|             with LoggingContext("c") as c1: | ||||
|                 res = yield self.cache.fetch_or_execute(self.mock_key, cb) | ||||
|                 self.assertIs(LoggingContext.current_context(), c1) | ||||
|                 self.assertEqual(res, "yay") | ||||
| 
 | ||||
|         # run the test twice in parallel | ||||
|         d = defer.gatherResults([test(), test()]) | ||||
|         self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel) | ||||
|         yield d | ||||
|         self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def test_does_not_cache_exceptions(self): | ||||
|         """Checks that, if the callback throws an exception, it is called again | ||||
|         for the next request. | ||||
|         """ | ||||
|         called = [False] | ||||
| 
 | ||||
|         def cb(): | ||||
|             if called[0]: | ||||
|                 # return a valid result the second time | ||||
|                 return defer.succeed(self.mock_http_response) | ||||
| 
 | ||||
|             called[0] = True | ||||
|             raise Exception("boo") | ||||
| 
 | ||||
|         with LoggingContext("test") as test_context: | ||||
|             try: | ||||
|                 yield self.cache.fetch_or_execute(self.mock_key, cb) | ||||
|             except Exception as e: | ||||
|                 self.assertEqual(e.message, "boo") | ||||
|             self.assertIs(LoggingContext.current_context(), test_context) | ||||
| 
 | ||||
|             res = yield self.cache.fetch_or_execute(self.mock_key, cb) | ||||
|             self.assertEqual(res, self.mock_http_response) | ||||
|             self.assertIs(LoggingContext.current_context(), test_context) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def test_does_not_cache_failures(self): | ||||
|         """Checks that, if the callback returns a failure, it is called again | ||||
|         for the next request. | ||||
|         """ | ||||
|         called = [False] | ||||
| 
 | ||||
|         def cb(): | ||||
|             if called[0]: | ||||
|                 # return a valid result the second time | ||||
|                 return defer.succeed(self.mock_http_response) | ||||
| 
 | ||||
|             called[0] = True | ||||
|             return defer.fail(Exception("boo")) | ||||
| 
 | ||||
|         with LoggingContext("test") as test_context: | ||||
|             try: | ||||
|                 yield self.cache.fetch_or_execute(self.mock_key, cb) | ||||
|             except Exception as e: | ||||
|                 self.assertEqual(e.message, "boo") | ||||
|             self.assertIs(LoggingContext.current_context(), test_context) | ||||
| 
 | ||||
|             res = yield self.cache.fetch_or_execute(self.mock_key, cb) | ||||
|             self.assertEqual(res, self.mock_http_response) | ||||
|             self.assertIs(LoggingContext.current_context(), test_context) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def test_cleans_up(self): | ||||
|         cb = Mock( | ||||
|  |  | |||
|  | @ -55,7 +55,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): | |||
|         def _assert_counts(noitf_count, highlight_count): | ||||
|             counts = yield self.store.runInteraction( | ||||
|                 "", self.store._get_unread_counts_by_pos_txn, | ||||
|                 room_id, user_id, 0, 0 | ||||
|                 room_id, user_id, 0 | ||||
|             ) | ||||
|             self.assertEquals( | ||||
|                 counts, | ||||
|  | @ -86,7 +86,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): | |||
|         def _mark_read(stream, depth): | ||||
|             return self.store.runInteraction( | ||||
|                 "", self.store._remove_old_push_actions_before_txn, | ||||
|                 room_id, user_id, depth, stream | ||||
|                 room_id, user_id, stream | ||||
|             ) | ||||
| 
 | ||||
|         yield _assert_counts(0, 0) | ||||
|  |  | |||
|  | @ -42,9 +42,14 @@ class RegistrationStoreTestCase(unittest.TestCase): | |||
|         yield self.store.register(self.user_id, self.tokens[0], self.pwhash) | ||||
| 
 | ||||
|         self.assertEquals( | ||||
|             # TODO(paul): Surely this field should be 'user_id', not 'name' | ||||
|             #  Additionally surely it shouldn't come in a 1-element list | ||||
|             {"name": self.user_id, "password_hash": self.pwhash, "is_guest": 0}, | ||||
|             { | ||||
|                 # TODO(paul): Surely this field should be 'user_id', not 'name' | ||||
|                 "name": self.user_id, | ||||
|                 "password_hash": self.pwhash, | ||||
|                 "is_guest": 0, | ||||
|                 "consent_version": None, | ||||
|                 "consent_server_notice_sent": None, | ||||
|             }, | ||||
|             (yield self.store.get_user_by_id(self.user_id)) | ||||
|         ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -63,6 +63,8 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): | |||
|         config.federation_rc_concurrent = 10 | ||||
|         config.filter_timeline_limit = 5000 | ||||
|         config.user_directory_search_all_users = False | ||||
|         config.user_consent_server_notice_content = None | ||||
|         config.block_events_without_consent_error = None | ||||
| 
 | ||||
|         # disable user directory updates, because they get done in the | ||||
|         # background, which upsets the test runner. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Amber Brown
						Amber Brown