Make sure the notifier stream token goes forward when it is updated. Sort the pending events by the correct room_stream_id
parent
0ad1c67234
commit
1e90715a3d
|
@ -88,7 +88,7 @@ class _NotifierUserStream(object):
|
||||||
stream_id(str): The new id for the stream the event came from.
|
stream_id(str): The new id for the stream the event came from.
|
||||||
time_now_ms(int): The current time in milliseconds.
|
time_now_ms(int): The current time in milliseconds.
|
||||||
"""
|
"""
|
||||||
self.current_token = self.current_token.copy_and_replace(
|
self.current_token = self.current_token.copy_and_advance(
|
||||||
stream_key, stream_id
|
stream_key, stream_id
|
||||||
)
|
)
|
||||||
if self.listeners:
|
if self.listeners:
|
||||||
|
@ -192,7 +192,7 @@ class Notifier(object):
|
||||||
yield run_on_reactor()
|
yield run_on_reactor()
|
||||||
|
|
||||||
self.pending_new_room_events.append((
|
self.pending_new_room_events.append((
|
||||||
event, room_stream_id, extra_users
|
room_stream_id, event, extra_users
|
||||||
))
|
))
|
||||||
self._notify_pending_new_room_events(max_room_stream_id)
|
self._notify_pending_new_room_events(max_room_stream_id)
|
||||||
|
|
||||||
|
@ -205,10 +205,10 @@ class Notifier(object):
|
||||||
"""
|
"""
|
||||||
pending = sorted(self.pending_new_room_events)
|
pending = sorted(self.pending_new_room_events)
|
||||||
self.pending_new_room_events = []
|
self.pending_new_room_events = []
|
||||||
for event, room_stream_id, extra_users in pending:
|
for room_stream_id, event, extra_users in pending:
|
||||||
if room_stream_id > max_room_stream_id:
|
if room_stream_id > max_room_stream_id:
|
||||||
self.pending_new_room_events.append((
|
self.pending_new_room_events.append((
|
||||||
event, room_stream_id, extra_users
|
room_stream_id, event, extra_users
|
||||||
))
|
))
|
||||||
else:
|
else:
|
||||||
self._on_new_room_event(event, room_stream_id, extra_users)
|
self._on_new_room_event(event, room_stream_id, extra_users)
|
||||||
|
|
|
@ -119,6 +119,7 @@ class StreamToken(
|
||||||
@property
|
@property
|
||||||
def room_stream_id(self):
|
def room_stream_id(self):
|
||||||
# TODO(markjh): Awful hack to work around hacks in the presence tests
|
# TODO(markjh): Awful hack to work around hacks in the presence tests
|
||||||
|
# which assume that the keys are integers.
|
||||||
if type(self.room_key) is int:
|
if type(self.room_key) is int:
|
||||||
return self.room_key
|
return self.room_key
|
||||||
else:
|
else:
|
||||||
|
@ -132,6 +133,22 @@ class StreamToken(
|
||||||
or (int(other_token.typing_key) < int(self.typing_key))
|
or (int(other_token.typing_key) < int(self.typing_key))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def copy_and_advance(self, key, new_value):
|
||||||
|
"""Advance the given key in the token to a new value if and only if the
|
||||||
|
new value is after the old value.
|
||||||
|
"""
|
||||||
|
new_token = self.copy_and_replace(key, new_value)
|
||||||
|
if key == "room_key":
|
||||||
|
new_id = new_token.room_stream_id
|
||||||
|
old_id = self.room_stream_id
|
||||||
|
else:
|
||||||
|
new_id = int(getattr(new_token, key))
|
||||||
|
old_id = int(getattr(self, key))
|
||||||
|
if old_id < new_id:
|
||||||
|
return new_token
|
||||||
|
else:
|
||||||
|
return self
|
||||||
|
|
||||||
def copy_and_replace(self, key, new_value):
|
def copy_and_replace(self, key, new_value):
|
||||||
d = self._asdict()
|
d = self._asdict()
|
||||||
d[key] = new_value
|
d[key] = new_value
|
||||||
|
|
Loading…
Reference in New Issue