Increase perf of handling concurrent use of StreamIDGenerators. (#9190)

We have seen a failure mode here where if there are many in flight
unfinished IDs then marking an ID as finished takes a lot of CPU (as
calling deque.remove iterates over the list)
pull/9199/head
Erik Johnston 2021-01-21 16:31:51 +00:00 committed by GitHub
parent 939ef657ce
commit 12ec55bfaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 8 deletions

1
changelog.d/9190.misc Normal file
View File

@ -0,0 +1 @@
Improve performance of concurrent use of `StreamIDGenerators`.

View File

@ -15,12 +15,11 @@
import heapq import heapq
import logging import logging
import threading import threading
from collections import deque from collections import OrderedDict
from contextlib import contextmanager from contextlib import contextmanager
from typing import Dict, List, Optional, Set, Tuple, Union from typing import Dict, List, Optional, Set, Tuple, Union
import attr import attr
from typing_extensions import Deque
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.database import DatabasePool, LoggingTransaction
@ -101,7 +100,13 @@ class StreamIdGenerator:
self._current = (max if step > 0 else min)( self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step) self._current, _load_current_id(db_conn, table, column, step)
) )
self._unfinished_ids = deque() # type: Deque[int]
# We use this as an ordered set, as we want to efficiently append items,
# remove items and get the first item. Since we insert IDs in order, the
# insertion ordering will ensure its in the correct ordering.
#
# The key and values are the same, but we never look at the values.
self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int]
def get_next(self): def get_next(self):
""" """
@ -113,7 +118,7 @@ class StreamIdGenerator:
self._current += self._step self._current += self._step
next_id = self._current next_id = self._current
self._unfinished_ids.append(next_id) self._unfinished_ids[next_id] = next_id
@contextmanager @contextmanager
def manager(): def manager():
@ -121,7 +126,7 @@ class StreamIdGenerator:
yield next_id yield next_id
finally: finally:
with self._lock: with self._lock:
self._unfinished_ids.remove(next_id) self._unfinished_ids.pop(next_id)
return _AsyncCtxManagerWrapper(manager()) return _AsyncCtxManagerWrapper(manager())
@ -140,7 +145,7 @@ class StreamIdGenerator:
self._current += n * self._step self._current += n * self._step
for next_id in next_ids: for next_id in next_ids:
self._unfinished_ids.append(next_id) self._unfinished_ids[next_id] = next_id
@contextmanager @contextmanager
def manager(): def manager():
@ -149,7 +154,7 @@ class StreamIdGenerator:
finally: finally:
with self._lock: with self._lock:
for next_id in next_ids: for next_id in next_ids:
self._unfinished_ids.remove(next_id) self._unfinished_ids.pop(next_id)
return _AsyncCtxManagerWrapper(manager()) return _AsyncCtxManagerWrapper(manager())
@ -162,7 +167,7 @@ class StreamIdGenerator:
""" """
with self._lock: with self._lock:
if self._unfinished_ids: if self._unfinished_ids:
return self._unfinished_ids[0] - self._step return next(iter(self._unfinished_ids)) - self._step
return self._current return self._current