diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index b91c165e2b..06250d2d96 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -36,6 +36,7 @@ def decode_rule_json(rule): @defer.inlineCallbacks def _get_rules(room_id, user_ids, store): rules_by_user = yield store.bulk_get_push_rules(user_ids) + rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids) rules_by_user = { uid: baserules.list_with_base_rules([ @@ -44,6 +45,22 @@ def _get_rules(room_id, user_ids, store): ]) for uid in user_ids } + + # We apply the rules-enabled map here: bulk_get_push_rules doesn't + # fetch disabled rules, but this won't account for any server default + # rules the user has disabled, so we need to do this too. + for uid in user_ids: + if uid not in rules_enabled_by_user: + continue + + user_enabled_map = rules_enabled_by_user[uid] + + for rule in rules_by_user[uid]: + rule_id = rule['rule_id'] + + if rule_id in user_enabled_map: + rule['enabled'] = user_enabled_map[rule_id] + defer.returnValue(rules_by_user) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index aa61cf5569..a05c4f84cf 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -40,14 +40,20 @@ class EventPushActionsStore(SQLBaseStore): 'actions': json.dumps(actions) }) + def f(txn): + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + return self._simple_insert_many_txn(txn, "event_push_actions", values) + yield self.runInteraction( "set_actions_for_event_and_users", - self._simple_insert_many_txn, - "event_push_actions", - values + f, ) - @cachedInlineCallbacks(num_args=3) + @cachedInlineCallbacks(num_args=3, lru=True, tree=True) def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): @@ -98,6 +104,11 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def remove_push_actions_for_event_id(self, room_id, event_id): def f(txn): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) txn.execute( "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", (room_id, event_id) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 2adfefd994..35ec7e8cef 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -93,6 +93,35 @@ class PushRuleStore(SQLBaseStore): results.setdefault(row['user_name'], []).append(row) defer.returnValue(results) + @defer.inlineCallbacks + def bulk_get_push_rules_enabled(self, user_ids): + if not user_ids: + defer.returnValue({}) + + batch_size = 100 + + def f(txn, user_ids_to_fetch): + sql = ( + "SELECT user_name, rule_id, enabled" + " FROM push_rules_enable" + " WHERE user_name" + " IN (" + ",".join("?" for _ in user_ids_to_fetch) + ")" + ) + txn.execute(sql, user_ids_to_fetch) + return self.cursor_to_dict(txn) + + results = {} + + chunks = [user_ids[i:i+batch_size] for i in xrange(0, len(user_ids), batch_size)] + for batch_user_ids in chunks: + rows = yield self.runInteraction( + "bulk_get_push_rules_enabled", f, batch_user_ids + ) + + for row in rows: + results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] + defer.returnValue(results) + @defer.inlineCallbacks def add_push_rule(self, before, after, **kwargs): vals = kwargs diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 0033051849..88e56e3302 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -17,6 +17,7 @@ import logging from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError from synapse.util.caches.lrucache import LruCache +from synapse.util.caches.treecache import TreeCache from . import caches_by_name, DEBUG_CACHES, cache_counter @@ -36,9 +37,12 @@ _CacheSentinel = object() class Cache(object): - def __init__(self, name, max_entries=1000, keylen=1, lru=True): + def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): if lru: - self.cache = LruCache(max_size=max_entries) + cache_type = TreeCache if tree else dict + self.cache = LruCache( + max_size=max_entries, keylen=keylen, cache_type=cache_type + ) self.max_entries = None else: self.cache = OrderedDict() @@ -99,6 +103,15 @@ class Cache(object): self.sequence += 1 self.cache.pop(key, None) + def invalidate_many(self, key): + self.check_thread() + if not isinstance(key, tuple): + raise TypeError( + "The cache key must be a tuple not %r" % (type(key),) + ) + self.sequence += 1 + self.cache.del_multi(key) + def invalidate_all(self): self.check_thread() self.sequence += 1 @@ -122,7 +135,7 @@ class CacheDescriptor(object): which can be used to insert values into the cache specifically, without calling the calculation function. """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=True, + def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False, inlineCallbacks=False): self.orig = orig @@ -134,6 +147,7 @@ class CacheDescriptor(object): self.max_entries = max_entries self.num_args = num_args self.lru = lru + self.tree = tree self.arg_names = inspect.getargspec(orig).args[1:num_args+1] @@ -149,6 +163,7 @@ class CacheDescriptor(object): max_entries=self.max_entries, keylen=self.num_args, lru=self.lru, + tree=self.tree, ) def __get__(self, obj, objtype=None): @@ -200,6 +215,7 @@ class CacheDescriptor(object): wrapped.invalidate = self.cache.invalidate wrapped.invalidate_all = self.cache.invalidate_all + wrapped.invalidate_many = self.cache.invalidate_many wrapped.prefill = self.cache.prefill obj.__dict__[self.orig.__name__] = wrapped @@ -321,21 +337,23 @@ class CacheListDescriptor(object): return wrapped -def cached(max_entries=1000, num_args=1, lru=True): - return lambda orig: CacheDescriptor( - orig, - max_entries=max_entries, - num_args=num_args, - lru=lru - ) - - -def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False): +def cached(max_entries=1000, num_args=1, lru=True, tree=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, lru=lru, + tree=tree, + ) + + +def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False): + return lambda orig: CacheDescriptor( + orig, + max_entries=max_entries, + num_args=num_args, + lru=lru, + tree=tree, inlineCallbacks=True, ) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 0122b0bb3f..e6a66dc041 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -17,11 +17,27 @@ from functools import wraps import threading +from synapse.util.caches.treecache import TreeCache + + +def enumerate_leaves(node, depth): + if depth == 0: + yield node + else: + for n in node.values(): + for m in enumerate_leaves(n, depth - 1): + yield m + class LruCache(object): - """Least-recently-used cache.""" - def __init__(self, max_size): - cache = {} + """ + Least-recently-used cache. + Supports del_multi only if cache_type=TreeCache + If cache_type=TreeCache, all keys must be tuples. + """ + def __init__(self, max_size, keylen=1, cache_type=dict): + cache = cache_type() + self.size = 0 list_root = [] list_root[:] = [list_root, list_root, None, None] @@ -44,6 +60,7 @@ class LruCache(object): prev_node[NEXT] = node next_node[PREV] = node cache[key] = node + self.size += 1 def move_node_to_front(node): prev_node = node[PREV] @@ -62,7 +79,7 @@ class LruCache(object): next_node = node[NEXT] prev_node[NEXT] = next_node next_node[PREV] = prev_node - cache.pop(node[KEY], None) + self.size -= 1 @synchronized def cache_get(key, default=None): @@ -81,8 +98,10 @@ class LruCache(object): node[VALUE] = value else: add_node(key, value) - if len(cache) > max_size: - delete_node(list_root[PREV]) + if self.size > max_size: + todelete = list_root[PREV] + delete_node(todelete) + cache.pop(todelete[KEY], None) @synchronized def cache_set_default(key, value): @@ -91,8 +110,10 @@ class LruCache(object): return node[VALUE] else: add_node(key, value) - if len(cache) > max_size: - delete_node(list_root[PREV]) + if self.size > max_size: + todelete = list_root[PREV] + delete_node(todelete) + cache.pop(todelete[KEY], None) return value @synchronized @@ -100,10 +121,22 @@ class LruCache(object): node = cache.get(key, None) if node: delete_node(node) + cache.pop(node[KEY], None) return node[VALUE] else: return default + @synchronized + def cache_del_multi(key): + """ + This will only work if constructed with cache_type=TreeCache + """ + popped = cache.pop(key) + if popped is None: + return + for leaf in enumerate_leaves(popped, keylen - len(key)): + delete_node(leaf) + @synchronized def cache_clear(): list_root[NEXT] = list_root @@ -112,7 +145,7 @@ class LruCache(object): @synchronized def cache_len(): - return len(cache) + return self.size @synchronized def cache_contains(key): @@ -123,6 +156,8 @@ class LruCache(object): self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop + if cache_type is TreeCache: + self.del_multi = cache_del_multi self.len = cache_len self.contains = cache_contains self.clear = cache_clear diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py new file mode 100644 index 0000000000..3b58860910 --- /dev/null +++ b/synapse/util/caches/treecache.py @@ -0,0 +1,60 @@ +SENTINEL = object() + + +class TreeCache(object): + """ + Tree-based backing store for LruCache. Allows subtrees of data to be deleted + efficiently. + Keys must be tuples. + """ + def __init__(self): + self.root = {} + + def __setitem__(self, key, value): + return self.set(key, value) + + def __contains__(self, key): + return self.get(key, SENTINEL) is not SENTINEL + + def set(self, key, value): + node = self.root + for k in key[:-1]: + node = node.setdefault(k, {}) + node[key[-1]] = value + + def get(self, key, default=None): + node = self.root + for k in key[:-1]: + node = node.get(k, None) + if node is None: + return default + return node.get(key[-1], default) + + def clear(self): + self.root = {} + + def pop(self, key, default=None): + nodes = [] + + node = self.root + for k in key[:-1]: + node = node.get(k, None) + nodes.append(node) # don't add the root node + if node is None: + return default + popped = node.pop(key[-1], SENTINEL) + if popped is SENTINEL: + return default + + node_and_keys = zip(nodes, key) + node_and_keys.reverse() + node_and_keys.append((self.root, None)) + + for i in range(len(node_and_keys) - 1): + n, k = node_and_keys[i] + + if n: + break + node_and_keys[i+1][0].pop(k) + + return popped diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index fbbc5eed15..2cd3d26454 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -17,6 +17,7 @@ from .. import unittest from synapse.util.caches.lrucache import LruCache +from synapse.util.caches.treecache import TreeCache class LruCacheTestCase(unittest.TestCase): @@ -52,3 +53,22 @@ class LruCacheTestCase(unittest.TestCase): cache["key"] = 1 self.assertEquals(cache.pop("key"), 1) self.assertEquals(cache.pop("key"), None) + + def test_del_multi(self): + cache = LruCache(4, 2, cache_type=TreeCache) + cache[("animal", "cat")] = "mew" + cache[("animal", "dog")] = "woof" + cache[("vehicles", "car")] = "vroom" + cache[("vehicles", "train")] = "chuff" + + self.assertEquals(len(cache), 4) + + self.assertEquals(cache.get(("animal", "cat")), "mew") + self.assertEquals(cache.get(("vehicles", "car")), "vroom") + cache.del_multi(("animal",)) + self.assertEquals(len(cache), 2) + self.assertEquals(cache.get(("animal", "cat")), None) + self.assertEquals(cache.get(("animal", "dog")), None) + self.assertEquals(cache.get(("vehicles", "car")), "vroom") + self.assertEquals(cache.get(("vehicles", "train")), "chuff") + # Man from del_multi say "Yes". diff --git a/tests/util/test_treecache.py b/tests/util/test_treecache.py new file mode 100644 index 0000000000..9946ceb3f1 --- /dev/null +++ b/tests/util/test_treecache.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .. import unittest + +from synapse.util.caches.treecache import TreeCache + +class TreeCacheTestCase(unittest.TestCase): + def test_get_set_onelevel(self): + cache = TreeCache() + cache[("a",)] = "A" + cache[("b",)] = "B" + self.assertEquals(cache.get(("a",)), "A") + self.assertEquals(cache.get(("b",)), "B") + + def test_pop_onelevel(self): + cache = TreeCache() + cache[("a",)] = "A" + cache[("b",)] = "B" + self.assertEquals(cache.pop(("a",)), "A") + self.assertEquals(cache.pop(("a",)), None) + self.assertEquals(cache.get(("b",)), "B") + + def test_get_set_twolevel(self): + cache = TreeCache() + cache[("a", "a")] = "AA" + cache[("a", "b")] = "AB" + cache[("b", "a")] = "BA" + self.assertEquals(cache.get(("a", "a")), "AA") + self.assertEquals(cache.get(("a", "b")), "AB") + self.assertEquals(cache.get(("b", "a")), "BA") + + def test_pop_twolevel(self): + cache = TreeCache() + cache[("a", "a")] = "AA" + cache[("a", "b")] = "AB" + cache[("b", "a")] = "BA" + self.assertEquals(cache.pop(("a", "a")), "AA") + self.assertEquals(cache.get(("a", "a")), None) + self.assertEquals(cache.get(("a", "b")), "AB") + self.assertEquals(cache.pop(("b", "a")), "BA") + self.assertEquals(cache.pop(("b", "a")), None) + + def test_pop_mixedlevel(self): + cache = TreeCache() + cache[("a", "a")] = "AA" + cache[("a", "b")] = "AB" + cache[("b", "a")] = "BA" + self.assertEquals(cache.get(("a", "a")), "AA") + cache.pop(("a",)) + self.assertEquals(cache.get(("a", "a")), None) + self.assertEquals(cache.get(("a", "b")), None) + self.assertEquals(cache.get(("b", "a")), "BA")