Wrap process in a flag so we don't process whist already processing.
parent
6ec02e9ecf
commit
15e0f1696f
|
@ -48,6 +48,7 @@ class HttpPusher(object):
|
||||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||||
self.failing_since = pusherdict['failing_since']
|
self.failing_since = pusherdict['failing_since']
|
||||||
self.timed_call = None
|
self.timed_call = None
|
||||||
|
self.processing = False
|
||||||
|
|
||||||
# This is the highest stream ordering we know it's safe to process.
|
# This is the highest stream ordering we know it's safe to process.
|
||||||
# When new events arrive, we'll be given a window of new events: we
|
# When new events arrive, we'll be given a window of new events: we
|
||||||
|
@ -109,6 +110,14 @@ class HttpPusher(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _process(self):
|
def _process(self):
|
||||||
|
try:
|
||||||
|
self.processing = True
|
||||||
|
yield self._unsafe_process()
|
||||||
|
finally:
|
||||||
|
self.processing = False
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _unsafe_process(self):
|
||||||
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
|
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
|
||||||
self.user_id, self.last_stream_ordering, self.max_stream_ordering
|
self.user_id, self.last_stream_ordering, self.max_stream_ordering
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue