diff --git a/changelog.d/9900.bugfix b/changelog.d/9900.bugfix new file mode 100644 index 0000000000..a8470fca3f --- /dev/null +++ b/changelog.d/9900.bugfix @@ -0,0 +1 @@ +Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 969c73c1e7..12df35f26e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -2026,18 +2026,40 @@ class PresenceFederationQueue: ) return result["updates"], result["upto_token"], result["limited"] + # If the from_token is the current token then there's nothing to return + # and we can trivially no-op. + if from_token == self._next_id - 1: + return [], upto_token, False + # We can find the correct position in the queue by noting that there is # exactly one entry per stream ID, and that the last entry has an ID of # `self._next_id - 1`, so we can count backwards from the end. # + # Since we are returning all states in the range `from_token < stream_id + # <= upto_token` we look for the index with a `stream_id` of `from_token + # + 1`. + # # Since the start of the queue is periodically truncated we need to # handle the case where `from_token` stream ID has already been dropped. - start_idx = max(from_token - self._next_id, -len(self._queue)) + start_idx = max(from_token + 1 - self._next_id, -len(self._queue)) to_send = [] # type: List[Tuple[int, Tuple[str, str]]] limited = False new_id = upto_token for _, stream_id, destinations, user_ids in self._queue[start_idx:]: + if stream_id <= from_token: + # Paranoia check that we are actually only sending states that + # are have stream_id strictly greater than from_token. We should + # never hit this. + logger.warning( + "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)", + stream_id, + from_token, + self._next_id, + len(self._queue), + ) + continue + if stream_id > upto_token: break diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 61271cd084..ce330e79cc 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -509,6 +509,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): self.assertCountEqual(rows, expected_rows) + now_token = self.queue.get_current_token(self.instance_name) + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", upto_token, now_token, 10) + ) + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + self.assertCountEqual(rows, []) + def test_send_and_get_split(self): state1 = UserPresenceState.default("@user1:test") state2 = UserPresenceState.default("@user2:test") @@ -538,6 +546,20 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): self.assertCountEqual(rows, expected_rows) + now_token = self.queue.get_current_token(self.instance_name) + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", upto_token, now_token, 10) + ) + + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + + expected_rows = [ + (2, ("dest3", "@user3:test")), + ] + + self.assertCountEqual(rows, expected_rows) + def test_clear_queue_all(self): state1 = UserPresenceState.default("@user1:test") state2 = UserPresenceState.default("@user2:test")