Merge remote-tracking branch 'origin/erikj/bulk_get_push_rules' into markjh/table_name
Conflicts: synapse/storage/push_rule.pypull/482/head
commit
f4dad9f639
|
@ -36,10 +36,6 @@ def stopped_user_eventstream(distributor, user):
|
||||||
return distributor.fire("stopped_user_eventstream", user)
|
return distributor.fire("stopped_user_eventstream", user)
|
||||||
|
|
||||||
|
|
||||||
def user_joined_room(distributor, user, room_id):
|
|
||||||
return distributor.fire("user_joined_room", user, room_id)
|
|
||||||
|
|
||||||
|
|
||||||
class EventStreamHandler(BaseHandler):
|
class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -136,9 +132,6 @@ class EventStreamHandler(BaseHandler):
|
||||||
# thundering herds on restart.
|
# thundering herds on restart.
|
||||||
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
|
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
|
||||||
|
|
||||||
if is_guest:
|
|
||||||
yield user_joined_room(self.distributor, auth_user, room_id)
|
|
||||||
|
|
||||||
events, tokens = yield self.notifier.get_events_for(
|
events, tokens = yield self.notifier.get_events_for(
|
||||||
auth_user, pagin_config, timeout,
|
auth_user, pagin_config, timeout,
|
||||||
only_room_events=only_room_events,
|
only_room_events=only_room_events,
|
||||||
|
|
|
@ -585,6 +585,7 @@ class SyncHandler(BaseHandler):
|
||||||
sync_config, leave_event, since_token, tags_by_room,
|
sync_config, leave_event, since_token, tags_by_room,
|
||||||
account_data_by_room
|
account_data_by_room
|
||||||
)
|
)
|
||||||
|
if room_sync:
|
||||||
archived.append(room_sync)
|
archived.append(room_sync)
|
||||||
|
|
||||||
invited = [
|
invited = [
|
||||||
|
@ -726,6 +727,9 @@ class SyncHandler(BaseHandler):
|
||||||
|
|
||||||
leave_token = since_token.copy_and_replace("room_key", stream_token)
|
leave_token = since_token.copy_and_replace("room_key", stream_token)
|
||||||
|
|
||||||
|
if since_token.is_after(leave_token):
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
batch = yield self.load_filtered_recents(
|
batch = yield self.load_filtered_recents(
|
||||||
leave_event.room_id, sync_config, leave_token, since_token,
|
leave_event.room_id, sync_config, leave_token, since_token,
|
||||||
)
|
)
|
||||||
|
|
|
@ -62,13 +62,14 @@ class PushRuleStore(SQLBaseStore):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def bulk_get_push_rules(self, user_ids):
|
def bulk_get_push_rules(self, user_ids):
|
||||||
|
if not user_ids:
|
||||||
|
defer.returnValue({})
|
||||||
|
|
||||||
batch_size = 100
|
batch_size = 100
|
||||||
|
|
||||||
def f(txn, user_ids_to_fetch):
|
def f(txn, user_ids_to_fetch):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT"
|
"SELECT pr.*"
|
||||||
" pr.user_name, pr.rule_id, priority_class, priority,"
|
|
||||||
" conditions, actions"
|
|
||||||
" FROM push_rules AS pr"
|
" FROM push_rules AS pr"
|
||||||
" LEFT JOIN push_rules_enable AS pre"
|
" LEFT JOIN push_rules_enable AS pre"
|
||||||
" ON pr.user_name = pre.user_name AND pr.rule_id = pre.rule_id"
|
" ON pr.user_name = pre.user_name AND pr.rule_id = pre.rule_id"
|
||||||
|
@ -78,29 +79,18 @@ class PushRuleStore(SQLBaseStore):
|
||||||
" ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC"
|
" ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC"
|
||||||
)
|
)
|
||||||
txn.execute(sql, user_ids_to_fetch)
|
txn.execute(sql, user_ids_to_fetch)
|
||||||
return txn.fetchall()
|
return self.cursor_to_dict(txn)
|
||||||
|
|
||||||
results = {}
|
results = {}
|
||||||
|
|
||||||
batch_start = 0
|
chunks = [user_ids[i:i+batch_size] for i in xrange(0, len(user_ids), batch_size)]
|
||||||
while batch_start < len(user_ids):
|
for batch_user_ids in chunks:
|
||||||
batch_end = min(len(user_ids), batch_size)
|
|
||||||
batch_user_ids = user_ids[batch_start:batch_end]
|
|
||||||
batch_start = batch_end
|
|
||||||
|
|
||||||
rows = yield self.runInteraction(
|
rows = yield self.runInteraction(
|
||||||
"bulk_get_push_rules", f, batch_user_ids
|
"bulk_get_push_rules", f, batch_user_ids
|
||||||
)
|
)
|
||||||
|
|
||||||
cols = (
|
|
||||||
"user_name", "rule_id", "priority_class", "priority",
|
|
||||||
"conditions", "actions",
|
|
||||||
)
|
|
||||||
|
|
||||||
for row in rows:
|
for row in rows:
|
||||||
rawdict = dict(zip(cols, rows))
|
results.setdefault(row['user_name'], []).append(row)
|
||||||
results.setdefault(rawdict["user_name"], []).append(rawdict)
|
|
||||||
|
|
||||||
defer.returnValue(results)
|
defer.returnValue(results)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
Loading…
Reference in New Issue