Merge pull request #2874 from matrix-org/erikj/creator_push_actions

Store push actions in DB staging area instead of context
pull/2876/merge
Erik Johnston 2018-02-16 11:52:51 +00:00 committed by GitHub
commit bf5ef10a93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 138 additions and 44 deletions

View File

@ -52,7 +52,6 @@ class EventContext(object):
"prev_state_ids",
"state_group",
"rejected",
"push_actions",
"prev_group",
"delta_ids",
"prev_state_events",
@ -67,7 +66,6 @@ class EventContext(object):
self.state_group = None
self.rejected = False
self.push_actions = []
# A previously persisted state group and a delta between that
# and this state.
@ -104,7 +102,6 @@ class EventContext(object):
"event_state_key": event.state_key if event.is_state() else None,
"state_group": self.state_group,
"rejected": self.rejected,
"push_actions": self.push_actions,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
@ -127,7 +124,6 @@ class EventContext(object):
context = EventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.push_actions = input["push_actions"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]

View File

@ -683,9 +683,15 @@ class EventCreationHandler(object):
event, context
)
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
try:
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
raise
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.

View File

@ -40,10 +40,6 @@ class ActionGenerator(object):
@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "action_for_event_by_user"):
actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
yield self.bulk_evaluator.action_for_event_by_user(
event, context
)
context.push_actions = [
(uid, actions) for uid, actions in actions_by_user.iteritems()
]

View File

@ -137,14 +137,13 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
"""Given an event and context, evaluate the push rules and return
the results
"""Given an event and context, evaluate the push rules and insert the
results into the event_push_actions_staging table.
Returns:
dict of user_id -> action
Deferred
"""
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
room_members = yield self.store.get_joined_users_from_context(
event, context
@ -190,9 +189,13 @@ class BulkPushRuleEvaluator(object):
if matches:
actions = [x for x in rule['actions'] if x != 'dont_notify']
if actions and 'notify' in actions:
actions_by_user[uid] = actions
# Push rules say we should notify the user of this event,
# so we mark it in the DB in the staging area. (This
# will then get handled when we persist the event)
yield self.store.add_push_actions_to_staging(
event.event_id, uid, actions,
)
break
defer.returnValue(actions_by_user)
def _condition_checker(evaluator, conditions, uid, display_name, cache):

View File

@ -88,33 +88,50 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notifs, 30 * 60 * 1000
)
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""
values = []
for uid, actions in tuples:
is_highlight = 1 if _action_has_highlight(actions) else 0
values.append({
'room_id': event.room_id,
'event_id': event.event_id,
'user_id': uid,
'actions': _serialize_action(actions, is_highlight),
'stream_ordering': event.internal_metadata.stream_ordering,
'topological_ordering': event.depth,
'notif': 1,
'highlight': is_highlight,
})
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
FROM event_push_actions_staging
WHERE event_id = ?
"""
for uid, __ in tuples:
txn.execute(sql, (
event.room_id, event.internal_metadata.stream_ordering,
event.depth, event.event_id,
))
user_ids = self._simple_select_onecol_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
retcol="user_id",
)
self._simple_delete_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
)
for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid)
(event.room_id, uid,)
)
self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@ -738,6 +755,50 @@ class EventPushActionsStore(SQLBaseStore):
(rotate_to_stream_ordering,)
)
def add_push_actions_to_staging(self, event_id, user_id, actions):
"""Add the push actions for the user and event to the push
action staging area.
Args:
event_id (str)
user_id (str)
actions (list[dict|str]): An action can either be a string or
dict.
Returns:
Deferred
"""
is_highlight = 1 if _action_has_highlight(actions) else 0
return self._simple_insert(
table="event_push_actions_staging",
values={
"event_id": event_id,
"user_id": user_id,
"actions": _serialize_action(actions, is_highlight),
"notif": 1,
"highlight": is_highlight,
},
desc="add_push_actions_to_staging",
)
def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB
Args:
event_id (str)
"""
return self._simple_delete(
table="event_push_actions_staging",
keyvalues={
"event_id": event_id,
},
desc="remove_push_actions_from_staging",
)
def _action_has_highlight(actions):
for action in actions:

View File

@ -1168,10 +1168,9 @@ class EventsStore(SQLBaseStore):
for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table.
if context.push_actions:
self._set_push_actions_for_event_and_users_txn(
txn, event, context.push_actions
)
self._set_push_actions_for_event_and_users_txn(
txn, event,
)
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the

View File

@ -0,0 +1,28 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Temporary staging area for push actions that have been calculated for an
-- event, but the event hasn't yet been persisted.
-- When the event is persisted the rows are moved over to the
-- event_push_actions table.
CREATE TABLE event_push_actions_staging (
event_id TEXT NOT NULL,
user_id TEXT NOT NULL,
actions TEXT NOT NULL,
notif SMALLINT NOT NULL,
highlight SMALLINT NOT NULL
);
CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);

View File

@ -230,7 +230,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)
context.push_actions = push_actions
for user_id, actions in push_actions:
yield self.master_store.add_push_actions_to_staging(
event.event_id, user_id, actions,
)
ordering = None
if backfill:

View File

@ -62,6 +62,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
{"notify_count": noitf_count, "highlight_count": highlight_count}
)
@defer.inlineCallbacks
def _inject_actions(stream, action):
event = Mock()
event.room_id = room_id
@ -69,11 +70,12 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
event.internal_metadata.stream_ordering = stream
event.depth = stream
tuples = [(user_id, action)]
return self.store.runInteraction(
yield self.store.add_push_actions_to_staging(
event.event_id, user_id, action,
)
yield self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
event, tuples
event,
)
def _rotate(stream):