Use state to calculate get_users_in_room

erikj/timings
Erik Johnston 2016-06-01 11:13:08 +01:00
parent 143ff10212
commit 0417a8607b
5 changed files with 38 additions and 25 deletions

View File

@ -42,7 +42,7 @@ class ActionGenerator:
with Measure(self.clock, "handle_push_actions_for_event"): with Measure(self.clock, "handle_push_actions_for_event"):
with log_duration("evaluator_for_event"): with log_duration("evaluator_for_event"):
bulk_evaluator = yield evaluator_for_event( bulk_evaluator = yield evaluator_for_event(
event, self.hs, self.store event, self.hs, self.store, context.current_state
) )
with log_duration("action_for_event_by_user"): with log_duration("action_for_event_by_user"):

View File

@ -79,11 +79,6 @@ def _get_rules(room_id, user_ids, store):
def evaluator_for_event(event, hs, store, current_state): def evaluator_for_event(event, hs, store, current_state):
room_id = event.room_id room_id = event.room_id
# users in the room who have pushers need to get push rules run because
# that's how their pushers work
with log_duration("get_users_with_pushers_in_room"):
users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
# We also will want to generate notifs for other people in the room so # We also will want to generate notifs for other people in the room so
# their unread countss are correct in the event stream, but to avoid # their unread countss are correct in the event stream, but to avoid
# generating them for bot / AS users etc, we only do so for people who've # generating them for bot / AS users etc, we only do so for people who've
@ -95,6 +90,14 @@ def evaluator_for_event(event, hs, store, current_state):
if e.type == EventTypes.Member and e.membership == Membership.JOIN if e.type == EventTypes.Member and e.membership == Membership.JOIN
) )
# users in the room who have pushers need to get push rules run because
# that's how their pushers work
with log_duration("get_users_with_pushers_in_room"):
if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room)
users_with_pushers = set(
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
)
with log_duration("get_receipts_for_room"): with log_duration("get_receipts_for_room"):
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id) users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)

View File

@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
)
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))

View File

@ -18,7 +18,7 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging import logging
import simplejson as json import simplejson as json
@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn "get_all_updated_pushers", get_all_updated_pushers_txn
) )
@cachedInlineCallbacks(num_args=1) @cachedInlineCallbacks(lru=True, num_args=1)
def get_users_with_pushers_in_room(self, room_id): def get_if_user_has_pusher(self, user_id):
users = yield self.get_users_in_room(room_id)
result = yield self._simple_select_many_batch( result = yield self._simple_select_many_batch(
table='pushers', table='pushers',
column='user_name', keyvalues={
iterable=users, 'user_name': 'user_id',
retcols=['user_name'], },
desc='get_users_with_pushers_in_room' retcol='user_name',
desc='get_if_user_has_pusher',
allow_none=True,
) )
defer.returnValue([r['user_name'] for r in result]) defer.returnValue(bool(result))
@cachedList(cached_method_name="get_if_user_has_pusher",
list_name="user_ids", num_args=1, inlineCallbacks=True)
def get_if_users_have_pushers(self, user_ids):
rows = yield self._simple_select_many_batch(
table='pushers',
column='user_name',
iterable=user_ids,
retcols=['user_name'],
desc='get_if_users_have_pushers'
)
result = {user_id: False for user_id in user_ids}
result.update({r['user_name']: True for r in rows})
defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id, def add_pusher(self, user_id, access_token, kind, app_id,
@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
}, },
) )
if newly_inserted: if newly_inserted:
# get_users_with_pushers_in_room only cares if the user has # get_if_user_has_pusher only cares if the user has
# at least *one* pusher. # at least *one* pusher.
txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
yield self.runInteraction("add_pusher", f) yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id): def delete_pusher_txn(txn, stream_id):
txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
self._simple_delete_one_txn( self._simple_delete_one_txn(
txn, txn,

View File

@ -58,9 +58,6 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(
self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
)
txn.call_after( txn.call_after(
self._membership_stream_cache.entity_has_changed, self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering event.state_key, event.internal_metadata.stream_ordering