Merge pull request #2099 from matrix-org/erikj/deviceinbox_reduce
Deduplicate new deviceinbox rows for replicationpull/2104/head
commit
b9caf4f726
|
@ -325,23 +325,26 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
|||
# we return.
|
||||
upper_pos = min(current_pos, last_pos + limit)
|
||||
sql = (
|
||||
"SELECT stream_id, user_id"
|
||||
"SELECT max(stream_id), user_id"
|
||||
" FROM device_inbox"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
" GROUP BY user_id"
|
||||
)
|
||||
txn.execute(sql, (last_pos, upper_pos))
|
||||
rows = txn.fetchall()
|
||||
|
||||
sql = (
|
||||
"SELECT stream_id, destination"
|
||||
"SELECT max(stream_id), destination"
|
||||
" FROM device_federation_outbox"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
" GROUP BY destination"
|
||||
)
|
||||
txn.execute(sql, (last_pos, upper_pos))
|
||||
rows.extend(txn)
|
||||
|
||||
# Order by ascending stream ordering
|
||||
rows.sort()
|
||||
|
||||
return rows
|
||||
|
||||
return self.runInteraction(
|
||||
|
|
Loading…
Reference in New Issue