Parse tokens before calling DB function
parent
274b8c6025
commit
3e6d306e94
|
@ -738,16 +738,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
def has_room_changed_since(self, room_id, stream_id):
|
def has_room_changed_since(self, room_id, stream_id):
|
||||||
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|
||||||
|
|
||||||
def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None,
|
def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
|
||||||
direction='b', limit=-1, event_filter=None):
|
direction='b', limit=-1, event_filter=None):
|
||||||
"""Returns list of events before or after a given token.
|
"""Returns list of events before or after a given token.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
txn
|
txn
|
||||||
room_id (str)
|
room_id (str)
|
||||||
from_key (str): The token used to stream from
|
from_token (RoomStreamToken): The token used to stream from
|
||||||
to_key (str|None): A token which if given limits the results to
|
to_token (RoomStreamToken|None): A token which if given limits the
|
||||||
only those before
|
results to only those before
|
||||||
direction(char): Either 'b' or 'f' to indicate whether we are
|
direction(char): Either 'b' or 'f' to indicate whether we are
|
||||||
paginating forwards or backwards from `from_key`.
|
paginating forwards or backwards from `from_key`.
|
||||||
limit (int): The maximum number of events to return. Zero or less
|
limit (int): The maximum number of events to return. Zero or less
|
||||||
|
@ -757,7 +757,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
tuple[list[dict], str]: Returns the results as a list of dicts and
|
tuple[list[dict], str]: Returns the results as a list of dicts and
|
||||||
a token that points to the end of the result set. The dicts have
|
a token that points to the end of the result set. The dicts haveq
|
||||||
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
||||||
"""
|
"""
|
||||||
# Tokens really represent positions between elements, but we use
|
# Tokens really represent positions between elements, but we use
|
||||||
|
@ -767,20 +767,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
if direction == 'b':
|
if direction == 'b':
|
||||||
order = "DESC"
|
order = "DESC"
|
||||||
bounds = upper_bound(
|
bounds = upper_bound(
|
||||||
RoomStreamToken.parse(from_key), self.database_engine
|
from_token, self.database_engine
|
||||||
)
|
)
|
||||||
if to_key:
|
if to_token:
|
||||||
bounds = "%s AND %s" % (bounds, lower_bound(
|
bounds = "%s AND %s" % (bounds, lower_bound(
|
||||||
RoomStreamToken.parse(to_key), self.database_engine
|
to_token, self.database_engine
|
||||||
))
|
))
|
||||||
else:
|
else:
|
||||||
order = "ASC"
|
order = "ASC"
|
||||||
bounds = lower_bound(
|
bounds = lower_bound(
|
||||||
RoomStreamToken.parse(from_key), self.database_engine
|
from_token, self.database_engine
|
||||||
)
|
)
|
||||||
if to_key:
|
if to_token:
|
||||||
bounds = "%s AND %s" % (bounds, upper_bound(
|
bounds = "%s AND %s" % (bounds, upper_bound(
|
||||||
RoomStreamToken.parse(to_key), self.database_engine
|
to_token, self.database_engine
|
||||||
))
|
))
|
||||||
|
|
||||||
filter_clause, filter_args = filter_to_clause(event_filter)
|
filter_clause, filter_args = filter_to_clause(event_filter)
|
||||||
|
@ -821,12 +821,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# when we are going backwards so we subtract one from the
|
# when we are going backwards so we subtract one from the
|
||||||
# stream part.
|
# stream part.
|
||||||
toke -= 1
|
toke -= 1
|
||||||
next_token = str(RoomStreamToken(topo, toke))
|
next_token = RoomStreamToken(topo, toke)
|
||||||
else:
|
else:
|
||||||
# TODO (erikj): We should work out what to do here instead.
|
# TODO (erikj): We should work out what to do here instead.
|
||||||
next_token = to_key if to_key else from_key
|
next_token = to_token if to_token else from_token
|
||||||
|
|
||||||
return rows, next_token,
|
return rows, str(next_token),
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def paginate_room_events(self, room_id, from_key, to_key=None,
|
def paginate_room_events(self, room_id, from_key, to_key=None,
|
||||||
|
@ -851,6 +851,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from_key = RoomStreamToken.parse(from_key)
|
||||||
|
if to_key:
|
||||||
|
to_key = RoomStreamToken.parse(to_key)
|
||||||
|
|
||||||
rows, token = yield self.runInteraction(
|
rows, token = yield self.runInteraction(
|
||||||
"paginate_room_events", self.paginate_room_events_txn,
|
"paginate_room_events", self.paginate_room_events_txn,
|
||||||
room_id, from_key, to_key, direction, limit, event_filter,
|
room_id, from_key, to_key, direction, limit, event_filter,
|
||||||
|
|
Loading…
Reference in New Issue