Filter before ordering
parent
d1e9655f75
commit
b999adcaa2
|
@ -202,19 +202,64 @@ class SyncHandler(BaseHandler):
|
||||||
return self.incremental_sync_with_gap(sync_config, batch_token)
|
return self.incremental_sync_with_gap(sync_config, batch_token)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_room_timestamps_at_token(self, room_ids, token):
|
def _get_room_timestamps_at_token(self, room_ids, token, sync_config,
|
||||||
room_to_last_ts = {}
|
limit):
|
||||||
|
room_to_entries = {}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_last_ts(room_id):
|
def _get_last_ts(room_id):
|
||||||
ts = yield self.store.get_last_ts_for_room(
|
entry = yield self.store.get_last_ts_for_room(
|
||||||
room_id, token.room_key
|
room_id, token.room_key
|
||||||
)
|
)
|
||||||
room_to_last_ts[room_id] = ts if ts else 0
|
|
||||||
|
|
||||||
logger.info("room_to_last_ts: %r", room_to_last_ts)
|
# TODO: Is this ever possible?
|
||||||
|
room_to_entries[room_id] = entry if entry else {
|
||||||
|
"origin_server_ts": 0,
|
||||||
|
}
|
||||||
|
|
||||||
yield concurrently_execute(_get_last_ts, room_ids, 10)
|
yield concurrently_execute(_get_last_ts, room_ids, 10)
|
||||||
defer.returnValue(room_to_last_ts)
|
|
||||||
|
if len(room_to_entries) <= limit:
|
||||||
|
defer.returnValue({
|
||||||
|
room_id: entry["origin_server_ts"]
|
||||||
|
for room_id, entry in room_to_entries.items()
|
||||||
|
})
|
||||||
|
|
||||||
|
queued_events = sorted(
|
||||||
|
room_to_entries.items(),
|
||||||
|
key=lambda e: e[1]["origin_server_ts"]
|
||||||
|
)
|
||||||
|
|
||||||
|
to_return = {}
|
||||||
|
|
||||||
|
while len(to_return) < limit and len(queued_events) > 0:
|
||||||
|
to_fetch = queued_events[:limit - len(to_return)]
|
||||||
|
event_to_q = {
|
||||||
|
e["event_id"]: (room_id, e) for room_id, e in to_fetch.items()
|
||||||
|
if "event_id" in e
|
||||||
|
}
|
||||||
|
|
||||||
|
# Now we fetch each event to check if its been filtered out
|
||||||
|
event_map = yield self.store.get_events(event_to_q.keys())
|
||||||
|
|
||||||
|
recents = sync_config.filter_collection.filter_room_timeline(
|
||||||
|
event_map.values()
|
||||||
|
)
|
||||||
|
recents = yield filter_events_for_client(
|
||||||
|
self.store,
|
||||||
|
sync_config.user.to_string(),
|
||||||
|
recents,
|
||||||
|
)
|
||||||
|
|
||||||
|
to_return.update({r.room_id: r.origin_server_ts for r in recents})
|
||||||
|
|
||||||
|
for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
|
||||||
|
queued_events.append(event_to_q[ev_id])
|
||||||
|
|
||||||
|
# FIXME: Need to refetch TS
|
||||||
|
queued_events.sort(key=lambda e: e[1]["origin_server_ts"])
|
||||||
|
|
||||||
|
defer.returnValue(to_return)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def full_state_sync(self, sync_config, batch_token):
|
def full_state_sync(self, sync_config, batch_token):
|
||||||
|
@ -292,6 +337,8 @@ class SyncHandler(BaseHandler):
|
||||||
e.room_id for e in room_list if e.membership == Membership.JOIN
|
e.room_id for e in room_list if e.membership == Membership.JOIN
|
||||||
],
|
],
|
||||||
token=now_token,
|
token=now_token,
|
||||||
|
sync_config=sync_config,
|
||||||
|
limit=pagination_limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
if room_to_last_ts:
|
if room_to_last_ts:
|
||||||
|
@ -512,8 +559,13 @@ class SyncHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts):
|
def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts,
|
||||||
start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token)
|
sync_config, pagination_limit):
|
||||||
|
start_ts = yield self._get_room_timestamps_at_token(
|
||||||
|
room_ids, since_token,
|
||||||
|
sync_config=sync_config,
|
||||||
|
limit=pagination_limit,
|
||||||
|
)
|
||||||
|
|
||||||
missing_list = frozenset(
|
missing_list = frozenset(
|
||||||
room_id for room_id, ts in
|
room_id for room_id, ts in
|
||||||
|
@ -675,10 +727,13 @@ class SyncHandler(BaseHandler):
|
||||||
|
|
||||||
p_room_token = room_pagination_config.get("t", None)
|
p_room_token = room_pagination_config.get("t", None)
|
||||||
if p_room_token:
|
if p_room_token:
|
||||||
|
pa_limit = room_pagination_config["l"]
|
||||||
needing_full_state = yield self._get_rooms_that_need_full_state(
|
needing_full_state = yield self._get_rooms_that_need_full_state(
|
||||||
[room_id],
|
[room_id],
|
||||||
since_token,
|
since_token,
|
||||||
room_pagination_config.get("ts", 0),
|
room_pagination_config.get("ts", 0),
|
||||||
|
sync_config=sync_config,
|
||||||
|
pagination_limit=pa_limit,
|
||||||
)
|
)
|
||||||
need_full_state = room_id in needing_full_state
|
need_full_state = room_id in needing_full_state
|
||||||
else:
|
else:
|
||||||
|
@ -716,10 +771,13 @@ class SyncHandler(BaseHandler):
|
||||||
if not room_sync.timeline:
|
if not room_sync.timeline:
|
||||||
p_room_token = room_pagination_config.get("t", None)
|
p_room_token = room_pagination_config.get("t", None)
|
||||||
if p_room_token:
|
if p_room_token:
|
||||||
|
pa_limit = room_pagination_config["l"]
|
||||||
needing_full_state = yield self._get_rooms_that_need_full_state(
|
needing_full_state = yield self._get_rooms_that_need_full_state(
|
||||||
[room_id],
|
[room_id],
|
||||||
since_token,
|
since_token,
|
||||||
room_pagination_config.get("ts", 0),
|
room_pagination_config.get("ts", 0),
|
||||||
|
sync_config=sync_config,
|
||||||
|
pagination_limit=pa_limit,
|
||||||
)
|
)
|
||||||
if room_id in needing_full_state:
|
if room_id in needing_full_state:
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -533,7 +533,7 @@ class StreamStore(SQLBaseStore):
|
||||||
stream_ordering = RoomStreamToken.parse_stream_token(token).stream
|
stream_ordering = RoomStreamToken.parse_stream_token(token).stream
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT origin_server_ts FROM events"
|
"SELECT event_id, origin_server_ts FROM events"
|
||||||
" WHERE room_id = ? AND stream_ordering <= ?"
|
" WHERE room_id = ? AND stream_ordering <= ?"
|
||||||
" ORDER BY topological_ordering DESC, stream_ordering DESC"
|
" ORDER BY topological_ordering DESC, stream_ordering DESC"
|
||||||
" LIMIT 1"
|
" LIMIT 1"
|
||||||
|
@ -541,9 +541,9 @@ class StreamStore(SQLBaseStore):
|
||||||
|
|
||||||
def f(txn):
|
def f(txn):
|
||||||
txn.execute(sql, (room_id, stream_ordering))
|
txn.execute(sql, (room_id, stream_ordering))
|
||||||
rows = txn.fetchall()
|
rows = self.cursor_to_dict(txn)
|
||||||
if rows:
|
if rows:
|
||||||
return rows[0][0]
|
return rows[0]
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue