Merge pull request #6506 from matrix-org/erikj/remove_snapshot_cache
Remove SnapshotCache in favour of ResponseCachepull/6503/head
						commit
						e3f528c544
					
				|  | @ -0,0 +1 @@ | |||
| Remove `SnapshotCache` in favour of `ResponseCache`. | ||||
|  | @ -26,7 +26,7 @@ from synapse.streams.config import PaginationConfig | |||
| from synapse.types import StreamToken, UserID | ||||
| from synapse.util import unwrapFirstError | ||||
| from synapse.util.async_helpers import concurrently_execute | ||||
| from synapse.util.caches.snapshot_cache import SnapshotCache | ||||
| from synapse.util.caches.response_cache import ResponseCache | ||||
| from synapse.visibility import filter_events_for_client | ||||
| 
 | ||||
| from ._base import BaseHandler | ||||
|  | @ -41,7 +41,7 @@ class InitialSyncHandler(BaseHandler): | |||
|         self.state = hs.get_state_handler() | ||||
|         self.clock = hs.get_clock() | ||||
|         self.validator = EventValidator() | ||||
|         self.snapshot_cache = SnapshotCache() | ||||
|         self.snapshot_cache = ResponseCache(hs, "initial_sync_cache") | ||||
|         self._event_serializer = hs.get_event_client_serializer() | ||||
|         self.storage = hs.get_storage() | ||||
|         self.state_store = self.storage.state | ||||
|  | @ -79,17 +79,14 @@ class InitialSyncHandler(BaseHandler): | |||
|             as_client_event, | ||||
|             include_archived, | ||||
|         ) | ||||
|         now_ms = self.clock.time_msec() | ||||
|         result = self.snapshot_cache.get(now_ms, key) | ||||
|         if result is not None: | ||||
|             return result | ||||
| 
 | ||||
|         return self.snapshot_cache.set( | ||||
|             now_ms, | ||||
|         return self.snapshot_cache.wrap( | ||||
|             key, | ||||
|             self._snapshot_all_rooms( | ||||
|                 user_id, pagin_config, as_client_event, include_archived | ||||
|             ), | ||||
|             self._snapshot_all_rooms, | ||||
|             user_id, | ||||
|             pagin_config, | ||||
|             as_client_event, | ||||
|             include_archived, | ||||
|         ) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|  |  | |||
|  | @ -1,94 +0,0 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2015, 2016 OpenMarket Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| from synapse.util.async_helpers import ObservableDeferred | ||||
| 
 | ||||
| 
 | ||||
| class SnapshotCache(object): | ||||
|     """Cache for snapshots like the response of /initialSync. | ||||
|     The response of initialSync only has to be a recent snapshot of the | ||||
|     server state. It shouldn't matter to clients if it is a few minutes out | ||||
|     of date. | ||||
| 
 | ||||
|     This caches a deferred response. Until the deferred completes it will be | ||||
|     returned from the cache. This means that if the client retries the request | ||||
|     while the response is still being computed, that original response will be | ||||
|     used rather than trying to compute a new response. | ||||
| 
 | ||||
|     Once the deferred completes it will removed from the cache after 5 minutes. | ||||
|     We delay removing it from the cache because a client retrying its request | ||||
|     could race with us finishing computing the response. | ||||
| 
 | ||||
|     Rather than tracking precisely how long something has been in the cache we | ||||
|     keep two generations of completed responses. Every 5 minutes discard the | ||||
|     old generation, move the new generation to the old generation, and set the | ||||
|     new generation to be empty. This means that a result will be in the cache | ||||
|     somewhere between 5 and 10 minutes. | ||||
|     """ | ||||
| 
 | ||||
|     DURATION_MS = 5 * 60 * 1000  # Cache results for 5 minutes. | ||||
| 
 | ||||
|     def __init__(self): | ||||
|         self.pending_result_cache = {}  # Request that haven't finished yet. | ||||
|         self.prev_result_cache = {}  # The older requests that have finished. | ||||
|         self.next_result_cache = {}  # The newer requests that have finished. | ||||
|         self.time_last_rotated_ms = 0 | ||||
| 
 | ||||
|     def rotate(self, time_now_ms): | ||||
|         # Rotate once if the cache duration has passed since the last rotation. | ||||
|         if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: | ||||
|             self.prev_result_cache = self.next_result_cache | ||||
|             self.next_result_cache = {} | ||||
|             self.time_last_rotated_ms += self.DURATION_MS | ||||
| 
 | ||||
|         # Rotate again if the cache duration has passed twice since the last | ||||
|         # rotation. | ||||
|         if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: | ||||
|             self.prev_result_cache = self.next_result_cache | ||||
|             self.next_result_cache = {} | ||||
|             self.time_last_rotated_ms = time_now_ms | ||||
| 
 | ||||
|     def get(self, time_now_ms, key): | ||||
|         self.rotate(time_now_ms) | ||||
|         # This cache is intended to deduplicate requests, so we expect it to be | ||||
|         # missed most of the time. So we just lookup the key in all of the | ||||
|         # dictionaries rather than trying to short circuit the lookup if the | ||||
|         # key is found. | ||||
|         result = self.prev_result_cache.get(key) | ||||
|         result = self.next_result_cache.get(key, result) | ||||
|         result = self.pending_result_cache.get(key, result) | ||||
|         if result is not None: | ||||
|             return result.observe() | ||||
|         else: | ||||
|             return None | ||||
| 
 | ||||
|     def set(self, time_now_ms, key, deferred): | ||||
|         self.rotate(time_now_ms) | ||||
| 
 | ||||
|         result = ObservableDeferred(deferred) | ||||
| 
 | ||||
|         self.pending_result_cache[key] = result | ||||
| 
 | ||||
|         def shuffle_along(r): | ||||
|             # When the deferred completes we shuffle it along to the first | ||||
|             # generation of the result cache. So that it will eventually | ||||
|             # expire from the rotation of that cache. | ||||
|             self.next_result_cache[key] = result | ||||
|             self.pending_result_cache.pop(key, None) | ||||
|             return r | ||||
| 
 | ||||
|         result.addBoth(shuffle_along) | ||||
| 
 | ||||
|         return result.observe() | ||||
|  | @ -1,63 +0,0 @@ | |||
| # -*- coding: utf-8 -*- | ||||
| # Copyright 2015, 2016 OpenMarket Ltd | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| 
 | ||||
| from twisted.internet.defer import Deferred | ||||
| 
 | ||||
| from synapse.util.caches.snapshot_cache import SnapshotCache | ||||
| 
 | ||||
| from .. import unittest | ||||
| 
 | ||||
| 
 | ||||
| class SnapshotCacheTestCase(unittest.TestCase): | ||||
|     def setUp(self): | ||||
|         self.cache = SnapshotCache() | ||||
|         self.cache.DURATION_MS = 1 | ||||
| 
 | ||||
|     def test_get_set(self): | ||||
|         # Check that getting a missing key returns None | ||||
|         self.assertEquals(self.cache.get(0, "key"), None) | ||||
| 
 | ||||
|         # Check that setting a key with a deferred returns | ||||
|         # a deferred that resolves when the initial deferred does | ||||
|         d = Deferred() | ||||
|         set_result = self.cache.set(0, "key", d) | ||||
|         self.assertIsNotNone(set_result) | ||||
|         self.assertFalse(set_result.called) | ||||
| 
 | ||||
|         # Check that getting the key before the deferred has resolved | ||||
|         # returns a deferred that resolves when the initial deferred does. | ||||
|         get_result_at_10 = self.cache.get(10, "key") | ||||
|         self.assertIsNotNone(get_result_at_10) | ||||
|         self.assertFalse(get_result_at_10.called) | ||||
| 
 | ||||
|         # Check that the returned deferreds resolve when the initial deferred | ||||
|         # does. | ||||
|         d.callback("v") | ||||
|         self.assertTrue(set_result.called) | ||||
|         self.assertTrue(get_result_at_10.called) | ||||
| 
 | ||||
|         # Check that getting the key after the deferred has resolved | ||||
|         # before the cache expires returns a resolved deferred. | ||||
|         get_result_at_11 = self.cache.get(11, "key") | ||||
|         self.assertIsNotNone(get_result_at_11) | ||||
|         if isinstance(get_result_at_11, Deferred): | ||||
|             # The cache may return the actual result rather than a deferred | ||||
|             self.assertTrue(get_result_at_11.called) | ||||
| 
 | ||||
|         # Check that getting the key after the deferred has resolved | ||||
|         # after the cache expires returns None | ||||
|         get_result_at_12 = self.cache.get(12, "key") | ||||
|         self.assertIsNone(get_result_at_12) | ||||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston