Merge branch 'develop' of github.com:matrix-org/synapse into erikj/reliable_lookups
commit 1e4b4d85e7

CHANGES.md

@@ -1,3 +1,18 @@
Synapse 1.3.1 (2019-08-17)
==========================

Features
--------

- Drop hard dependency on `sdnotify` python package. ([\#5871](https://github.com/matrix-org/synapse/issues/5871))


Bugfixes
--------

- Fix startup issue (hang on ACME provisioning) due to ordering of Twisted reactor startup. Thanks to @chrismoos for supplying the fix. ([\#5867](https://github.com/matrix-org/synapse/issues/5867))


Synapse 1.3.0 (2019-08-15)
==========================

@@ -0,0 +1 @@
Update opentracing docs to use the unified `trace` method.

@@ -0,0 +1 @@
Add an admin API to purge old rooms from the database.

@@ -0,0 +1 @@
Add retry to well-known lookups if we have recently seen a valid well-known record for the server.

@@ -0,0 +1 @@
Opentracing for room and e2e keys.

@@ -0,0 +1 @@
Add a tag recording a request's authenticated entity and corresponding servlet in opentracing.

@@ -0,0 +1 @@
Fix database index so that different backup versions can have the same sessions.

@@ -0,0 +1 @@
Remove log line for debugging issue #5407.

@@ -0,0 +1 @@
Fix Synapse looking for config options `password_reset_failure_template` and `password_reset_success_template`, when they are actually `password_reset_template_failure_html` and `password_reset_template_success_html`.

@@ -0,0 +1 @@
Fix stack overflow when recovering an appservice which had an outage.

@@ -0,0 +1 @@
Refactor the Appservice scheduler code.

@@ -0,0 +1 @@
Drop some unused tables.

@@ -0,0 +1 @@
Add missing index on users_in_public_rooms to improve the performance of directory queries.

@@ -0,0 +1 @@
Improve the logging when we have an error when fetching signing keys.

@@ -1,8 +1,19 @@
matrix-synapse-py3 (1.3.1) stable; urgency=medium

  * New synapse release 1.3.1.

 -- Synapse Packaging team <packages@matrix.org>  Sat, 17 Aug 2019 09:15:49 +0100

matrix-synapse-py3 (1.3.0) stable; urgency=medium

  [ Andrew Morgan ]
  * Remove libsqlite3-dev from required build dependencies.

  [ Synapse Packaging team ]
  * New synapse release 1.3.0.

 -- Synapse Packaging team <packages@matrix.org>  Thu, 15 Aug 2019 12:04:23 +0100

matrix-synapse-py3 (1.2.0) stable; urgency=medium

  [ Amber Brown ]

@@ -13,9 +24,8 @@ matrix-synapse-py3 (1.2.0) stable; urgency=medium

  [ Synapse Packaging team ]
  * New synapse release 1.2.0.
  * New synapse release 1.3.0.

 -- Synapse Packaging team <packages@matrix.org>  Thu, 15 Aug 2019 12:04:23 +0100
 -- Synapse Packaging team <packages@matrix.org>  Thu, 25 Jul 2019 14:10:07 +0100

matrix-synapse-py3 (1.1.0) stable; urgency=medium

@@ -0,0 +1,18 @@
Purge room API
==============

This API will remove all trace of a room from your database.

All local users must have left the room before it can be removed.

The API is:

```
POST /_synapse/admin/v1/purge_room

{
    "room_id": "!room:id"
}
```

You must authenticate using the access token of an admin user.
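
For illustration, a minimal client-side call to this endpoint might look like the sketch below (Python standard library only; the homeserver URL, access token, and room ID are placeholders, not values from this commit):

```
# Hypothetical client for the purge room admin API, using only the
# standard library. The homeserver URL, access token and room ID are
# placeholders.
import json
import urllib.request

HOMESERVER = "https://homeserver.example.com"
ACCESS_TOKEN = "<admin access token>"


def purge_room(room_id):
    """Ask the homeserver to remove all trace of the given room."""
    req = urllib.request.Request(
        HOMESERVER + "/_synapse/admin/v1/purge_room",
        data=json.dumps({"room_id": room_id}).encode("utf-8"),
        headers={
            "Authorization": "Bearer %s" % (ACCESS_TOKEN,),
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


# purge_room("!room:id")
```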

@@ -35,4 +35,4 @@ try:
except ImportError:
    pass

__version__ = "1.3.0"
__version__ = "1.3.1"

@@ -22,6 +22,7 @@ from netaddr import IPAddress

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership

@@ -178,6 +179,7 @@ class Auth(object):
    def get_public_keys(self, invite_event):
        return event_auth.get_public_keys(invite_event)

    @opentracing.trace
    @defer.inlineCallbacks
    def get_user_by_req(
        self, request, allow_guest=False, rights="access", allow_expired=False

@@ -209,6 +211,7 @@ class Auth(object):
            user_id, app_service = yield self._get_appservice_user_id(request)
            if user_id:
                request.authenticated_entity = user_id
                opentracing.set_tag("authenticated_entity", user_id)

                if ip_addr and self.hs.config.track_appservice_user_ips:
                    yield self.store.insert_client_ip(

@@ -259,6 +262,7 @@ class Auth(object):
                )

            request.authenticated_entity = user.to_string()
            opentracing.set_tag("authenticated_entity", user.to_string())

            return synapse.types.create_requester(
                user, token_id, is_guest, device_id, app_service=app_service

@@ -17,10 +17,10 @@ import gc
import logging
import os
import signal
import socket
import sys
import traceback

import sdnotify
from daemonize import Daemonize

from twisted.internet import defer, error, reactor

@@ -246,13 +246,12 @@ def start(hs, listeners=None):
            def handle_sighup(*args, **kwargs):
                # Tell systemd our state, if we're using it. This will silently fail if
                # we're not using systemd.
                sd_channel = sdnotify.SystemdNotifier()
                sd_channel.notify("RELOADING=1")
                sdnotify(b"RELOADING=1")

                for i in _sighup_callbacks:
                    i(hs)

                sd_channel.notify("READY=1")
                sdnotify(b"READY=1")

            signal.signal(signal.SIGHUP, handle_sighup)

@@ -308,16 +307,12 @@ def setup_sdnotify(hs):

    # Tell systemd our state, if we're using it. This will silently fail if
    # we're not using systemd.
    sd_channel = sdnotify.SystemdNotifier()

    hs.get_reactor().addSystemEventTrigger(
        "after",
        "startup",
        lambda: sd_channel.notify("READY=1\nMAINPID=%s" % (os.getpid())),
        "after", "startup", sdnotify, b"READY=1\nMAINPID=%i" % (os.getpid(),)
    )

    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", lambda: sd_channel.notify("STOPPING=1")
        "before", "shutdown", sdnotify, b"STOPPING=1"
    )


@@ -414,3 +409,35 @@ class _DeferredResolutionReceiver(object):
    def resolutionComplete(self):
        self._deferred.callback(())
        self._receiver.resolutionComplete()


sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")


def sdnotify(state):
    """
    Send a notification to systemd, if the NOTIFY_SOCKET env var is set.

    This function is based on the sdnotify python package, but since it's only a few
    lines of code, it's easier to duplicate it here than to add a dependency on a
    package which many OSes don't include as a matter of principle.

    Args:
        state (bytes): notification to send
    """
    if not isinstance(state, bytes):
        raise TypeError("sdnotify should be called with a bytes")
    if not sdnotify_sockaddr:
        return
    addr = sdnotify_sockaddr
    if addr[0] == "@":
        addr = "\0" + addr[1:]

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
            sock.connect(addr)
            sock.sendall(state)
    except Exception as e:
        # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
        # unless systemd is expecting us to notify it.
        logger.warning("Unable to send notification to systemd: %s", e)

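To see this notification protocol in action without a real systemd, one can bind a local datagram socket and point `NOTIFY_SOCKET` at it. The following is a test sketch under that assumption, not code from this commit; the socket path is arbitrary:

```
# Test sketch: receive sd_notify-style datagrams without systemd.
import os
import socket

SOCK_PATH = "/tmp/test-notify.sock"  # arbitrary placeholder path

if os.path.exists(SOCK_PATH):
    os.unlink(SOCK_PATH)

server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind(SOCK_PATH)

# Point senders at our fake systemd notification socket.
os.environ["NOTIFY_SOCKET"] = SOCK_PATH

# A sender speaking the same protocol as the sdnotify() helper above:
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as client:
    client.connect(os.environ["NOTIFY_SOCKET"])
    client.sendall(b"READY=1\nMAINPID=%i" % (os.getpid(),))

print(server.recv(1024))  # b'READY=1\nMAINPID=...'
server.close()
os.unlink(SOCK_PATH)
```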
@@ -447,7 +447,7 @@ def setup(config_options):
                reactor.stop()
            sys.exit(1)

    reactor.addSystemEventTrigger("before", "startup", start)
    reactor.callWhenRunning(start)

    return hs

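The ordering difference behind this one-line fix: a "before startup" system event trigger fires before the reactor is fully running, whereas `callWhenRunning` defers the call until the reactor has actually started. A minimal standalone Twisted sketch illustrating the latter:

```
# Minimal sketch of reactor.callWhenRunning: the callable only runs once
# the reactor is up, so reactor.running is True inside it.
from twisted.internet import reactor


def start():
    print("reactor running:", reactor.running)  # True
    reactor.stop()


reactor.callWhenRunning(start)
reactor.run()
```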
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
        self.store = hs.get_datastore()
        self.as_api = hs.get_application_service_api()

        def create_recoverer(service, callback):
            return _Recoverer(self.clock, self.store, self.as_api, service, callback)

        self.txn_ctrl = _TransactionController(
            self.clock, self.store, self.as_api, create_recoverer
        )
        self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
        self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)

    @defer.inlineCallbacks
    def start(self):
        logger.info("Starting appservice scheduler")

        # check for any DOWN ASes and start recoverers for them.
        recoverers = yield _Recoverer.start(
            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
        services = yield self.store.get_appservices_by_state(
            ApplicationServiceState.DOWN
        )
        self.txn_ctrl.add_recoverers(recoverers)

        for service in services:
            self.txn_ctrl.start_recoverer(service)

    def submit_event_for_as(self, service, event):
        self.queuer.enqueue(service, event)


class _ServiceQueuer(object):
    """Queues events for the same application service together, sending
    transactions as soon as possible. Once a transaction is sent successfully,
    this schedules any other events in the queue to run.
    """Queue of events waiting to be sent to appservices.

    Groups events into transactions per-appservice, and sends them on to the
    TransactionController. Makes sure that we only have one transaction in flight per
    appservice at a given time.
    """

    def __init__(self, txn_ctrl, clock):
        self.queued_events = {}  # dict of {service_id: [events]}

        # the appservices which currently have a transaction in flight
        self.requests_in_flight = set()
        self.txn_ctrl = txn_ctrl
        self.clock = clock

@@ -136,13 +138,29 @@ class _ServiceQueuer(object):


class _TransactionController(object):
    def __init__(self, clock, store, as_api, recoverer_fn):
    """Transaction manager.

    Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
    if a transaction fails.

    (Note that we only have one of these in the homeserver.)

    Args:
        clock (synapse.util.Clock):
        store (synapse.storage.DataStore):
        as_api (synapse.appservice.api.ApplicationServiceApi):
    """

    def __init__(self, clock, store, as_api):
        self.clock = clock
        self.store = store
        self.as_api = as_api
        self.recoverer_fn = recoverer_fn
        # keep track of how many recoverers there are
        self.recoverers = []

        # map from service id to recoverer instance
        self.recoverers = {}

        # for UTs
        self.RECOVERER_CLASS = _Recoverer

    @defer.inlineCallbacks
    def send(self, service, events):

@@ -154,42 +172,45 @@ class _TransactionController(object):
                if sent:
                    yield txn.complete(self.store)
                else:
                    run_in_background(self._start_recoverer, service)
                    run_in_background(self._on_txn_fail, service)
        except Exception:
            logger.exception("Error creating appservice transaction")
            run_in_background(self._start_recoverer, service)
            run_in_background(self._on_txn_fail, service)

    @defer.inlineCallbacks
    def on_recovered(self, recoverer):
        self.recoverers.remove(recoverer)
        logger.info(
            "Successfully recovered application service AS ID %s", recoverer.service.id
        )
        self.recoverers.pop(recoverer.service.id)
        logger.info("Remaining active recoverers: %s", len(self.recoverers))
        yield self.store.set_appservice_state(
            recoverer.service, ApplicationServiceState.UP
        )

    def add_recoverers(self, recoverers):
        for r in recoverers:
            self.recoverers.append(r)
        if len(recoverers) > 0:
            logger.info("New active recoverers: %s", len(self.recoverers))

    @defer.inlineCallbacks
    def _start_recoverer(self, service):
    def _on_txn_fail(self, service):
        try:
            yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
            logger.info(
                "Application service falling behind. Starting recoverer. AS ID %s",
                service.id,
            )
            recoverer = self.recoverer_fn(service, self.on_recovered)
            self.add_recoverers([recoverer])
            recoverer.recover()
            self.start_recoverer(service)
        except Exception:
            logger.exception("Error starting AS recoverer")

    def start_recoverer(self, service):
        """Start a Recoverer for the given service

        Args:
            service (synapse.appservice.ApplicationService):
        """
        logger.info("Starting recoverer for AS ID %s", service.id)
        assert service.id not in self.recoverers
        recoverer = self.RECOVERER_CLASS(
            self.clock, self.store, self.as_api, service, self.on_recovered
        )
        self.recoverers[service.id] = recoverer
        recoverer.recover()
        logger.info("Now %i active recoverers", len(self.recoverers))

    @defer.inlineCallbacks
    def _is_service_up(self, service):
        state = yield self.store.get_appservice_state(service)

@@ -197,18 +218,17 @@ class _TransactionController(object):


class _Recoverer(object):
    @staticmethod
    @defer.inlineCallbacks
    def start(clock, store, as_api, callback):
        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
        for r in recoverers:
            logger.info(
                "Starting recoverer for AS ID %s which was marked as " "DOWN",
                r.service.id,
            )
            r.recover()
        return recoverers
    """Manages retries and backoff for a DOWN appservice.

    We have one of these for each appservice which is currently considered DOWN.

    Args:
        clock (synapse.util.Clock):
        store (synapse.storage.DataStore):
        as_api (synapse.appservice.api.ApplicationServiceApi):
        service (synapse.appservice.ApplicationService): the service we are managing
        callback (callable[_Recoverer]): called once the service recovers.
    """

    def __init__(self, clock, store, as_api, service, callback):
        self.clock = clock

@@ -224,7 +244,9 @@ class _Recoverer(object):
                "as-recoverer-%s" % (self.service.id,), self.retry
            )

        self.clock.call_later((2 ** self.backoff_counter), _retry)
        delay = 2 ** self.backoff_counter
        logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
        self.clock.call_later(delay, _retry)

    def _backoff(self):
        # cap the backoff to be around 8.5min => (2^9) = 512 secs

@@ -234,25 +256,30 @@ class _Recoverer(object):

    @defer.inlineCallbacks
    def retry(self):
        logger.info("Starting retries on %s", self.service.id)
        try:
            txn = yield self.store.get_oldest_unsent_txn(self.service)
            if txn:
            while True:
                txn = yield self.store.get_oldest_unsent_txn(self.service)
                if not txn:
                    # nothing left: we're done!
                    self.callback(self)
                    return

                logger.info(
                    "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
                )
                sent = yield txn.send(self.as_api)
                if sent:
                    yield txn.complete(self.store)
                    # reset the backoff counter and retry immediately
                    self.backoff_counter = 1
                    yield self.retry()
                else:
                    self._backoff()
            else:
                self._set_service_recovered()
        except Exception as e:
            logger.exception(e)
            self._backoff()
                if not sent:
                    break

    def _set_service_recovered(self):
        self.callback(self)
                yield txn.complete(self.store)

                # reset the backoff counter and then process the next transaction
                self.backoff_counter = 1

        except Exception:
            logger.exception("Unexpected error running retries")

        # we didn't manage to send all of the transactions before we got an error of
        # some flavour: reschedule the next retry.
        self._backoff()

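For reference, the recoverer's retry delays grow as `2 ** backoff_counter`, with the counter capped so delays top out at 512 seconds (about 8.5 minutes). A standalone sketch of the resulting schedule, assuming the cap of 9 implied by the `_backoff` comment above:

```
# Sketch of the recoverer's backoff schedule: delay = 2 ** counter, with
# the counter capped at 9 (assumed from the "_backoff" comment above).
def backoff_delays(max_counter=9, attempts=12):
    counter = 1  # matches the recoverer's initial backoff_counter
    for _ in range(attempts):
        yield 2 ** counter
        if counter < max_counter:
            counter += 1


print(list(backoff_delays()))
# -> [2, 4, 8, 16, 32, 64, 128, 256, 512, 512, 512, 512]
```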
@@ -132,21 +132,21 @@ class EmailConfig(Config):
            self.email_password_reset_template_text = email_config.get(
                "password_reset_template_text", "password_reset.txt"
            )
            self.email_password_reset_failure_template = email_config.get(
                "password_reset_failure_template", "password_reset_failure.html"
            self.email_password_reset_template_failure_html = email_config.get(
                "password_reset_template_failure_html", "password_reset_failure.html"
            )
            # This template does not support any replaceable variables, so we will
            # read it from the disk once during setup
            email_password_reset_success_template = email_config.get(
                "password_reset_success_template", "password_reset_success.html"
            email_password_reset_template_success_html = email_config.get(
                "password_reset_template_success_html", "password_reset_success.html"
            )

            # Check templates exist
            for f in [
                self.email_password_reset_template_html,
                self.email_password_reset_template_text,
                self.email_password_reset_failure_template,
                email_password_reset_success_template,
                self.email_password_reset_template_failure_html,
                email_password_reset_template_success_html,
            ]:
                p = os.path.join(self.email_template_dir, f)
                if not os.path.isfile(p):

@@ -154,9 +154,9 @@ class EmailConfig(Config):

            # Retrieve content of web templates
            filepath = os.path.join(
                self.email_template_dir, email_password_reset_success_template
                self.email_template_dir, email_password_reset_template_success_html
            )
            self.email_password_reset_success_html_content = self.read_file(
            self.email_password_reset_template_success_html_content = self.read_file(
                filepath, "email.password_reset_template_success_html"
            )

@@ -18,7 +18,6 @@ import logging
from collections import defaultdict

import six
from six import raise_from
from six.moves import urllib

import attr

@@ -657,9 +656,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                },
            )
        except (NotRetryingDestination, RequestSendFailed) as e:
            raise_from(KeyLookupError("Failed to connect to remote server"), e)
            # these both have str() representations which we can't really improve upon
            raise KeyLookupError(str(e))
        except HttpResponseException as e:
            raise_from(KeyLookupError("Remote server returned an error"), e)
            raise KeyLookupError("Remote server returned an error: %s" % (e,))

        keys = {}
        added_keys = []

@@ -821,9 +821,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
                    timeout=10000,
                )
            except (NotRetryingDestination, RequestSendFailed) as e:
                raise_from(KeyLookupError("Failed to connect to remote server"), e)
                # these both have str() representations which we can't really improve
                # upon
                raise KeyLookupError(str(e))
            except HttpResponseException as e:
                raise_from(KeyLookupError("Remote server returned an error"), e)
                raise KeyLookupError("Remote server returned an error: %s" % (e,))

            if response["server_name"] != server_name:
                raise KeyLookupError(

@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
    ReplicationFederationSendEduRestServlet,

@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
    def on_query_user_devices(self, origin, user_id):
        return self.on_query_request("user_devices", user_id)

    @trace
    @defer.inlineCallbacks
    @log_function
    def on_claim_client_keys(self, origin, content):

@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
            for device_id, algorithm in device_keys.items():
                query.append((user_id, device_id, algorithm))

        log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
        results = yield self.store.claim_e2e_one_time_keys(query)

        json_result = {}

@@ -22,7 +22,6 @@ import re
from twisted.internet.defer import maybeDeferred

import synapse
import synapse.logging.opentracing as opentracing
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import (

@@ -39,6 +38,7 @@ from synapse.http.servlet import (
    parse_string_from_args,
)
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import start_active_span_from_context, tags
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string

@@ -289,16 +289,17 @@ class BaseFederationServlet(object):
                raise

            # Start an opentracing span
            with opentracing.start_active_span_from_context(
            with start_active_span_from_context(
                request.requestHeaders,
                "incoming-federation-request",
                tags={
                    "request_id": request.get_request_id(),
                    opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER,
                    opentracing.tags.HTTP_METHOD: request.get_method(),
                    opentracing.tags.HTTP_URL: request.get_redacted_uri(),
                    opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
                    tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
                    tags.HTTP_METHOD: request.get_method(),
                    tags.HTTP_URL: request.get_redacted_uri(),
                    tags.PEER_HOST_IPV6: request.getClientIP(),
                    "authenticated_entity": origin,
                    "servlet_name": request.request_metrics.name,
                },
            ):
                if origin:

@@ -24,6 +24,7 @@ from twisted.internet import defer

from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.retryutils import NotRetryingDestination

@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
            "client_keys", self.on_federation_query_client_keys
        )

    @trace
    @defer.inlineCallbacks
    def query_devices(self, query_body, timeout):
        """ Handle a device key query from a client

@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
            else:
                remote_queries[user_id] = device_ids

        set_tag("local_key_query", local_query)
        set_tag("remote_key_query", remote_queries)

        # First get local devices.
        failures = {}
        results = {}

@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
                r[user_id] = remote_queries[user_id]

        # Now fetch any devices that we don't have in our cache
        @trace
        @defer.inlineCallbacks
        def do_remote_query(destination):
            """This is called when we are querying the device list of a user on

@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
            except Exception as e:
                failure = _exception_to_failure(e)
                failures[destination] = failure
                set_tag("error", True)
                set_tag("reason", failure)

        yield make_deferred_yieldable(
            defer.gatherResults(

@@ -198,6 +206,7 @@ class E2eKeysHandler(object):

        return {"device_keys": results, "failures": failures}

    @trace
    @defer.inlineCallbacks
    def query_local_devices(self, query):
        """Get E2E device keys for local users

@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                 map from user_id -> device_id -> device details
        """
        set_tag("local_query", query)
        local_query = []

        result_dict = {}

@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
            # we use UserID.from_string to catch invalid user ids
            if not self.is_mine(UserID.from_string(user_id)):
                logger.warning("Request for keys for non-local user %s", user_id)
                log_kv(
                    {
                        "message": "Requested a local key for a user which"
                        " was not local to the homeserver",
                        "user_id": user_id,
                    }
                )
                set_tag("error", True)
                raise SynapseError(400, "Not a user here")

            if not device_ids:

@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
                    r["unsigned"]["device_display_name"] = display_name
                result_dict[user_id][device_id] = r

        log_kv(results)
        return result_dict

    @defer.inlineCallbacks

@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
        res = yield self.query_local_devices(device_keys_query)
        return {"device_keys": res}

    @trace
    @defer.inlineCallbacks
    def claim_one_time_keys(self, query, timeout):
        local_query = []

@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
                domain = get_domain_from_id(user_id)
                remote_queries.setdefault(domain, {})[user_id] = device_keys

        set_tag("local_key_query", local_query)
        set_tag("remote_key_query", remote_queries)

        results = yield self.store.claim_e2e_one_time_keys(local_query)

        json_result = {}

@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
                        key_id: json.loads(json_bytes)
                    }

        @trace
        @defer.inlineCallbacks
        def claim_client_keys(destination):
            set_tag("destination", destination)
            device_keys = remote_queries[destination]
            try:
                remote_result = yield self.federation.claim_client_keys(

@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
            except Exception as e:
                failure = _exception_to_failure(e)
                failures[destination] = failure
                set_tag("error", True)
                set_tag("reason", failure)

        yield make_deferred_yieldable(
            defer.gatherResults(

@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
            ),
        )

        log_kv({"one_time_keys": json_result, "failures": failures})
        return {"one_time_keys": json_result, "failures": failures}

    @defer.inlineCallbacks
    @tag_args
    def upload_keys_for_user(self, user_id, device_id, keys):

        time_now = self.clock.time_msec()

@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
                user_id,
                time_now,
            )
            log_kv(
                {
                    "message": "Updating device_keys for user.",
                    "user_id": user_id,
                    "device_id": device_id,
                }
            )
            # TODO: Sign the JSON with the server key
            changed = yield self.store.set_e2e_device_keys(
                user_id, device_id, time_now, device_keys

@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
            if changed:
                # Only notify about device updates *if* the keys actually changed
                yield self.device_handler.notify_device_update(user_id, [device_id])

        else:
            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
        one_time_keys = keys.get("one_time_keys", None)
        if one_time_keys:
            log_kv(
                {
                    "message": "Updating one_time_keys for device.",
                    "user_id": user_id,
                    "device_id": device_id,
                }
            )
            yield self._upload_one_time_keys_for_user(
                user_id, device_id, time_now, one_time_keys
            )
        else:
            log_kv(
                {"message": "Did not update one_time_keys", "reason": "no keys given"}
            )

        # the device should have been registered already, but it may have been
        # deleted due to a race with a DELETE request. Or we may be using an

@@ -352,6 +400,7 @@ class E2eKeysHandler(object):

        result = yield self.store.count_e2e_one_time_keys(user_id, device_id)

        set_tag("one_time_key_counts", result)
        return {"one_time_key_counts": result}

    @defer.inlineCallbacks

@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
                    (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
                )

        log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
        yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)

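The pattern used throughout this handler is to decorate an entry point with `trace` so it runs in its own span, attach request-level data with `set_tag`, and record point-in-time events with `log_kv`. A condensed sketch of that pattern; `do_lookup` and the `store.lookup_devices` call are hypothetical stand-ins, not Synapse APIs:

```
# Condensed sketch of the tracing pattern above. `do_lookup` and
# `store.lookup_devices` are hypothetical, not part of this commit.
from twisted.internet import defer

from synapse.logging.opentracing import log_kv, set_tag, trace


@trace
@defer.inlineCallbacks
def do_lookup(store, user_id, device_ids):
    # Tags describe the request as a whole and end up on the span itself.
    set_tag("user_id", user_id)

    results = yield store.lookup_devices(user_id, device_ids)

    # log_kv records a point-in-time event within the current span.
    log_kv({"message": "Looked up devices.", "count": len(results)})
    return results
```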
@@ -26,6 +26,7 @@ from synapse.api.errors import (
    StoreError,
    SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.util.async_helpers import Linearizer

logger = logging.getLogger(__name__)

@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
        # changed.
        self._upload_linearizer = Linearizer("upload_room_keys_lock")

    @trace
    @defer.inlineCallbacks
    def get_room_keys(self, user_id, version, room_id=None, session_id=None):
        """Bulk get the E2E room keys for a given backup, optionally filtered to a given

@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                user_id, version, room_id, session_id
            )

            log_kv(results)
            return results

    @trace
    @defer.inlineCallbacks
    def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
        """Bulk delete the E2E room keys for a given backup, optionally filtered to a given

@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
        with (yield self._upload_linearizer.queue(user_id)):
            yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)

    @trace
    @defer.inlineCallbacks
    def upload_room_keys(self, user_id, version, room_keys):
        """Bulk upload a list of room keys into a given backup version, asserting

@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
            session_id(str): the session whose room_key we're setting
            room_key(dict): the room_key being set
        """

        log_kv(
            {
                "message": "Trying to upload room key",
                "room_id": room_id,
                "session_id": session_id,
                "user_id": user_id,
            }
        )
        # get the room_key for this particular row
        current_room_key = None
        try:

@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
            )
        except StoreError as e:
            if e.code == 404:
                pass
                log_kv(
                    {
                        "message": "Room key not found.",
                        "room_id": room_id,
                        "user_id": user_id,
                    }
                )
            else:
                raise

        if self._should_replace_room_key(current_room_key, room_key):
            log_kv({"message": "Replacing room key."})
            yield self.store.set_e2e_room_key(
                user_id, version, room_id, session_id, room_key
            )
        else:
            log_kv({"message": "Not replacing room_key."})

    @staticmethod
    def _should_replace_room_key(current_room_key, room_key):

@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
                return False
        return True

    @trace
    @defer.inlineCallbacks
    def create_version(self, user_id, version_info):
        """Create a new backup version.  This automatically becomes the new

@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
                    raise
            return res

    @trace
    @defer.inlineCallbacks
    def delete_version(self, user_id, version=None):
        """Deletes a given version of the user's e2e_room_keys backup

@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
                else:
                    raise

    @trace
    @defer.inlineCallbacks
    def update_version(self, user_id, version, version_info):
        """Update the info about a given version of the user's backup

@@ -70,6 +70,7 @@ class PaginationHandler(object):
        self.auth = hs.get_auth()
        self.store = hs.get_datastore()
        self.clock = hs.get_clock()
        self._server_name = hs.hostname

        self.pagination_lock = ReadWriteLock()
        self._purges_in_progress_by_room = set()

@@ -153,6 +154,22 @@ class PaginationHandler(object):
        """
        return self._purges_by_id.get(purge_id)

    async def purge_room(self, room_id):
        """Purge the given room from the database"""
        with (await self.pagination_lock.write(room_id)):
            # check we know about the room
            await self.store.get_room_version(room_id)

            # first check that we have no users in this room
            joined = await defer.maybeDeferred(
                self.store.is_host_joined, room_id, self._server_name
            )

            if joined:
                raise SynapseError(400, "Users are still joined to this room")

            await self.store.purge_room(room_id)

    @defer.inlineCallbacks
    def get_messages(
        self,

@@ -786,9 +786,8 @@ class SyncHandler(object):
                        batch.events[0].event_id, state_filter=state_filter
                    )
                else:
                    # Its not clear how we get here, but empirically we do
                    # (#5407). Logging has been added elsewhere to try and
                    # figure out where this state comes from.
                    # We can get here if the user has ignored the senders of all
                    # the recent events.
                    state_at_timeline_start = yield self.get_state_at(
                        room_id, stream_position=now_token, state_filter=state_filter
                    )

@@ -1771,20 +1770,9 @@ class SyncHandler(object):
            newly_joined_room=newly_joined,
        )

        if not batch and batch.limited:
            # This resulted in #5407, which is weird, so lets log! We do it
            # here as we have the maximum amount of information.
            user_id = sync_result_builder.sync_config.user.to_string()
            logger.info(
                "Issue #5407: Found limited batch with no events. user %s, room %s,"
                " sync_config %s, newly_joined %s, events %s, batch %s.",
                user_id,
                room_id,
                sync_config,
                newly_joined,
                events,
                batch,
            )
        # Note: `batch` can be both empty and limited here in the case where
        # `_load_filtered_recents` can't find any events the user should see
        # (e.g. due to having ignored the sender of the last 50 events).

        if newly_joined:
            # debug for https://github.com/matrix-org/synapse/issues/4422

@@ -52,9 +52,9 @@ class MatrixFederationAgent(object):
            SRVResolver impl to use for looking up SRV records. None to use a default
            implementation.

        _well_known_cache (TTLCache|None):
            TTLCache impl for storing cached well-known lookups. None to use a default
            implementation.
        _well_known_resolver (WellKnownResolver|None):
            WellKnownResolver to use to perform well-known lookups. None to use a
            default implementation.
    """

    def __init__(

@@ -62,7 +62,7 @@ class MatrixFederationAgent(object):
        reactor,
        tls_client_options_factory,
        _srv_resolver=None,
        _well_known_cache=None,
        _well_known_resolver=None,
    ):
        self._reactor = reactor
        self._clock = Clock(reactor)

@@ -79,15 +79,17 @@ class MatrixFederationAgent(object):
            pool=self._pool,
        )

        self._well_known_resolver = WellKnownResolver(
            self._reactor,
            agent=Agent(
        if _well_known_resolver is None:
            _well_known_resolver = WellKnownResolver(
                self._reactor,
                contextFactory=tls_client_options_factory,
                pool=self._pool,
            ),
            well_known_cache=_well_known_cache,
        )
                agent=Agent(
                    self._reactor,
                    pool=self._pool,
                    contextFactory=tls_client_options_factory,
                ),
            )

        self._well_known_resolver = _well_known_resolver

    @defer.inlineCallbacks
    def request(self, method, uri, headers=None, bodyProducer=None):

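One motivation for accepting a whole `_well_known_resolver` instead of just a cache is that tests can now inject a fully-configured resolver. A hedged sketch of such wiring; the import paths are assumptions about this branch's module layout, and `reactor`, `tls_factory`, and `agent_for_well_known` are assumed to come from the surrounding test harness:

```
# Sketch: injecting a WellKnownResolver, e.g. from a unit test. Import
# paths are assumptions based on this branch's layout.
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.util.caches.ttlcache import TTLCache

well_known_resolver = WellKnownResolver(
    reactor,
    agent=agent_for_well_known,
    well_known_cache=TTLCache("test-well-known"),
)

federation_agent = MatrixFederationAgent(
    reactor,
    tls_client_options_factory=tls_factory,
    _well_known_resolver=well_known_resolver,
)
```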
|  | @ -32,12 +32,19 @@ from synapse.util.metrics import Measure | |||
| # period to cache .well-known results for by default | ||||
| WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600 | ||||
| 
 | ||||
| # jitter to add to the .well-known default cache ttl | ||||
| WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60 | ||||
| # jitter factor to add to the .well-known default cache ttls | ||||
| WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1 | ||||
| 
 | ||||
| # period to cache failure to fetch .well-known for | ||||
| WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600 | ||||
| 
 | ||||
| # period to cache failure to fetch .well-known if there has recently been a | ||||
| # valid well-known for that domain. | ||||
| WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60 | ||||
| 
 | ||||
| # period to remember there was a valid well-known after valid record expires | ||||
| WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600 | ||||
| 
 | ||||
| # cap for .well-known cache period | ||||
| WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600 | ||||
| 
 | ||||
|  | @ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60 | |||
| # we'll start trying to refetch 1 minute before it expires. | ||||
| WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2 | ||||
| 
 | ||||
| # Number of times we retry fetching a well-known for a domain we know recently | ||||
| # had a valid entry. | ||||
| WELL_KNOWN_RETRY_ATTEMPTS = 3 | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| _well_known_cache = TTLCache("well-known") | ||||
| _had_valid_well_known_cache = TTLCache("had-valid-well-known") | ||||
| 
 | ||||
| 
 | ||||
| @attr.s(slots=True, frozen=True) | ||||
|  | @ -65,14 +77,20 @@ class WellKnownResolver(object): | |||
|     """Handles well-known lookups for matrix servers. | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self, reactor, agent, well_known_cache=None): | ||||
|     def __init__( | ||||
|         self, reactor, agent, well_known_cache=None, had_well_known_cache=None | ||||
|     ): | ||||
|         self._reactor = reactor | ||||
|         self._clock = Clock(reactor) | ||||
| 
 | ||||
|         if well_known_cache is None: | ||||
|             well_known_cache = _well_known_cache | ||||
| 
 | ||||
|         if had_well_known_cache is None: | ||||
|             had_well_known_cache = _had_valid_well_known_cache | ||||
| 
 | ||||
|         self._well_known_cache = well_known_cache | ||||
|         self._had_valid_well_known_cache = had_well_known_cache | ||||
|         self._well_known_agent = RedirectAgent(agent) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|  | @ -100,7 +118,7 @@ class WellKnownResolver(object): | |||
|         # requests for the same server in parallel? | ||||
|         try: | ||||
|             with Measure(self._clock, "get_well_known"): | ||||
|                 result, cache_period = yield self._do_get_well_known(server_name) | ||||
|                 result, cache_period = yield self._fetch_well_known(server_name) | ||||
| 
 | ||||
|         except _FetchWellKnownFailure as e: | ||||
|             if prev_result and e.temporary: | ||||
|  | @ -111,10 +129,18 @@ class WellKnownResolver(object): | |||
| 
 | ||||
|             result = None | ||||
| 
 | ||||
|             # add some randomness to the TTL to avoid a stampeding herd every hour | ||||
|             # after startup | ||||
|             cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD | ||||
|             cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) | ||||
|             if self._had_valid_well_known_cache.get(server_name, False): | ||||
|                 # We have recently seen a valid well-known record for this | ||||
|                 # server, so we cache the lack of well-known for a shorter time. | ||||
|                 cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD | ||||
|             else: | ||||
|                 cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD | ||||
| 
 | ||||
|             # add some randomness to the TTL to avoid a stampeding herd | ||||
|             cache_period *= random.uniform( | ||||
|                 1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, | ||||
|                 1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, | ||||
|             ) | ||||
| 
 | ||||
|         if cache_period > 0: | ||||
|             self._well_known_cache.set(server_name, result, cache_period) | ||||
|  | @ -122,7 +148,7 @@ class WellKnownResolver(object): | |||
|         return WellKnownLookupResult(delegated_server=result) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _do_get_well_known(self, server_name): | ||||
|     def _fetch_well_known(self, server_name): | ||||
|         """Actually fetch and parse a .well-known, without checking the cache | ||||
| 
 | ||||
|         Args: | ||||
|  | @ -134,24 +160,15 @@ class WellKnownResolver(object): | |||
|         Returns: | ||||
|             Deferred[Tuple[bytes,int]]: The lookup result and cache period. | ||||
|         """ | ||||
|         uri = b"https://%s/.well-known/matrix/server" % (server_name,) | ||||
|         uri_str = uri.decode("ascii") | ||||
|         logger.info("Fetching %s", uri_str) | ||||
| 
 | ||||
|         had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False) | ||||
| 
 | ||||
|         # We do this in two steps to differentiate between possibly transient | ||||
|         # errors (e.g. can't connect to host, 503 response) and more permenant | ||||
|         # errors (such as getting a 404 response). | ||||
|         try: | ||||
|             response = yield make_deferred_yieldable( | ||||
|                 self._well_known_agent.request(b"GET", uri) | ||||
|             ) | ||||
|             body = yield make_deferred_yieldable(readBody(response)) | ||||
| 
 | ||||
|             if 500 <= response.code < 600: | ||||
|                 raise Exception("Non-200 response %s" % (response.code,)) | ||||
|         except Exception as e: | ||||
|             logger.info("Error fetching %s: %s", uri_str, e) | ||||
|             raise _FetchWellKnownFailure(temporary=True) | ||||
|         response, body = yield self._make_well_known_request( | ||||
|             server_name, retry=had_valid_well_known | ||||
|         ) | ||||
| 
 | ||||
|         try: | ||||
|             if response.code != 200: | ||||
|  | @ -161,8 +178,11 @@ class WellKnownResolver(object): | |||
|             logger.info("Response from .well-known: %s", parsed_body) | ||||
| 
 | ||||
|             result = parsed_body["m.server"].encode("ascii") | ||||
|         except defer.CancelledError: | ||||
|             # Bail if we've been cancelled | ||||
|             raise | ||||
|         except Exception as e: | ||||
|             logger.info("Error fetching %s: %s", uri_str, e) | ||||
|             logger.info("Error parsing well-known for %s: %s", server_name, e) | ||||
|             raise _FetchWellKnownFailure(temporary=False) | ||||
| 
 | ||||
|         cache_period = _cache_period_from_headers( | ||||
|  | @ -172,13 +192,69 @@ class WellKnownResolver(object): | |||
|             cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD | ||||
|             # add some randomness to the TTL to avoid a stampeding herd every 24 hours | ||||
|             # after startup | ||||
|             cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) | ||||
|             cache_period *= random.uniform( | ||||
|                 1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, | ||||
|                 1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, | ||||
|             ) | ||||
|         else: | ||||
|             cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD) | ||||
|             cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD) | ||||
| 
 | ||||
|         # We got a success, mark as such in the cache | ||||
|         self._had_valid_well_known_cache.set( | ||||
|             server_name, | ||||
|             bool(result), | ||||
|             cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID, | ||||
|         ) | ||||
| 
 | ||||
|         return (result, cache_period) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _make_well_known_request(self, server_name, retry): | ||||
|         """Make the well known request. | ||||
| 
 | ||||
|         This will retry the request if requested and it fails (with unable | ||||
|         to connect or receives a 5xx error). | ||||
| 
 | ||||
|         Args: | ||||
|             server_name (bytes) | ||||
|             retry (bool): Whether to retry the request if it fails. | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred[tuple[IResponse, bytes]] Returns the response object and | ||||
|             body. Response may be a non-200 response. | ||||
|         """ | ||||
|         uri = b"https://%s/.well-known/matrix/server" % (server_name,) | ||||
|         uri_str = uri.decode("ascii") | ||||
| 
 | ||||
|         i = 0 | ||||
|         while True: | ||||
|             i += 1 | ||||
| 
 | ||||
|             logger.info("Fetching %s", uri_str) | ||||
|             try: | ||||
|                 response = yield make_deferred_yieldable( | ||||
|                     self._well_known_agent.request(b"GET", uri) | ||||
|                 ) | ||||
|                 body = yield make_deferred_yieldable(readBody(response)) | ||||
| 
 | ||||
|                 if 500 <= response.code < 600: | ||||
|                     raise Exception("Non-200 response %s" % (response.code,)) | ||||
| 
 | ||||
|                 return response, body | ||||
|             except defer.CancelledError: | ||||
|                 # Bail if we've been cancelled | ||||
|                 raise | ||||
|             except Exception as e: | ||||
|                 if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS: | ||||
|                     logger.info("Error fetching %s: %s", uri_str, e) | ||||
|                     raise _FetchWellKnownFailure(temporary=True) | ||||
| 
 | ||||
|                 logger.info("Error fetching %s: %s. Retrying", uri_str, e) | ||||
| 
 | ||||
|             # Sleep briefly in the hope that the server comes back up | ||||
|             yield self._clock.sleep(0.5) | ||||
| 
 | ||||
| 
 | ||||
| def _cache_period_from_headers(headers, time_now=time.time): | ||||
|     cache_controls = _parse_cache_control(headers) | ||||
|  |  | |||
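| 
| For reference, a minimal standalone sketch of the cache-period selection | ||||
| above. The constant values here are assumptions for illustration; the real | ||||
| constants are defined elsewhere in well_known_resolver.py and are not shown | ||||
| in this hunk: | ||||
| 
| .. code-block:: python | ||||
| 
|    import random | ||||
| 
|    # Assumed values; the actual constants may differ. | ||||
|    WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600 | ||||
|    WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1 | ||||
|    WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60 | ||||
|    WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600 | ||||
| 
|    def choose_cache_period(cache_period): | ||||
|        if cache_period is None: | ||||
|            # No usable cache-control headers: jitter the default TTL so all | ||||
|            # servers do not re-fetch .well-known at the same moment. | ||||
|            return WELL_KNOWN_DEFAULT_CACHE_PERIOD * random.uniform( | ||||
|                1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, | ||||
|                1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, | ||||
|            ) | ||||
|        # Otherwise respect the server's TTL, clamped to sane bounds. | ||||
|        return max( | ||||
|            WELL_KNOWN_MIN_CACHE_PERIOD, | ||||
|            min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD), | ||||
|        ) | ||||
| 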
|  | @ -36,7 +36,6 @@ from twisted.internet.task import _EPSILON, Cooperator | |||
| from twisted.web._newclient import ResponseDone | ||||
| from twisted.web.http_headers import Headers | ||||
| 
 | ||||
| import synapse.logging.opentracing as opentracing | ||||
| import synapse.metrics | ||||
| import synapse.util.retryutils | ||||
| from synapse.api.errors import ( | ||||
|  | @ -50,6 +49,12 @@ from synapse.http import QuieterFileBodyProducer | |||
| from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver | ||||
| from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent | ||||
| from synapse.logging.context import make_deferred_yieldable | ||||
| from synapse.logging.opentracing import ( | ||||
|     inject_active_span_byte_dict, | ||||
|     set_tag, | ||||
|     start_active_span, | ||||
|     tags, | ||||
| ) | ||||
| from synapse.util.async_helpers import timeout_deferred | ||||
| from synapse.util.metrics import Measure | ||||
| 
 | ||||
|  | @ -341,20 +346,20 @@ class MatrixFederationHttpClient(object): | |||
|             query_bytes = b"" | ||||
| 
 | ||||
|         # Retrieve the current span | ||||
|         scope = start_active_span( | ||||
|             "outgoing-federation-request", | ||||
|             tags={ | ||||
|                 tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, | ||||
|                 tags.PEER_ADDRESS: request.destination, | ||||
|                 tags.HTTP_METHOD: request.method, | ||||
|                 tags.HTTP_URL: request.path, | ||||
|             }, | ||||
|             finish_on_close=True, | ||||
|         ) | ||||
| 
 | ||||
|         # Inject the span into the headers | ||||
|         headers_dict = {} | ||||
|         inject_active_span_byte_dict(headers_dict, request.destination) | ||||
| 
 | ||||
|         headers_dict[b"User-Agent"] = [self.version_string_bytes] | ||||
| 
 | ||||
|  | @ -436,9 +441,7 @@ class MatrixFederationHttpClient(object): | |||
|                         response.phrase.decode("ascii", errors="replace"), | ||||
|                     ) | ||||
| 
 | ||||
|                     set_tag(tags.HTTP_STATUS_CODE, response.code) | ||||
| 
 | ||||
|                     if 200 <= response.code < 300: | ||||
|                         pass | ||||
|  |  | |||
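| 
| The hunks above swap the ``opentracing.``-prefixed calls for direct imports. | ||||
| A minimal sketch of the resulting client-span pattern (the ``send`` transport | ||||
| helper is hypothetical; everything else mirrors the imports above): | ||||
| 
| .. code-block:: python | ||||
| 
|    from synapse.logging.opentracing import ( | ||||
|        inject_active_span_byte_dict, | ||||
|        set_tag, | ||||
|        start_active_span, | ||||
|        tags, | ||||
|    ) | ||||
| 
|    def traced_federation_request(request, send): | ||||
|        # Open a client-side RPC span for the outgoing request. | ||||
|        scope = start_active_span( | ||||
|            "outgoing-federation-request", | ||||
|            tags={ | ||||
|                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, | ||||
|                tags.PEER_ADDRESS: request.destination, | ||||
|                tags.HTTP_METHOD: request.method, | ||||
|                tags.HTTP_URL: request.path, | ||||
|            }, | ||||
|            finish_on_close=True, | ||||
|        ) | ||||
|        with scope: | ||||
|            # Propagate the span context to the remote homeserver. | ||||
|            headers = {} | ||||
|            inject_active_span_byte_dict(headers, request.destination) | ||||
|            response = send(request, headers)  # hypothetical helper | ||||
|            set_tag(tags.HTTP_STATUS_CODE, response.code) | ||||
|            return response | ||||
| 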
|  | @ -43,6 +43,9 @@ OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as | |||
| an optional dependency. This does however limit the number of modifiable spans | ||||
| at any point in the code to one. From here on out, references to `opentracing` | ||||
| in the code snippets refer to Synapse's module. | ||||
| Most methods provided by the module correspond directly to those provided by | ||||
| opentracing. Refer to the opentracing docs for more in-depth documentation of | ||||
| the arguments and methods. | ||||
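| 
| For example, opening a span directly with Synapse's module might look like | ||||
| this (a sketch: ``start_active_span`` is real and appears in the diff above, | ||||
| but the ``do_something_interesting`` helper is hypothetical): | ||||
| 
| .. code-block:: python | ||||
| 
|    from synapse.logging.opentracing import start_active_span | ||||
| 
|    with start_active_span("do_something_interesting"): | ||||
|        # Tags and logs emitted here attach to this span. | ||||
|        do_something_interesting()  # hypothetical function | ||||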
| 
 | ||||
| Tracing | ||||
| ------- | ||||
|  | @ -68,52 +71,62 @@ set a tag on the current active span. | |||
| Tracing functions | ||||
| ----------------- | ||||
| 
 | ||||
| Functions can be easily traced using decorators. The name of | ||||
| the function becomes the operation name for the span. | ||||
| 
 | ||||
| .. code-block:: python | ||||
| 
 | ||||
|    from synapse.logging.opentracing import trace | ||||
| 
 | ||||
|    # Start a span using 'interesting_function' as the operation name | ||||
|    @trace | ||||
|    def interesting_function(*args, **kwargs): | ||||
|        # Does all kinds of cool and expected things | ||||
|        return something_usual_and_useful | ||||
| 
 | ||||
| Operation names can be explicitly set for functions by using | ||||
| ``trace_using_operation_name`` | ||||
| 
 | ||||
| .. code-block:: python | ||||
| 
 | ||||
|    from synapse.logging.opentracing import trace_using_operation_name | ||||
| 
 | ||||
|    @trace_using_operation_name("A *much* better operation name") | ||||
|    def interesting_badly_named_function(*args, **kwargs): | ||||
|        # Does all kinds of cool and expected things | ||||
|        return something_usual_and_useful | ||||
| 
 | ||||
| Setting Tags | ||||
| ------------ | ||||
| 
 | ||||
| To set a tag on the active span, do the following: | ||||
| 
 | ||||
| .. code-block:: python | ||||
| 
 | ||||
|    from synapse.logging.opentracing import set_tag | ||||
| 
 | ||||
|    set_tag(tag_name, tag_value) | ||||
| 
 | ||||
| There is a convenient decorator that tags all of a method's arguments. It uses | ||||
| inspection to derive tag names: formal parameter names are prefixed with | ||||
| 'ARG_', while kwarg names are used as tag names without the prefix. | ||||
| 
 | ||||
| .. code-block:: python | ||||
| 
 | ||||
|    from synapse.logging.opentracing import tag_args | ||||
| 
 | ||||
|    @tag_args | ||||
|    def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): | ||||
|        pass | ||||
| 
 | ||||
|    set_fates("the story", "the end", "the act") | ||||
|    # This will have the following tags | ||||
|    #  - ARG_clotho: "the story" | ||||
|    #  - ARG_lachesis: "the end" | ||||
|    #  - ARG_atropos: "the act" | ||||
|    #  - father: "Zues" | ||||
|    #  - mother: "Themis" | ||||
| 
 | ||||
| Contexts and carriers | ||||
| --------------------- | ||||
|  |  | |||
|  | @ -72,7 +72,6 @@ REQUIREMENTS = [ | |||
|     "netaddr>=0.7.18", | ||||
|     "Jinja2>=2.9", | ||||
|     "bleach>=1.4.3", | ||||
|     "sdnotify>=0.3", | ||||
| ] | ||||
| 
 | ||||
| CONDITIONAL_REQUIREMENTS = { | ||||
|  |  | |||
|  | @ -42,6 +42,7 @@ from synapse.rest.admin._base import ( | |||
|     historical_admin_path_patterns, | ||||
| ) | ||||
| from synapse.rest.admin.media import register_servlets_for_media_repo | ||||
| from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet | ||||
| from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet | ||||
| from synapse.types import UserID, create_requester | ||||
| from synapse.util.versionstring import get_version_string | ||||
|  | @ -738,6 +739,7 @@ def register_servlets(hs, http_server): | |||
|     Register all the admin servlets. | ||||
|     """ | ||||
|     register_servlets_for_client_rest_resource(hs, http_server) | ||||
|     PurgeRoomServlet(hs).register(http_server) | ||||
|     SendServerNoticeServlet(hs).register(http_server) | ||||
|     VersionServlet(hs).register(http_server) | ||||
| 
 | ||||
|  |  | |||
|  | @ -0,0 +1,57 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2019 The Matrix.org Foundation C.I.C. | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| import re | ||||
| 
 | ||||
| from synapse.http.servlet import ( | ||||
|     RestServlet, | ||||
|     assert_params_in_dict, | ||||
|     parse_json_object_from_request, | ||||
| ) | ||||
| from synapse.rest.admin import assert_requester_is_admin | ||||
| 
 | ||||
| 
 | ||||
| class PurgeRoomServlet(RestServlet): | ||||
|     """Servlet which will remove all trace of a room from the database | ||||
| 
 | ||||
|     POST /_synapse/admin/v1/purge_room | ||||
|     { | ||||
|         "room_id": "!room:id" | ||||
|     } | ||||
| 
 | ||||
|     returns: | ||||
| 
 | ||||
|     {} | ||||
|     """ | ||||
| 
 | ||||
|     PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),) | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         """ | ||||
|         Args: | ||||
|             hs (synapse.server.HomeServer): server | ||||
|         """ | ||||
|         self.hs = hs | ||||
|         self.auth = hs.get_auth() | ||||
|         self.pagination_handler = hs.get_pagination_handler() | ||||
| 
 | ||||
|     async def on_POST(self, request): | ||||
|         await assert_requester_is_admin(self.auth, request) | ||||
| 
 | ||||
|         body = parse_json_object_from_request(request) | ||||
|         assert_params_in_dict(body, ("room_id",)) | ||||
| 
 | ||||
|         await self.pagination_handler.purge_room(body["room_id"]) | ||||
| 
 | ||||
|         return (200, {}) | ||||
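| 
| For illustration, a sketch of invoking this endpoint with the ``requests`` | ||||
| library (the homeserver URL and admin access token are placeholders): | ||||
| 
| .. code-block:: python | ||||
| 
|    import requests | ||||
| 
|    HOMESERVER = "https://localhost:8008"  # placeholder | ||||
|    ADMIN_TOKEN = "<admin access token>"  # placeholder | ||||
| 
|    resp = requests.post( | ||||
|        HOMESERVER + "/_synapse/admin/v1/purge_room", | ||||
|        headers={"Authorization": "Bearer " + ADMIN_TOKEN}, | ||||
|        json={"room_id": "!room:id"}, | ||||
|    ) | ||||
|    resp.raise_for_status()  # expect HTTP 200 with an empty JSON body | ||||
| 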
|  | @ -282,13 +282,13 @@ class PasswordResetSubmitTokenServlet(RestServlet): | |||
|                     return None | ||||
| 
 | ||||
|             # Otherwise show the success template | ||||
|             html = self.config.email_password_reset_template_success_html_content | ||||
|             request.setResponseCode(200) | ||||
|         except ThreepidValidationError as e: | ||||
|             # Show a failure page with a reason | ||||
|             html = self.load_jinja2_template( | ||||
|                 self.config.email_template_dir, | ||||
|                 self.config.email_password_reset_template_failure_html, | ||||
|                 template_vars={"failure_reason": e.msg}, | ||||
|             ) | ||||
|             request.setResponseCode(e.code) | ||||
|  |  | |||
|  | @ -24,6 +24,7 @@ from synapse.http.servlet import ( | |||
|     parse_json_object_from_request, | ||||
|     parse_string, | ||||
| ) | ||||
| from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name | ||||
| from synapse.types import StreamToken | ||||
| 
 | ||||
| from ._base import client_patterns | ||||
|  | @ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet): | |||
|         self.auth = hs.get_auth() | ||||
|         self.e2e_keys_handler = hs.get_e2e_keys_handler() | ||||
| 
 | ||||
|     @trace_using_operation_name("upload_keys") | ||||
|     @defer.inlineCallbacks | ||||
|     def on_POST(self, request, device_id): | ||||
|         requester = yield self.auth.get_user_by_req(request, allow_guest=True) | ||||
|  | @ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet): | |||
|             # passing the device_id here is deprecated; however, we allow it | ||||
|             # for now for compatibility with older clients. | ||||
|             if requester.device_id is not None and device_id != requester.device_id: | ||||
|                 set_tag("error", True) | ||||
|                 log_kv( | ||||
|                     { | ||||
|                         "message": "Client uploading keys for a different device", | ||||
|                         "logged_in_id": requester.device_id, | ||||
|                         "key_being_uploaded": device_id, | ||||
|                     } | ||||
|                 ) | ||||
|                 logger.warning( | ||||
|                     "Client uploading keys for a different device " | ||||
|                     "(logged in as %s, uploading for %s)", | ||||
|  | @ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet): | |||
|         requester = yield self.auth.get_user_by_req(request, allow_guest=True) | ||||
| 
 | ||||
|         from_token_string = parse_string(request, "from") | ||||
|         set_tag("from", from_token_string) | ||||
| 
 | ||||
|         # We want to enforce that they pass us one, but we ignore it and return | ||||
|         # changes after the "to" as well as before. | ||||
|         parse_string(request, "to") | ||||
|         set_tag("to", parse_string(request, "to")) | ||||
| 
 | ||||
|         from_token = StreamToken.from_string(from_token_string) | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,6 +18,7 @@ import json | |||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.api.errors import StoreError | ||||
| from synapse.logging.opentracing import log_kv, trace | ||||
| 
 | ||||
| from ._base import SQLBaseStore | ||||
| 
 | ||||
|  | @ -82,11 +83,11 @@ class EndToEndRoomKeyStore(SQLBaseStore): | |||
|             table="e2e_room_keys", | ||||
|             keyvalues={ | ||||
|                 "user_id": user_id, | ||||
|                 "version": version, | ||||
|                 "room_id": room_id, | ||||
|                 "session_id": session_id, | ||||
|             }, | ||||
|             values={ | ||||
|                 "version": version, | ||||
|                 "first_message_index": room_key["first_message_index"], | ||||
|                 "forwarded_count": room_key["forwarded_count"], | ||||
|                 "is_verified": room_key["is_verified"], | ||||
|  | @ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore): | |||
|             }, | ||||
|             lock=False, | ||||
|         ) | ||||
|         log_kv( | ||||
|             { | ||||
|                 "message": "Set room key", | ||||
|                 "room_id": room_id, | ||||
|                 "session_id": session_id, | ||||
|                 "room_key": room_key, | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|     @trace | ||||
|     @defer.inlineCallbacks | ||||
|     def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): | ||||
|         """Bulk get the E2E room keys for a given backup, optionally filtered to a given | ||||
|  | @ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): | |||
| 
 | ||||
|         return sessions | ||||
| 
 | ||||
|     @trace | ||||
|     @defer.inlineCallbacks | ||||
|     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): | ||||
|         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given | ||||
|  | @ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): | |||
|             "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn | ||||
|         ) | ||||
| 
 | ||||
|     @trace | ||||
|     def create_e2e_room_keys_version(self, user_id, info): | ||||
|         """Atomically creates a new version of this user's e2e_room_keys store | ||||
|         with the given version info. | ||||
|  | @ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): | |||
|             "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn | ||||
|         ) | ||||
| 
 | ||||
|     @trace | ||||
|     def update_e2e_room_keys_version(self, user_id, version, info): | ||||
|         """Update a given backup version | ||||
| 
 | ||||
|  | @ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): | |||
|             desc="update_e2e_room_keys_version", | ||||
|         ) | ||||
| 
 | ||||
|     @trace | ||||
|     def delete_e2e_room_keys_version(self, user_id, version=None): | ||||
|         """Delete a given backup version of the user's room keys. | ||||
|         Doesn't delete their actual key data. | ||||
|  |  | |||
|  | @ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json | |||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.logging.opentracing import log_kv, set_tag, trace | ||||
| from synapse.util.caches.descriptors import cached | ||||
| 
 | ||||
| from ._base import SQLBaseStore, db_to_json | ||||
| 
 | ||||
| 
 | ||||
| class EndToEndKeyWorkerStore(SQLBaseStore): | ||||
|     @trace | ||||
|     @defer.inlineCallbacks | ||||
|     def get_e2e_device_keys( | ||||
|         self, query_list, include_all_devices=False, include_deleted_devices=False | ||||
|  | @ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): | |||
|             Dict mapping from user-id to dict mapping from device_id to | ||||
|             dict containing "key_json", "device_display_name". | ||||
|         """ | ||||
|         set_tag("query_list", query_list) | ||||
|         if not query_list: | ||||
|             return {} | ||||
| 
 | ||||
|  | @ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore): | |||
| 
 | ||||
|         return results | ||||
| 
 | ||||
|     @trace | ||||
|     def _get_e2e_device_keys_txn( | ||||
|         self, txn, query_list, include_all_devices=False, include_deleted_devices=False | ||||
|     ): | ||||
|         set_tag("include_all_devices", include_all_devices) | ||||
|         set_tag("include_deleted_devices", include_deleted_devices) | ||||
| 
 | ||||
|         query_clauses = [] | ||||
|         query_params = [] | ||||
| 
 | ||||
|  | @ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): | |||
|             for user_id, device_id in deleted_devices: | ||||
|                 result.setdefault(user_id, {})[device_id] = None | ||||
| 
 | ||||
|         log_kv(result) | ||||
|         return result | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|  | @ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): | |||
|             keyvalues={"user_id": user_id, "device_id": device_id}, | ||||
|             desc="add_e2e_one_time_keys_check", | ||||
|         ) | ||||
| 
 | ||||
|         return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} | ||||
|         result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} | ||||
|         log_kv({"message": "Fetched one time keys for user", "one_time_keys": result}) | ||||
|         return result | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): | ||||
|  | @ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): | |||
|         """ | ||||
| 
 | ||||
|         def _add_e2e_one_time_keys(txn): | ||||
|             set_tag("user_id", user_id) | ||||
|             set_tag("device_id", device_id) | ||||
|             set_tag("new_keys", new_keys) | ||||
|             # We are protected from race between lookup and insertion due to | ||||
|             # a unique constraint. If there is a race of two calls to | ||||
|             # `add_e2e_one_time_keys` then they'll conflict and we will only | ||||
|  | @ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): | |||
|         """ | ||||
| 
 | ||||
|         def _set_e2e_device_keys_txn(txn): | ||||
|             set_tag("user_id", user_id) | ||||
|             set_tag("device_id", device_id) | ||||
|             set_tag("time_now", time_now) | ||||
|             set_tag("device_keys", device_keys) | ||||
| 
 | ||||
|             old_key_json = self._simple_select_one_onecol_txn( | ||||
|                 txn, | ||||
|                 table="e2e_device_keys_json", | ||||
|  | @ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): | |||
|             new_key_json = encode_canonical_json(device_keys).decode("utf-8") | ||||
| 
 | ||||
|             if old_key_json == new_key_json: | ||||
|                 log_kv({"Message": "Device key already stored."}) | ||||
|                 return False | ||||
| 
 | ||||
|             self._simple_upsert_txn( | ||||
|  | @ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): | |||
|                 keyvalues={"user_id": user_id, "device_id": device_id}, | ||||
|                 values={"ts_added_ms": time_now, "key_json": new_key_json}, | ||||
|             ) | ||||
| 
 | ||||
|             log_kv({"message": "Device keys stored."}) | ||||
|             return True | ||||
| 
 | ||||
|         return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn) | ||||
|  | @ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): | |||
|     def claim_e2e_one_time_keys(self, query_list): | ||||
|         """Take a list of one time keys out of the database""" | ||||
| 
 | ||||
|         @trace | ||||
|         def _claim_e2e_one_time_keys(txn): | ||||
|             sql = ( | ||||
|                 "SELECT key_id, key_json FROM e2e_one_time_keys_json" | ||||
|  | @ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): | |||
|                 " AND key_id = ?" | ||||
|             ) | ||||
|             for user_id, device_id, algorithm, key_id in delete: | ||||
|                 log_kv( | ||||
|                     { | ||||
|                         "message": "Executing claim e2e_one_time_keys transaction on database." | ||||
|                     } | ||||
|                 ) | ||||
|                 txn.execute(sql, (user_id, device_id, algorithm, key_id)) | ||||
|                 log_kv({"message": "finished executing and invalidating cache"}) | ||||
|                 self._invalidate_cache_and_stream( | ||||
|                     txn, self.count_e2e_one_time_keys, (user_id, device_id) | ||||
|                 ) | ||||
|  | @ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): | |||
| 
 | ||||
|     def delete_e2e_keys_by_device(self, user_id, device_id): | ||||
|         def delete_e2e_keys_by_device_txn(txn): | ||||
|             log_kv( | ||||
|                 { | ||||
|                     "message": "Deleting keys for device", | ||||
|                     "device_id": device_id, | ||||
|                     "user_id": user_id, | ||||
|                 } | ||||
|             ) | ||||
|             self._simple_delete_txn( | ||||
|                 txn, | ||||
|                 table="e2e_device_keys_json", | ||||
|  |  | |||
|  | @ -1302,15 +1302,11 @@ class EventsStore( | |||
|             "event_reference_hashes", | ||||
|             "event_search", | ||||
|             "event_to_state_groups", | ||||
|             "guest_access", | ||||
|             "history_visibility", | ||||
|             "local_invites", | ||||
|             "room_names", | ||||
|             "state_events", | ||||
|             "rejections", | ||||
|             "redactions", | ||||
|             "room_memberships", | ||||
|             "topics", | ||||
|         ): | ||||
|             txn.executemany( | ||||
|                 "DELETE FROM %s WHERE event_id = ?" % (table,), | ||||
|  | @ -1454,10 +1450,10 @@ class EventsStore( | |||
| 
 | ||||
|         for event, _ in events_and_contexts: | ||||
|             if event.type == EventTypes.Name: | ||||
|                 # Insert into the event_search table. | ||||
|                 self._store_room_name_txn(txn, event) | ||||
|             elif event.type == EventTypes.Topic: | ||||
|                 # Insert into the event_search table. | ||||
|                 self._store_room_topic_txn(txn, event) | ||||
|             elif event.type == EventTypes.Message: | ||||
|                 # Insert into the event_search table. | ||||
|  | @ -1465,12 +1461,6 @@ class EventsStore( | |||
|             elif event.type == EventTypes.Redaction: | ||||
|                 # Insert into the redactions table. | ||||
|                 self._store_redaction(txn, event) | ||||
| 
 | ||||
|             self._handle_event_relations(txn, event) | ||||
| 
 | ||||
|  | @ -2191,6 +2181,143 @@ class EventsStore( | |||
| 
 | ||||
|         return to_delete, to_dedelta | ||||
| 
 | ||||
|     def purge_room(self, room_id): | ||||
|         """Deletes all record of a room | ||||
| 
 | ||||
|         Args: | ||||
|             room_id (str): The ID of the room to purge. | ||||
|         """ | ||||
| 
 | ||||
|         return self.runInteraction("purge_room", self._purge_room_txn, room_id) | ||||
| 
 | ||||
|     def _purge_room_txn(self, txn, room_id): | ||||
|         # first we have to delete the state groups states | ||||
|         logger.info("[purge] removing %s from state_groups_state", room_id) | ||||
| 
 | ||||
|         txn.execute( | ||||
|             """ | ||||
|             DELETE FROM state_groups_state WHERE state_group IN ( | ||||
|               SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) | ||||
|               WHERE events.room_id=? | ||||
|             ) | ||||
|             """, | ||||
|             (room_id,), | ||||
|         ) | ||||
| 
 | ||||
|         # ... and the state group edges | ||||
|         logger.info("[purge] removing %s from state_group_edges", room_id) | ||||
| 
 | ||||
|         txn.execute( | ||||
|             """ | ||||
|             DELETE FROM state_group_edges WHERE state_group IN ( | ||||
|               SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) | ||||
|               WHERE events.room_id=? | ||||
|             ) | ||||
|             """, | ||||
|             (room_id,), | ||||
|         ) | ||||
| 
 | ||||
|         # ... and the state groups | ||||
|         logger.info("[purge] removing %s from state_groups", room_id) | ||||
| 
 | ||||
|         txn.execute( | ||||
|             """ | ||||
|             DELETE FROM state_groups WHERE id IN ( | ||||
|               SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) | ||||
|               WHERE events.room_id=? | ||||
|             ) | ||||
|             """, | ||||
|             (room_id,), | ||||
|         ) | ||||
| 
 | ||||
|         # and then tables which lack an index on room_id but have one on event_id | ||||
|         for table in ( | ||||
|             "event_auth", | ||||
|             "event_edges", | ||||
|             "event_push_actions_staging", | ||||
|             "event_reference_hashes", | ||||
|             "event_relations", | ||||
|             "event_to_state_groups", | ||||
|             "redactions", | ||||
|             "rejections", | ||||
|             "state_events", | ||||
|         ): | ||||
|             logger.info("[purge] removing %s from %s", room_id, table) | ||||
| 
 | ||||
|             txn.execute( | ||||
|                 """ | ||||
|                 DELETE FROM %s WHERE event_id IN ( | ||||
|                   SELECT event_id FROM events WHERE room_id=? | ||||
|                 ) | ||||
|                 """ | ||||
|                 % (table,), | ||||
|                 (room_id,), | ||||
|             ) | ||||
| 
 | ||||
|         # and finally, the tables with an index on room_id (or no useful index) | ||||
|         for table in ( | ||||
|             "current_state_events", | ||||
|             "event_backward_extremities", | ||||
|             "event_forward_extremities", | ||||
|             "event_json", | ||||
|             "event_push_actions", | ||||
|             "event_search", | ||||
|             "events", | ||||
|             "group_rooms", | ||||
|             "public_room_list_stream", | ||||
|             "receipts_graph", | ||||
|             "receipts_linearized", | ||||
|             "room_aliases", | ||||
|             "room_depth", | ||||
|             "room_memberships", | ||||
|             "room_state", | ||||
|             "room_stats", | ||||
|             "room_stats_earliest_token", | ||||
|             "rooms", | ||||
|             "stream_ordering_to_exterm", | ||||
|             "topics", | ||||
|             "users_in_public_rooms", | ||||
|             "users_who_share_private_rooms", | ||||
|             # no useful index, but let's clear them anyway | ||||
|             "appservice_room_list", | ||||
|             "e2e_room_keys", | ||||
|             "event_push_summary", | ||||
|             "pusher_throttle", | ||||
|             "group_summary_rooms", | ||||
|             "local_invites", | ||||
|             "room_account_data", | ||||
|             "room_tags", | ||||
|         ): | ||||
|             logger.info("[purge] removing %s from %s", room_id, table) | ||||
|             txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,)) | ||||
| 
 | ||||
|         # Other tables we do NOT need to clear out: | ||||
|         # | ||||
|         #  - blocked_rooms | ||||
|         #    This is important, to make sure that we don't accidentally rejoin a blocked | ||||
|         #    room after it was purged | ||||
|         # | ||||
|         #  - user_directory | ||||
|         #    This has a room_id column, but it is unused | ||||
|         # | ||||
| 
 | ||||
|         # Other tables that we might want to consider clearing out include: | ||||
|         # | ||||
|         #  - event_reports | ||||
|         #       Given that these are intended for abuse management, my initial | ||||
|         #       inclination is to leave them in place. | ||||
|         # | ||||
|         #  - current_state_delta_stream | ||||
|         #  - ex_outlier_stream | ||||
|         #  - room_tags_revisions | ||||
|         #       The problem with these is that they are largeish and there is no room_id | ||||
|         #       index on them. In any case we should be clearing out 'stream' tables | ||||
|         #       periodically anyway (#5888) | ||||
| 
 | ||||
|         # TODO: we could probably usefully do a bunch of cache invalidation here | ||||
| 
 | ||||
|         logger.info("[purge] done") | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def is_event_after(self, event_id1, event_id2): | ||||
|         """Returns True if event_id1 is after event_id2 in the stream | ||||
|  |  | |||
|  | @ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore): | |||
| 
 | ||||
|     def _store_room_topic_txn(self, txn, event): | ||||
|         if hasattr(event, "content") and "topic" in event.content: | ||||
|             self.store_event_search_txn( | ||||
|                 txn, event, "content.topic", event.content["topic"] | ||||
|             ) | ||||
| 
 | ||||
|     def _store_room_name_txn(self, txn, event): | ||||
|         if hasattr(event, "content") and "name" in event.content: | ||||
|             self.store_event_search_txn( | ||||
|                 txn, event, "content.name", event.content["name"] | ||||
|             ) | ||||
|  | @ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore): | |||
|                 txn, event, "content.body", event.content["body"] | ||||
|             ) | ||||
| 
 | ||||
|     def add_event_report( | ||||
|         self, room_id, event_id, user_id, reason, content, received_ts | ||||
|     ): | ||||
|  |  | |||
|  | @ -0,0 +1,20 @@ | |||
| /* Copyright 2019 The Matrix.org Foundation C.I.C. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| -- these tables are never used. | ||||
| DROP TABLE IF EXISTS room_names; | ||||
| DROP TABLE IF EXISTS topics; | ||||
| DROP TABLE IF EXISTS history_visibility; | ||||
| DROP TABLE IF EXISTS guest_access; | ||||
|  | @ -0,0 +1,18 @@ | |||
| /* Copyright 2019 Matrix.org Foundation CIC | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| -- version is supposed to be part of the room keys index | ||||
| CREATE UNIQUE INDEX e2e_room_keys_with_version_idx ON e2e_room_keys(user_id, version, room_id, session_id); | ||||
| DROP INDEX IF EXISTS e2e_room_keys_idx; | ||||
|  | @ -0,0 +1,17 @@ | |||
| /* Copyright 2019 Matrix.org Foundation CIC | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| -- this was apparently forgotten when the table was created back in delta 53. | ||||
| CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id); | ||||
|  | @ -37,11 +37,9 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): | |||
|         self.recoverer = Mock() | ||||
|         self.recoverer_fn = Mock(return_value=self.recoverer) | ||||
|         self.txnctrl = _TransactionController( | ||||
|             clock=self.clock, store=self.store, as_api=self.as_api | ||||
|         ) | ||||
|         self.txnctrl.RECOVERER_CLASS = self.recoverer_fn | ||||
| 
 | ||||
|     def test_single_service_up_txn_sent(self): | ||||
|         # Test: The AS is up and the txn is successfully sent. | ||||
|  |  | |||
|  | @ -73,8 +73,6 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
| 
 | ||||
|         self.mock_resolver = Mock() | ||||
| 
 | ||||
 | ||||
|         config_dict = default_config("test", parse=False) | ||||
|         config_dict["federation_custom_ca_list"] = [get_test_ca_cert_file()] | ||||
| 
 | ||||
|  | @ -82,11 +80,21 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         config.parse_config_dict(config_dict, "", "") | ||||
| 
 | ||||
|         self.tls_factory = ClientTLSOptionsFactory(config) | ||||
| 
 | ||||
|         self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds) | ||||
|         self.had_well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds) | ||||
|         self.well_known_resolver = WellKnownResolver( | ||||
|             self.reactor, | ||||
|             Agent(self.reactor, contextFactory=self.tls_factory), | ||||
|             well_known_cache=self.well_known_cache, | ||||
|             had_well_known_cache=self.had_well_known_cache, | ||||
|         ) | ||||
| 
 | ||||
|         self.agent = MatrixFederationAgent( | ||||
|             reactor=self.reactor, | ||||
|             tls_client_options_factory=self.tls_factory, | ||||
|             _srv_resolver=self.mock_resolver, | ||||
|             _well_known_resolver=self.well_known_resolver, | ||||
|         ) | ||||
| 
 | ||||
|     def _make_connection(self, client_factory, expected_sni): | ||||
|  | @ -543,7 +551,7 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         self.assertEqual(self.well_known_cache[b"testserv"], b"target-server") | ||||
| 
 | ||||
|         # check the cache expires | ||||
|         self.reactor.pump((48 * 3600,)) | ||||
|         self.well_known_cache.expire() | ||||
|         self.assertNotIn(b"testserv", self.well_known_cache) | ||||
| 
 | ||||
|  | @ -631,7 +639,7 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         self.assertEqual(self.well_known_cache[b"testserv"], b"target-server") | ||||
| 
 | ||||
|         # check the cache expires | ||||
|         self.reactor.pump((48 * 3600,)) | ||||
|         self.well_known_cache.expire() | ||||
|         self.assertNotIn(b"testserv", self.well_known_cache) | ||||
| 
 | ||||
|  | @ -701,11 +709,18 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
| 
 | ||||
|         config = default_config("test", parse=True) | ||||
| 
 | ||||
|         # Build a new agent and WellKnownResolver with a different tls factory | ||||
|         tls_factory = ClientTLSOptionsFactory(config) | ||||
|         agent = MatrixFederationAgent( | ||||
|             reactor=self.reactor, | ||||
|             tls_client_options_factory=tls_factory, | ||||
|             _srv_resolver=self.mock_resolver, | ||||
|             _well_known_resolver=WellKnownResolver( | ||||
|                 self.reactor, | ||||
|                 Agent(self.reactor, contextFactory=tls_factory), | ||||
|                 well_known_cache=self.well_known_cache, | ||||
|                 had_well_known_cache=self.had_well_known_cache, | ||||
|             ), | ||||
|         ) | ||||
| 
 | ||||
|         test_d = agent.request(b"GET", b"matrix://testserv/foo/bar") | ||||
|  | @ -932,15 +947,9 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         self.successResultOf(test_d) | ||||
| 
 | ||||
|     def test_well_known_cache(self): | ||||
|         self.reactor.lookups["testserv"] = "1.2.3.4" | ||||
| 
 | ||||
|         fetch_d = self.well_known_resolver.get_well_known(b"testserv") | ||||
| 
 | ||||
|         # there should be an attempt to connect on port 443 for the .well-known | ||||
|         clients = self.reactor.tcpClients | ||||
|  | @ -963,7 +972,7 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         well_known_server.loseConnection() | ||||
| 
 | ||||
|         # repeat the request: it should hit the cache | ||||
|         fetch_d = self.well_known_resolver.get_well_known(b"testserv") | ||||
|         r = self.successResultOf(fetch_d) | ||||
|         self.assertEqual(r.delegated_server, b"target-server") | ||||
| 
 | ||||
|  | @ -971,7 +980,7 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         self.reactor.pump((1000.0,)) | ||||
| 
 | ||||
|         # now it should connect again | ||||
|         fetch_d = self.well_known_resolver.get_well_known(b"testserv") | ||||
| 
 | ||||
|         self.assertEqual(len(clients), 1) | ||||
|         (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) | ||||
|  | @ -992,15 +1001,9 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         it ignores transient errors. | ||||
|         """ | ||||
| 
 | ||||
|         self.reactor.lookups["testserv"] = "1.2.3.4" | ||||
| 
 | ||||
|         fetch_d = self.well_known_resolver.get_well_known(b"testserv") | ||||
| 
 | ||||
|         # there should be an attempt to connect on port 443 for the .well-known | ||||
|         clients = self.reactor.tcpClients | ||||
|  | @ -1026,27 +1029,37 @@ class MatrixFederationAgentTests(unittest.TestCase): | |||
|         # another lookup. | ||||
|         self.reactor.pump((900.0,)) | ||||
| 
 | ||||
|         fetch_d = self.well_known_resolver.get_well_known(b"testserv") | ||||
| 
 | ||||
|         # The resolver may retry a few times, so fonx all requests that come along | ||||
|         attempts = 0 | ||||
|         while self.reactor.tcpClients: | ||||
|             clients = self.reactor.tcpClients | ||||
|             (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) | ||||
| 
 | ||||
|             attempts += 1 | ||||
| 
 | ||||
|             # fonx the connection attempt, this will be treated as a temporary | ||||
|             # failure. | ||||
|             client_factory.clientConnectionFailed(None, Exception("nope")) | ||||
| 
 | ||||
|             # There are a few sleeps involved, so we have to pump the reactor a | ||||
|             # bit. | ||||
|             self.reactor.pump((1.0, 1.0)) | ||||
| 
 | ||||
|         # We expect to see more than one attempt as there was previously a valid | ||||
|         # well known. | ||||
|         self.assertGreater(attempts, 1) | ||||
| 
 | ||||
|         # Resolver should return cached value, despite the lookup failing. | ||||
|         r = self.successResultOf(fetch_d) | ||||
|         self.assertEqual(r.delegated_server, b"target-server") | ||||
| 
 | ||||
|         # Expire both caches and repeat the request | ||||
|         self.reactor.pump((10000.0,)) | ||||
| 
 | ||||
|         # Repeat the request; this time it should fail if the lookup fails. | ||||
|         fetch_d = self.well_known_resolver.get_well_known(b"testserv") | ||||
| 
 | ||||
|         clients = self.reactor.tcpClients | ||||
|         (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) | ||||
|  |  | |||