From ab9a13fbb32b259327d919fea4fdb78338ebfc27 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 10:26:25 +0100 Subject: [PATCH] Wake up /sync when things other than room events happen --- synapse/app/synchrotron.py | 45 +++++++++++++++++++ .../replication/slave/storage/push_rule.py | 6 ++- synapse/replication/slave/storage/receipts.py | 10 ++++- .../replication/slave/storage/registration.py | 4 +- 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 9d31732960..66a24d2c4b 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -315,6 +315,32 @@ class SynchrotronServer(HomeServer): def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + def notify(result): stream = result.get("events") if stream: @@ -331,6 +357,25 @@ class SynchrotronServer(HomeServer): event, position, max_position, extra_users ) + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + next_expire_broken_caches_ms = 0 while True: try: diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 86fec353d3..21ceb0213a 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -54,9 +54,13 @@ class SlavedPushRuleStore(SlavedEventStore): stream = result.get("push_rules") if stream: for row in stream["rows"]: - user_id = row[1] + position = row[0] + user_id = row[2] self.get_push_rules_for_user.invalidate((user_id,)) self.get_push_rules_enabled_for_user.invalidate((user_id,)) + self.push_rules_stream_cache.entity_has_changed( + user_id, position + ) self._push_rules_stream_id_gen.advance(int(stream["position"])) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index d5a210e569..ac9662d399 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore from synapse.storage.receipts import ReceiptsStore +from synapse.util.caches.stream_change_cache import StreamChangeCache # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the @@ -37,6 +38,10 @@ class SlavedReceiptsStore(BaseSlavedStore): db_conn, "receipts_linearized", "stream_id" ) + self._receipts_stream_cache = StreamChangeCache( + "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() + ) + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] get_linearized_receipts_for_room = ( ReceiptsStore.__dict__["get_linearized_receipts_for_room"] @@ -65,14 +70,15 @@ class SlavedReceiptsStore(BaseSlavedStore): if stream: self._receipts_id_gen.advance(int(stream["position"])) for row in stream["rows"]: - room_id, receipt_type, user_id = row[1:4] + position, room_id, receipt_type, user_id = row[:4] self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) + self._receipts_stream_cache.entity_has_changed(room_id, position) return super(SlavedReceiptsStore, self).process_replication(result) def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) - self.get_linearized_receipts_for_room.invalidate((room_id,)) + self.get_linearized_receipts_for_room.invalidate_many((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index e2d2e60d42..307833f9e1 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -22,9 +22,9 @@ class SlavedRegistrationStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedRegistrationStore, self).__init__(db_conn, hs) - # TODO: Remove deleted tokens from the cache + # TODO: use the cached version and invalidate deleted tokens get_user_by_access_token = RegistrationStore.__dict__[ "get_user_by_access_token" - ] + ].orig _query_for_auth = DataStore._query_for_auth.__func__