Fix notifier leak
parent
bc42ca121f
commit
050ebccf30
|
@ -308,49 +308,48 @@ class Notifier(object):
|
||||||
else:
|
else:
|
||||||
current_token = user_stream.current_token
|
current_token = user_stream.current_token
|
||||||
|
|
||||||
listener = [_NotificationListener(deferred)]
|
result = None
|
||||||
|
|
||||||
if timeout and not current_token.is_after(from_token):
|
|
||||||
user_stream.listeners.add(listener[0])
|
|
||||||
|
|
||||||
if current_token.is_after(from_token):
|
if current_token.is_after(from_token):
|
||||||
result = yield callback(from_token, current_token)
|
result = yield callback(from_token, current_token)
|
||||||
else:
|
|
||||||
result = None
|
|
||||||
|
|
||||||
timer = [None]
|
|
||||||
|
|
||||||
if result:
|
if result:
|
||||||
user_stream.listeners.discard(listener[0])
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
return
|
|
||||||
|
|
||||||
if timeout:
|
if timeout:
|
||||||
|
timer = [None]
|
||||||
|
listeners = []
|
||||||
timed_out = [False]
|
timed_out = [False]
|
||||||
|
|
||||||
|
def notify_listeners():
|
||||||
|
user_stream.listeners.difference_update(listeners)
|
||||||
|
for listener in listeners:
|
||||||
|
listener.notify(current_token)
|
||||||
|
del listeners[:]
|
||||||
|
|
||||||
def _timeout_listener():
|
def _timeout_listener():
|
||||||
timed_out[0] = True
|
timed_out[0] = True
|
||||||
timer[0] = None
|
timer[0] = None
|
||||||
user_stream.listeners.discard(listener[0])
|
notify_listeners()
|
||||||
listener[0].notify(current_token)
|
|
||||||
|
|
||||||
# We create multiple notification listeners so we have to manage
|
# We create multiple notification listeners so we have to manage
|
||||||
# canceling the timeout ourselves.
|
# canceling the timeout ourselves.
|
||||||
timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
|
timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
|
||||||
|
|
||||||
while not result and not timed_out[0]:
|
while not result and not timed_out[0]:
|
||||||
new_token = yield deferred
|
|
||||||
deferred = defer.Deferred()
|
deferred = defer.Deferred()
|
||||||
listener[0] = _NotificationListener(deferred)
|
notify_listeners()
|
||||||
user_stream.listeners.add(listener[0])
|
listeners.append(_NotificationListener(deferred))
|
||||||
|
user_stream.listeners.update(listeners)
|
||||||
|
new_token = yield deferred
|
||||||
|
|
||||||
result = yield callback(current_token, new_token)
|
result = yield callback(current_token, new_token)
|
||||||
current_token = new_token
|
current_token = new_token
|
||||||
|
|
||||||
if timer[0] is not None:
|
if timer[0] is not None:
|
||||||
try:
|
try:
|
||||||
self.clock.cancel_call_later(timer[0])
|
self.clock.cancel_call_later(timer[0])
|
||||||
except:
|
except:
|
||||||
logger.exception("Failed to cancel notifer timer")
|
logger.exception("Failed to cancel notifer timer")
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue