From 270e1c904a53c69512eec2d3818718d64efa7649 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 May 2017 16:51:05 +0100 Subject: [PATCH 1/9] Speed up calculating push rules --- synapse/push/bulk_push_rule_evaluator.py | 27 +++++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 760d567ca1..3da684c6b1 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -200,6 +200,10 @@ class RulesForRoom(object): # not update the cache with it. self.sequence = 0 + # A cache of user_ids that we *know* aren't interesting, e.g. user_ids + # owned by AS's, or remote users, etc. + self.uninteresting_user_set = set() + # We need to be clever on the invalidating caches callbacks, as # otherwise the invalidation callback holds a reference to the object, # potentially causing it to leak. @@ -231,10 +235,24 @@ class RulesForRoom(object): # Loop through to see which member events we've seen and have rules # for and which we need to fetch - for key, event_id in current_state_ids.iteritems(): + for key in current_state_ids: if key[0] != EventTypes.Member: continue + user_id = key[1] + if user_id in self.uninteresting_user_set: + continue + + if not self.is_mine_id(user_id): + self.uninteresting_user_set.add(user_id) + continue + + if self.store.get_if_app_services_interested_in_user(user_id): + self.uninteresting_user_set.add(user_id) + continue + + event_id = current_state_ids[key] + res = self.member_map.get(event_id, None) if res: user_id, state = res @@ -244,13 +262,6 @@ class RulesForRoom(object): ret_rules_by_user[user_id] = rules continue - user_id = key[1] - if not self.is_mine_id(user_id): - continue - - if self.store.get_if_app_services_interested_in_user(user_id): - continue - # If a user has left a room we remove their push rule. If they # joined then we readd it later in _update_rules_with_member_event_ids ret_rules_by_user.pop(user_id, None) From 25f03cf8e9600f68b72fb5843e1e2b789d064c2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 14:58:22 +0100 Subject: [PATCH 2/9] Use tuple unpacking --- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 3da684c6b1..f01a609e38 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -236,10 +236,10 @@ class RulesForRoom(object): # Loop through to see which member events we've seen and have rules # for and which we need to fetch for key in current_state_ids: - if key[0] != EventTypes.Member: + typ, user_id = key + if typ != EventTypes.Member: continue - user_id = key[1] if user_id in self.uninteresting_user_set: continue From 24c8f38784fc51945b54fc34f470c91192415c81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 14:59:27 +0100 Subject: [PATCH 3/9] Comment --- synapse/push/bulk_push_rule_evaluator.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f01a609e38..8443e4b05c 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -201,7 +201,10 @@ class RulesForRoom(object): self.sequence = 0 # A cache of user_ids that we *know* aren't interesting, e.g. user_ids - # owned by AS's, or remote users, etc. + # owned by AS's, or remote users, etc. (I.e. users we will never need to + # calculate push for) + # These never need to be invalidated as we will never set up push for + # them. self.uninteresting_user_set = set() # We need to be clever on the invalidating caches callbacks, as From 2d17b09a6de8ac0951d30307aff767f103f5cd8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 15:01:36 +0100 Subject: [PATCH 4/9] Add debug logging --- synapse/push/bulk_push_rule_evaluator.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 8443e4b05c..6bf203993c 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -224,6 +224,7 @@ class RulesForRoom(object): with (yield self.linearizer.queue(())): if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) defer.returnValue(self.rules_by_user) ret_rules_by_user = {} @@ -236,6 +237,10 @@ class RulesForRoom(object): else: current_state_ids = context.current_state_ids + logger.debug( + "Looking for member changes in %r %r", state_group, current_state_ids + ) + # Loop through to see which member events we've seen and have rules # for and which we need to fetch for key in current_state_ids: @@ -273,10 +278,16 @@ class RulesForRoom(object): if missing_member_event_ids: # If we have some memebr events we haven't seen, look them up # and fetch push rules for them if appropriate. + logger.debug("Found new member events %r", missing_member_event_ids) yield self._update_rules_with_member_event_ids( ret_rules_by_user, missing_member_event_ids, state_group ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Returning push rules for %r %r", + self.room_id, ret_rules_by_user.keys(), + ) defer.returnValue(ret_rules_by_user) @defer.inlineCallbacks @@ -310,11 +321,17 @@ class RulesForRoom(object): for row in rows } + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Found members %r: %r", self.room_id, members.values()) + interested_in_user_ids = set( user_id for user_id, membership in members.itervalues() if membership == Membership.JOIN ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Joined: %r", interested_in_user_ids) + if_users_with_pushers = yield self.store.get_if_users_have_pushers( interested_in_user_ids, on_invalidate=self.invalidate_all_cb, @@ -324,10 +341,16 @@ class RulesForRoom(object): uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("With pushers: %r", user_ids) + users_with_receipts = yield self.store.get_users_with_read_receipts_in_room( self.room_id, on_invalidate=self.invalidate_all_cb, ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("With receipts: %r", users_with_receipts) + # any users with pushers must be ours: they have pushers for uid in users_with_receipts: if uid in interested_in_user_ids: @@ -348,6 +371,7 @@ class RulesForRoom(object): # as it keeps a reference to self and will stop this instance from being # GC'd if it gets dropped from the rules_to_user cache. Instead use # `self.invalidate_all_cb` + logger.debug("Invalidating RulesForRoom for %r", self.room_id) self.sequence += 1 self.state_group = object() self.member_map = {} From 7fb80b5eaeb00f27ac46a043e53341bd8a1d1cfc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 15:02:12 +0100 Subject: [PATCH 5/9] Check if current event is a membership event --- synapse/push/bulk_push_rule_evaluator.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 6bf203993c..2ee07f2f7e 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -53,7 +53,7 @@ class BulkPushRuleEvaluator(object): room_id = event.room_id rules_for_room = self._get_rules_for_room(room_id) - rules_by_user = yield rules_for_room.get_rules(context) + rules_by_user = yield rules_for_room.get_rules(event, context) # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited @@ -216,7 +216,7 @@ class RulesForRoom(object): self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id) @defer.inlineCallbacks - def get_rules(self, context): + def get_rules(self, event, context): """Given an event context return the rules for all users who are currently in the room. """ @@ -280,7 +280,7 @@ class RulesForRoom(object): # and fetch push rules for them if appropriate. logger.debug("Found new member events %r", missing_member_event_ids) yield self._update_rules_with_member_event_ids( - ret_rules_by_user, missing_member_event_ids, state_group + ret_rules_by_user, missing_member_event_ids, state_group, event ) if logger.isEnabledFor(logging.DEBUG): @@ -292,7 +292,7 @@ class RulesForRoom(object): @defer.inlineCallbacks def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids, - state_group): + state_group, event): """Update the partially filled rules_by_user dict by fetching rules for any newly joined users in the `member_event_ids` list. @@ -321,6 +321,11 @@ class RulesForRoom(object): for row in rows } + if event.type == EventTypes.Member: + for event_id in member_event_ids.itervalues(): + if event_id == event.event_id: + members[event_id] = (event.state_key, event.membership) + if logger.isEnabledFor(logging.DEBUG): logger.debug("Found members %r: %r", self.room_id, members.values()) From e3417a06e23c532e6502bdcdcaedac826e231d69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 15:04:42 +0100 Subject: [PATCH 6/9] Update list cache to handle one arg case We update the normal cache descriptors to handle caches with a single argument specially so that the key wasn't a 1-tuple. We need to update the cache list to be aware of this. --- synapse/util/caches/descriptors.py | 48 ++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 48dcbafeef..77a0d8e35d 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -404,6 +404,7 @@ class CacheDescriptor(_CacheDescriptorBase): wrapped.invalidate_all = cache.invalidate_all wrapped.cache = cache + wrapped.num_args = self.num_args obj.__dict__[self.orig.__name__] = wrapped @@ -451,8 +452,9 @@ class CacheListDescriptor(_CacheDescriptorBase): ) def __get__(self, obj, objtype=None): - - cache = getattr(obj, self.cached_method_name).cache + cached_method = getattr(obj, self.cached_method_name) + cache = cached_method.cache + num_args = cached_method.num_args @functools.wraps(self.orig) def wrapped(*args, **kwargs): @@ -470,11 +472,14 @@ class CacheListDescriptor(_CacheDescriptorBase): cached_defers = {} missing = [] for arg in list_args: - key = list(keyargs) - key[self.list_pos] = arg - try: - res = cache.get(tuple(key), callback=invalidate_callback) + if num_args == 1: + res = cache.get(arg, callback=invalidate_callback) + else: + key = list(keyargs) + key[self.list_pos] = arg + res = cache.get(tuple(key), callback=invalidate_callback) + if not isinstance(res, ObservableDeferred): results[arg] = res elif not res.has_succeeded(): @@ -505,17 +510,28 @@ class CacheListDescriptor(_CacheDescriptorBase): observer = ObservableDeferred(observer) - key = list(keyargs) - key[self.list_pos] = arg - cache.set( - tuple(key), observer, - callback=invalidate_callback - ) + if num_args == 1: + cache.set( + arg, observer, + callback=invalidate_callback + ) - def invalidate(f, key): - cache.invalidate(key) - return f - observer.addErrback(invalidate, tuple(key)) + def invalidate(f, key): + cache.invalidate(key) + return f + observer.addErrback(invalidate, arg) + else: + key = list(keyargs) + key[self.list_pos] = arg + cache.set( + tuple(key), observer, + callback=invalidate_callback + ) + + def invalidate(f, key): + cache.invalidate(key) + return f + observer.addErrback(invalidate, tuple(key)) res = observer.observe() res.addCallback(lambda r, arg: (arg, r), arg) From bd7bb5df717810eec0ae56d558a8413003d2ecaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 15:12:19 +0100 Subject: [PATCH 7/9] Pull out if statement from for loop --- synapse/util/caches/descriptors.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 77a0d8e35d..cbdff86596 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -471,14 +471,22 @@ class CacheListDescriptor(_CacheDescriptorBase): results = {} cached_defers = {} missing = [] + + # If the cache takes a single arg then that is used as the key, + # otherwise a tuple is used. + if num_args == 1: + def cache_get(arg): + return cache.get(arg, callback=invalidate_callback) + else: + key = list(keyargs) + + def cache_get(arg): + key[self.list_pos] = arg + return cache.get(tuple(key), callback=invalidate_callback) + for arg in list_args: try: - if num_args == 1: - res = cache.get(arg, callback=invalidate_callback) - else: - key = list(keyargs) - key[self.list_pos] = arg - res = cache.get(tuple(key), callback=invalidate_callback) + res = cache_get(arg) if not isinstance(res, ObservableDeferred): results[arg] = res From d668caa79c4f99b6d2b93c5b96e640e88f71a5c0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 16:21:06 +0100 Subject: [PATCH 8/9] Remove spurious log level guards --- synapse/push/bulk_push_rule_evaluator.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 2ee07f2f7e..354a2e64c3 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -334,8 +334,7 @@ class RulesForRoom(object): if membership == Membership.JOIN ) - if logger.isEnabledFor(logging.DEBUG): - logger.debug("Joined: %r", interested_in_user_ids) + logger.debug("Joined: %r", interested_in_user_ids) if_users_with_pushers = yield self.store.get_if_users_have_pushers( interested_in_user_ids, @@ -346,15 +345,13 @@ class RulesForRoom(object): uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher ) - if logger.isEnabledFor(logging.DEBUG): - logger.debug("With pushers: %r", user_ids) + logger.debug("With pushers: %r", user_ids) users_with_receipts = yield self.store.get_users_with_read_receipts_in_room( self.room_id, on_invalidate=self.invalidate_all_cb, ) - if logger.isEnabledFor(logging.DEBUG): - logger.debug("With receipts: %r", users_with_receipts) + logger.debug("With receipts: %r", users_with_receipts) # any users with pushers must be ours: they have pushers for uid in users_with_receipts: From 6489455bed4c46ee8ffa09a933e8a3289f2ae62a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 16:22:04 +0100 Subject: [PATCH 9/9] Comment --- synapse/push/bulk_push_rule_evaluator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 354a2e64c3..9a96e6fe8f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -321,6 +321,8 @@ class RulesForRoom(object): for row in rows } + # If the event is a join event then it will be in current state evnts + # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: for event_id in member_event_ids.itervalues(): if event_id == event.event_id: