Fix to-device being dropped in limited sync in SQLite. (#11966)
If ther are more than 100 to-device messages pending for a device `/sync` will only return the first 100, however the next batch token was incorrectly calculated and so all other pending messages would be dropped. This is due to `txn.rowcount` only returning the number of rows that *changed*, rather than the number *selected* in SQLite.pull/11971/head
parent
4ef39f3353
commit
79fb64e417
|
@ -0,0 +1 @@
|
||||||
|
Fix to-device messages being dropped during limited sync when using SQLite.
|
|
@ -362,7 +362,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
# intended for each device.
|
# intended for each device.
|
||||||
last_processed_stream_pos = to_stream_id
|
last_processed_stream_pos = to_stream_id
|
||||||
recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {}
|
recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {}
|
||||||
|
rowcount = 0
|
||||||
for row in txn:
|
for row in txn:
|
||||||
|
rowcount += 1
|
||||||
|
|
||||||
last_processed_stream_pos = row[0]
|
last_processed_stream_pos = row[0]
|
||||||
recipient_user_id = row[1]
|
recipient_user_id = row[1]
|
||||||
recipient_device_id = row[2]
|
recipient_device_id = row[2]
|
||||||
|
@ -373,7 +376,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
(recipient_user_id, recipient_device_id), []
|
(recipient_user_id, recipient_device_id), []
|
||||||
).append(message_dict)
|
).append(message_dict)
|
||||||
|
|
||||||
if limit is not None and txn.rowcount == limit:
|
if limit is not None and rowcount == limit:
|
||||||
# We ended up bumping up against the message limit. There may be more messages
|
# We ended up bumping up against the message limit. There may be more messages
|
||||||
# to retrieve. Return what we have, as well as the last stream position that
|
# to retrieve. Return what we have, as well as the last stream position that
|
||||||
# was processed.
|
# was processed.
|
||||||
|
|
|
@ -198,3 +198,43 @@ class SendToDeviceTestCase(HomeserverTestCase):
|
||||||
"content": {"idx": 3},
|
"content": {"idx": 3},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_limited_sync(self):
|
||||||
|
"""If a limited sync for to-devices happens the next /sync should respond immediately."""
|
||||||
|
|
||||||
|
self.register_user("u1", "pass")
|
||||||
|
user1_tok = self.login("u1", "pass", "d1")
|
||||||
|
|
||||||
|
user2 = self.register_user("u2", "pass")
|
||||||
|
user2_tok = self.login("u2", "pass", "d2")
|
||||||
|
|
||||||
|
# Do an initial sync
|
||||||
|
channel = self.make_request("GET", "/sync", access_token=user2_tok)
|
||||||
|
self.assertEqual(channel.code, 200, channel.result)
|
||||||
|
sync_token = channel.json_body["next_batch"]
|
||||||
|
|
||||||
|
# Send 150 to-device messages. We limit to 100 in `/sync`
|
||||||
|
for i in range(150):
|
||||||
|
test_msg = {"foo": "bar"}
|
||||||
|
chan = self.make_request(
|
||||||
|
"PUT",
|
||||||
|
f"/_matrix/client/r0/sendToDevice/m.test/1234-{i}",
|
||||||
|
content={"messages": {user2: {"d2": test_msg}}},
|
||||||
|
access_token=user1_tok,
|
||||||
|
)
|
||||||
|
self.assertEqual(chan.code, 200, chan.result)
|
||||||
|
|
||||||
|
channel = self.make_request(
|
||||||
|
"GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok
|
||||||
|
)
|
||||||
|
self.assertEqual(channel.code, 200, channel.result)
|
||||||
|
messages = channel.json_body.get("to_device", {}).get("events", [])
|
||||||
|
self.assertEqual(len(messages), 100)
|
||||||
|
sync_token = channel.json_body["next_batch"]
|
||||||
|
|
||||||
|
channel = self.make_request(
|
||||||
|
"GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok
|
||||||
|
)
|
||||||
|
self.assertEqual(channel.code, 200, channel.result)
|
||||||
|
messages = channel.json_body.get("to_device", {}).get("events", [])
|
||||||
|
self.assertEqual(len(messages), 50)
|
||||||
|
|
Loading…
Reference in New Issue