Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do:

* Make badges work again
* Remove old, unused code
pull/705/head
David Baker 2016-04-06 15:42:15 +01:00
parent b29f98377d
commit 7e2c89a37f
13 changed files with 503 additions and 130 deletions
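In outline, pushers now pull their work out of the database rather than re-running the push rules against a live event stream. A minimal sketch of the new loop, using the store methods added in this commit (`deliver` is a hypothetical stand-in for the HTTP dispatch code further down):

from twisted.internet import defer

@defer.inlineCallbacks
def process(pusher, store):
    # Sketch of the pull model (not the actual implementation): read the
    # push actions the ActionGenerator persisted for this user, bounded
    # by the window the PusherPool announced, and advance the per-pusher
    # last_stream_ordering as each action is delivered.
    unprocessed = yield store.get_unread_push_actions_for_user_in_range(
        pusher.user_id, pusher.last_stream_ordering, pusher.max_stream_ordering
    )
    for action in unprocessed:
        yield pusher.deliver(action)  # hypothetical per-action delivery
        pusher.last_stream_ordering = action["stream_ordering"]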

View File

@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
import logging
@ -377,6 +377,12 @@ class BaseHandler(object):
event, context=context
)
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)
destinations = set()
for k, s in context.current_state.items():
try:
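The preserve_fn call in this hunk deliberately isn't yielded on: event persistence shouldn't block on pushers. `preserve_fn` wraps the function so the current log context survives into the background work. A contrast sketch, with `notify` standing in for any asynchronous callable:

from synapse.util.logcontext import preserve_fn

# Blocking: the event path would wait for the pusher pool to finish.
#     yield notify(event_stream_id, max_stream_id)

# Fire-and-forget: start the work with our log context attached and
# return to the caller immediately, ignoring the resulting Deferred.
preserve_fn(notify)(event_stream_id, max_stream_id)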

View File

@ -26,7 +26,7 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@ -1094,6 +1094,12 @@ class FederationHandler(BaseHandler):
context=context,
)
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks

View File

@ -70,11 +70,17 @@ def _get_rules(room_id, user_ids, store):
@defer.inlineCallbacks
def evaluator_for_room_id(room_id, hs, store):
results = yield store.get_receipts_for_room(room_id, "m.read")
user_ids = [
row["user_id"] for row in results
if hs.is_mine_id(row["user_id"])
]
users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
receipts = yield store.get_receipts_for_room(room_id, "m.read")
# any users with pushers must be local: pushers can only be registered
# for users on this homeserver
user_ids = set(users_with_pushers)
for r in receipts:
if hs.is_mine_id(r['user_id']):
user_ids.add(r['user_id'])
user_ids = list(user_ids)
rules_by_user = yield _get_rules(room_id, user_ids, store)
defer.returnValue(BulkPushRuleEvaluator(
@ -101,10 +107,15 @@ class BulkPushRuleEvaluator:
def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {}
users_dict = yield self.store.are_guests(self.rules_by_user.keys())
# None of these users can be peeking since this list of users comes
# from the set of users in the room, so we know for sure they're all
# actually in the room.
user_tuples = [
(u, False) for u in self.rules_by_user.keys()
]
filtered_by_user = yield handler.filter_events_for_clients(
users_dict.items(), [event], {event.event_id: current_state}
user_tuples, [event], {event.event_id: current_state}
)
room_members = yield self.store.get_users_in_room(self.room_id)
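The first hunk changes which local users the evaluator considers: the union of users with a registered pusher (so rows land in event_push_actions for them) and users who have sent an m.read receipt into the room. Illustratively (hypothetical users):

# users_with_pushers  -> ["@alice:ours.example"]
# receipts (m.read)   -> ["@bob:ours.example", "@carol:remote.example"]
#
# user_ids = {"@alice:ours.example"}   # pusher owners are always local
# user_ids |= {"@bob:ours.example"}    # only local receipt senders added
# rules are evaluated for alice and bob; carol is remote and skipped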

View File

@ -13,60 +13,188 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.push import Pusher, PusherConfigException
from synapse.push import PusherConfigException
from twisted.internet import defer
from twisted.internet import defer, reactor
import logging
import push_rule_evaluator
import push_tools
logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
def __init__(self, _hs, user_id, app_id,
app_display_name, device_display_name, pushkey, pushkey_ts,
data, last_token, last_success, failing_since):
super(HttpPusher, self).__init__(
_hs,
user_id,
app_id,
app_display_name,
device_display_name,
pushkey,
pushkey_ts,
data,
last_token,
last_success,
failing_since
class HttpPusher(object):
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
MAX_BACKOFF_SEC = 60 * 60
# This one's in ms because we compare it against the clock
GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
def __init__(self, hs, pusherdict):
self.hs = hs
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.user_id = pusherdict['user_name']
self.app_id = pusherdict['app_id']
self.app_display_name = pusherdict['app_display_name']
self.device_display_name = pusherdict['device_display_name']
self.pushkey = pusherdict['pushkey']
self.pushkey_ts = pusherdict['ts']
if 'data' not in pusherdict:
raise PusherConfigException(
"No 'data' key for HTTP pusher"
)
self.data = pusherdict['data']
self.last_stream_ordering = pusherdict['last_stream_ordering']
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict['failing_since']
self.timed_call = None
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
# should honour this rather than just looking for anything higher
# because of potential out-of-order event serialisation. This starts
# off as None though as we don't know any better.
self.max_stream_ordering = None
self.name = "%s/%s/%s" % (
pusherdict['user_name'],
pusherdict['app_id'],
pusherdict['pushkey'],
)
if 'url' not in data:
if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
)
self.url = data['url']
self.http_client = _hs.get_simple_http_client()
self.url = self.data['url']
self.http_client = hs.get_simple_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
def on_started(self):
self._process()
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max_stream_ordering
self._process()
def on_timer(self):
self._process()
def on_stop(self):
if self.timed_call:
self.timed_call.cancel()
@defer.inlineCallbacks
def _process(self):
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
for push_action in unprocessed:
processed = yield self._process_one(push_action)
if processed:
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
self.store.update_pusher_last_stream_ordering_and_success(
self.app_id, self.pushkey, self.user_id,
self.last_stream_ordering,
self.clock.time_msec()
)
self.failing_since = None
yield self.store.update_pusher_failing_since(
self.app_id, self.pushkey, self.user_id,
self.failing_since
)
else:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
self.app_id, self.pushkey, self.user_id,
self.failing_since
)
if (
self.failing_since and
self.failing_since <
self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
):
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
self.user_id, self.pushkey)
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
yield self.store.update_pusher_last_stream_ordering(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering
)
self.failing_since = None
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
self.failing_since
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
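The retry policy is exponential backoff, doubling from one second up to an hour, with delivery abandoned once the pusher has been failing for a day. A standalone toy model of the same policy (names mirror the constants above; this is not the code the class runs):

INITIAL_BACKOFF_SEC = 1
MAX_BACKOFF_SEC = 60 * 60
GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000

def next_backoff(backoff_sec, failing_since_ms, now_ms):
    """Return the next retry delay in seconds, or None to give up."""
    if failing_since_ms is not None and \
            failing_since_ms < now_ms - GIVE_UP_AFTER_MS:
        # Failing for over 24h: skip the notification and move on, so a
        # fixed URL doesn't suddenly deliver a day's worth of pushes.
        return None
    return min(backoff_sec * 2, MAX_BACKOFF_SEC)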
@defer.inlineCallbacks
def _process_one(self, push_action):
if 'notify' not in push_action['actions']:
defer.returnValue(True)
tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions'])
badge = yield push_tools.get_badge_count(self.hs, self.user_id)
event = yield self.store.get_event(push_action['event_id'], allow_none=True)
if event is None:
defer.returnValue(True) # It's been redacted
rejected = yield self.dispatch_push(event, tweaks, badge)
if rejected is False:
defer.returnValue(False)
if isinstance(rejected, (list, tuple)):
for pk in rejected:
if pk != self.pushkey:
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warn(
("Ignoring rejected pushkey %s because we"
" didn't send it"), pk
)
else:
logger.info(
"Pushkey %s was rejected: removing",
pk
)
yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk, self.user_id
)
defer.returnValue(True)
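dispatch_push itself is not shown in this hunk; by the push gateway contract it should return the gateway's list of rejected pushkeys on success (often empty), or False if the request itself failed. A sketch under that assumption, using the simple HTTP client's post_json_get_json:

@defer.inlineCallbacks
def dispatch_push(self, event, tweaks, badge):
    # Sketch only: POST the notification dict and surface the gateway's
    # "rejected" list, or False if the HTTP request failed outright.
    body = yield self._build_notification_dict(event, tweaks, badge)
    try:
        resp = yield self.http_client.post_json_get_json(self.url, body)
    except Exception:
        defer.returnValue(False)
    defer.returnValue(resp.get("rejected", []))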
@defer.inlineCallbacks
def _build_notification_dict(self, event, tweaks, badge):
# we probably do not want to push for every presence update
# (we may want to be able to set up notifications when specific
# people sign in, but we'd want to only deliver the pertinent ones)
# Actually, presence events will not get this far now because we
# need to filter them out in the main Pusher code.
if 'event_id' not in event:
defer.returnValue(None)
ctx = yield self.get_context_for_event(event)
ctx = yield push_tools.get_context_for_event(self.hs, event)
d = {
'notification': {
'id': event['event_id'],
'room_id': event['room_id'],
'type': event['type'],
'sender': event['user_id'],
'id': event.event_id,
'room_id': event.room_id,
'type': event.type,
'sender': event.user_id,
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
@ -84,11 +212,11 @@ class HttpPusher(Pusher):
]
}
}
if event['type'] == 'm.room.member':
d['notification']['membership'] = event['content']['membership']
d['notification']['user_is_target'] = event['state_key'] == self.user_id
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
if 'content' in event:
d['notification']['content'] = event['content']
d['notification']['content'] = event.content
if len(ctx['aliases']):
d['notification']['room_alias'] = ctx['aliases'][0]
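Assembled, the body POSTed to the push gateway ends up looking roughly like this (all values hypothetical):

example_body = {
    "notification": {
        "id": "$1460000000123abc:example.org",
        "room_id": "!abcdefgh:example.org",
        "type": "m.room.message",
        "sender": "@alice:example.org",
        "counts": {"unread": 1},
        "content": {"msgtype": "m.text", "body": "hello"},
        "room_alias": "#example:example.org",
        "sender_display_name": "Alice",
        "devices": [
            {"app_id": "com.example.app", "pushkey": "abc123"},
        ],
    },
}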

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
@defer.inlineCallbacks
def get_badge_count(hs, user_id):
invites, joins = yield defer.gatherResults([
hs.get_datastore().get_invited_rooms_for_user(user_id),
hs.get_datastore().get_rooms_for_user(user_id),
], consumeErrors=True)
my_receipts_by_room = yield hs.get_datastore().get_receipts_for_user(
user_id, "m.read",
)
badge = len(invites)
for r in joins:
if r.room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[r.room_id]
notifs = yield (
hs.get_datastore().get_unread_event_push_actions_by_room_for_user(
r.room_id, user_id, last_unread_event_id
)
)
badge += notifs["notify_count"]
defer.returnValue(badge)
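As a worked example (hypothetical data): a user with two pending invites, plus joined rooms whose notify_count values are 3 and 0, gets a badge of 2 + 3 + 0 = 5. Joined rooms with no m.read receipt recorded contribute nothing, since there is no read marker to count unread notifications from.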
@defer.inlineCallbacks
def get_context_for_event(hs, ev):
name_aliases = yield hs.get_datastore().get_room_name_and_aliases(
ev.room_id
)
ctx = {'aliases': name_aliases[1]}
if name_aliases[0] is not None:
ctx['name'] = name_aliases[0]
their_member_events_for_room = yield hs.get_datastore().get_current_state(
room_id=ev.room_id,
event_type='m.room.member',
state_key=ev.user_id
)
for mev in their_member_events_for_room:
if mev.content['membership'] == 'join' and 'displayname' in mev.content:
dn = mev.content['displayname']
if dn is not None:
ctx['sender_display_name'] = dn
defer.returnValue(ctx)

synapse/push/pusher.py (new file)
View File

@ -0,0 +1,10 @@
from httppusher import HttpPusher
PUSHER_TYPES = {
'http': HttpPusher
}
def create_pusher(hs, pusherdict):
if pusherdict['kind'] in PUSHER_TYPES:
return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
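Note that create_pusher silently returns None for unrecognised kinds (the old _create_pusher raised PusherConfigException instead); the pool's start-up loop below guards with `if p:`. Adding a new pusher type is then just a map entry, e.g. (hypothetical):

# from emailpusher import EmailPusher   # hypothetical future module
# PUSHER_TYPES['email'] = EmailPusher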

View File

@ -16,9 +16,10 @@
from twisted.internet import defer
from .httppusher import HttpPusher
import pusher
from synapse.push import PusherConfigException
from synapse.util.logcontext import preserve_fn
from synapse.util.async import run_on_reactor
import logging
@ -48,7 +49,7 @@ class PusherPool:
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
# code path adding pushers.
self._create_pusher({
pusher.create_pusher(self.hs, {
"user_name": user_id,
"kind": kind,
"app_id": app_id,
@ -58,10 +59,18 @@ class PusherPool:
"ts": time_now_msec,
"lang": lang,
"data": data,
"last_token": None,
"last_stream_ordering": None,
"last_success": None,
"failing_since": None
})
# create the pusher setting last_stream_ordering to the current maximum
# stream ordering in event_push_actions, so it will process
# pushes from this point onwards.
last_stream_ordering = (
yield self.store.get_latest_push_action_stream_ordering()
)
yield self.store.add_pusher(
user_id=user_id,
access_token=access_token,
@ -73,6 +82,7 @@ class PusherPool:
pushkey_ts=time_now_msec,
lang=lang,
data=data,
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
yield self._refresh_pusher(app_id, pushkey, user_id)
@ -106,26 +116,19 @@ class PusherPool:
)
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
return HttpPusher(
self.hs,
user_id=pusherdict['user_name'],
app_id=pusherdict['app_id'],
app_display_name=pusherdict['app_display_name'],
device_display_name=pusherdict['device_display_name'],
pushkey=pusherdict['pushkey'],
pushkey_ts=pusherdict['ts'],
data=pusherdict['data'],
last_token=pusherdict['last_token'],
last_success=pusherdict['last_success'],
failing_since=pusherdict['failing_since']
)
else:
raise PusherConfigException(
"Unknown pusher type '%s' for user %s" %
(pusherdict['kind'], pusherdict['user_name'])
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
yield run_on_reactor()
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
p.on_new_notifications(min_stream_id, max_stream_id)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
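This fan-out is cheap because the registry (reworked below) is now a two-level dict keyed by user_id and then "app_id:pushkey", so only users named by get_push_action_users_in_range are touched. Its shape, sketched with hypothetical entries:

# self.pushers = {
#     "@alice:example.org": {
#         "com.example.app:abc123": <HttpPusher>,
#     },
# }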
@defer.inlineCallbacks
def _refresh_pusher(self, app_id, pushkey, user_id):
@ -146,30 +149,34 @@ class PusherPool:
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
try:
p = self._create_pusher(pusherdict)
p = pusher.create_pusher(self.hs, pusherdict)
except PusherConfigException:
logger.exception("Couldn't start a pusher: caught PusherConfigException")
continue
if p:
fullid = "%s:%s:%s" % (
appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
pusherdict['user_name']
)
if fullid in self.pushers:
self.pushers[fullid].stop()
self.pushers[fullid] = p
preserve_fn(p.start)()
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
preserve_fn(p.on_started)()
logger.info("Started pushers")
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
if fullid in self.pushers:
logger.info("Stopping pusher %s", fullid)
self.pushers[fullid].stop()
del self.pushers[fullid]
appid_pushkey = "%s:%s" % (app_id, pushkey)
byuser = self.pushers.get(user_id, {})
if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop()
del byuser[appid_pushkey]
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)

View File

@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(ret)
@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
def f(txn):
sql = (
"SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
" stream_ordering >= ? AND stream_ordering >= ?"
)
txn.execute(sql, (min_stream_ordering, max_stream_ordering))
return [r[0] for r in txn.fetchall()]
ret = yield self.runInteraction("get_push_action_users_in_range", f)
defer.returnValue(ret)
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering,
max_stream_ordering=None):
def f(txn):
sql = (
"SELECT event_id, stream_ordering, actions"
" FROM event_push_actions"
" WHERE user_id = ? AND stream_ordering > ?"
)
args = [user_id, min_stream_ordering]
if max_stream_ordering is not None:
sql += " AND stream_ordering <= ?"
args.append(max_stream_ordering)
sql += " ORDER BY stream_ordering ASC"
txn.execute(sql, args)
return txn.fetchall()
ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
defer.returnValue([
{
"event_id": row[0],
"stream_ordering": row[1],
"actions": json.loads(row[2]),
} for row in ret
])
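Each dict handed back to the pusher therefore has this shape (hypothetical values; actions is whatever the push rules produced for the event):

# {
#     "event_id": "$1460000000123abc:example.org",
#     "stream_ordering": 1234,
#     "actions": ["notify", {"set_tweak": "highlight", "value": False}],
# }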
@defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
return txn.fetchone()
result = yield self.runInteraction(
"get_latest_push_action_stream_ordering", f
)
defer.returnValue(result[0] or 0)
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(

View File

@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled: True if the events were obtained via backfill (i.e. they
are historical events), rather than arriving at the head of the
event stream
Returns:
Tuple of (min_stream_ordering, max_stream_ordering) assigned to the
events when persisting them.
"""
if not events_and_contexts:
return
@ -191,6 +202,7 @@ class EventsStore(SQLBaseStore):
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))

View File

@ -18,6 +18,8 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
import simplejson as json
import types
@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
@cachedInlineCallbacks(num_args=1)
def get_users_with_pushers_in_room(self, room_id):
users = yield self.get_users_in_room(room_id)
result = yield self._simple_select_many_batch(
'pushers', 'user_name', users, ['user_name']
)
defer.returnValue([r['user_name'] for r in result])
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
pushkey, pushkey_ts, lang, data, profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
yield self._simple_upsert(
"pushers",
dict(
app_id=app_id,
pushkey=pushkey,
user_name=user_id,
),
dict(
access_token=access_token,
kind=kind,
app_display_name=app_display_name,
device_display_name=device_display_name,
ts=pushkey_ts,
lang=lang,
data=encode_canonical_json(data),
profile_tag=profile_tag,
id=stream_id,
),
desc="add_pusher",
)
pushkey, pushkey_ts, lang, data, last_stream_ordering,
profile_tag=""):
def f(txn):
txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
with self._pushers_id_gen.get_next() as stream_id:
return self._simple_upsert_txn(
txn,
"pushers",
dict(
app_id=app_id,
pushkey=pushkey,
user_name=user_id,
),
dict(
access_token=access_token,
kind=kind,
app_display_name=app_display_name,
device_display_name=device_display_name,
ts=pushkey_ts,
lang=lang,
data=encode_canonical_json(data),
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
id=stream_id,
),
)
defer.returnValue((yield self.runInteraction("add_pusher", f)))
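Moving the upsert into a single interaction lets the cache invalidation ride on txn.call_after, which only runs its callback once the transaction has committed, so a rolled-back add_pusher cannot leave get_users_with_pushers_in_room holding stale or prematurely cleared entries. The pattern, in sketch form:

# def f(txn):
#     txn.call_after(some_cache.invalidate_all)  # deferred until commit
#     ...  # writes performed inside the transaction
# yield self.runInteraction("add_pusher", f)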
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore):
)
@defer.inlineCallbacks
def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
last_stream_ordering):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
{'last_token': last_token},
desc="update_pusher_last_token",
{'last_stream_ordering': last_stream_ordering},
desc="update_pusher_last_stream_ordering",
)
@defer.inlineCallbacks
def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
last_token, last_success):
def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
user_id,
last_stream_ordering,
last_success):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
{'last_token': last_token, 'last_success': last_success},
desc="update_pusher_last_token_and_success",
{
'last_stream_ordering': last_stream_ordering,
'last_success': last_success
},
desc="update_pusher_last_stream_ordering_and_success",
)
@defer.inlineCallbacks

View File

@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore):
defer.returnValue(res if res else False)
@cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1,
inlineCallbacks=True)
def are_guests(self, user_ids):
sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
",".join("?" for _ in user_ids),
)
rows = yield self._execute(
"are_guests", self.cursor_to_dict, sql, *user_ids
)
result = {user_id: False for user_id in user_ids}
result.update({
row["name"]: bool(row["is_guest"])
for row in rows
})
defer.returnValue(result)
def _query_for_auth(self, txn, token):
sql = (
"SELECT users.name, users.is_guest, access_tokens.id as token_id"

View File

@ -58,6 +58,7 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,))
txn.call_after(
self._membership_stream_cache.entity_has_changed,
event.state_key, event.internal_metadata.stream_ordering

View File

@ -0,0 +1,75 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Change the last_token to last_stream_ordering now that pushers no longer
# listen on an event stream but instead select out of the event_push_actions
# table.
import logging
logger = logging.getLogger(__name__)
def token_to_stream_ordering(token):
return int(token[1:].split('_')[0])
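The old last_token values were event stream tokens of the form "s<stream_ordering>_..."; the helper strips the leading "s" and keeps just the stream ordering, e.g. (hypothetical token):

# token_to_stream_ordering("s1254_32")  # -> 1254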
def run_upgrade(cur, database_engine, *args, **kwargs):
logger.info("Porting pushers table, delta 31...")
cur.execute("""
CREATE TABLE IF NOT EXISTS pushers2 (
id BIGINT PRIMARY KEY,
user_name TEXT NOT NULL,
access_token BIGINT DEFAULT NULL,
profile_tag VARCHAR(32) NOT NULL,
kind VARCHAR(8) NOT NULL,
app_id VARCHAR(64) NOT NULL,
app_display_name VARCHAR(64) NOT NULL,
device_display_name VARCHAR(128) NOT NULL,
pushkey TEXT NOT NULL,
ts BIGINT NOT NULL,
lang VARCHAR(8),
data TEXT,
last_stream_ordering INTEGER,
last_success BIGINT,
failing_since BIGINT,
UNIQUE (app_id, pushkey, user_name)
)
""")
cur.execute("""SELECT
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
FROM pushers
""")
count = 0
for row in cur.fetchall():
row = list(row)
row[12] = token_to_stream_ordering(row[12])
cur.execute(database_engine.convert_param_style("""
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_stream_ordering, last_success,
failing_since
) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
row
)
count += 1
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)