Fix deadlock in id_generators. No idea why this was an actual deadlock.

erikj/initial_sync_perf
Erik Johnston 2015-04-29 19:15:23 +01:00
parent 3e71d13acf
commit d2d61a8288
1 changed files with 16 additions and 14 deletions

View File

@ -86,10 +86,10 @@ class StreamIdGenerator(object):
with stream_id_gen.get_next_txn(txn) as stream_id: with stream_id_gen.get_next_txn(txn) as stream_id:
# ... persist event ... # ... persist event ...
""" """
with self._lock: if not self._current_max:
if not self._current_max: self._compute_current_max(txn)
self._compute_current_max(txn)
with self._lock:
self._current_max += 1 self._current_max += 1
next_id = self._current_max next_id = self._current_max
@ -110,22 +110,24 @@ class StreamIdGenerator(object):
"""Returns the maximum stream id such that all stream ids less than or """Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted. equal to it have been successfully persisted.
""" """
if not self._current_max:
yield store.runInteraction(
"_compute_current_max",
self._get_or_compute_current_max,
)
with self._lock: with self._lock:
if self._unfinished_ids: if self._unfinished_ids:
defer.returnValue(self._unfinished_ids[0] - 1) defer.returnValue(self._unfinished_ids[0] - 1)
if not self._current_max:
yield store.runInteraction(
"_compute_current_max",
self._compute_current_max,
)
defer.returnValue(self._current_max) defer.returnValue(self._current_max)
def _compute_current_max(self, txn): def _get_or_compute_current_max(self, txn):
txn.execute("SELECT MAX(stream_ordering) FROM events") with self._lock:
val, = txn.fetchone() txn.execute("SELECT MAX(stream_ordering) FROM events")
rows = txn.fetchall()
val, = rows[0]
self._current_max = int(val) if val else 1 self._current_max = int(val) if val else 1
return self._current_max return self._current_max