Retry transaction, not SQL query
parent
24d8134ac1
commit
9a05795619
|
@ -182,22 +182,12 @@ class LoggingTransaction(object):
|
||||||
start = time.time() * 1000
|
start = time.time() * 1000
|
||||||
|
|
||||||
try:
|
try:
|
||||||
i = 0
|
return self.txn.execute(
|
||||||
N = 5
|
sql, *args, **kwargs
|
||||||
while True:
|
)
|
||||||
try:
|
|
||||||
return self.txn.execute(
|
|
||||||
sql, *args, **kwargs
|
|
||||||
)
|
|
||||||
except self.database_engine.module.DatabaseError as e:
|
|
||||||
if self.database_engine.is_deadlock(e) and i < N:
|
|
||||||
i += 1
|
|
||||||
logger.warn("[SQL DEADLOCK] {%s}", self.name)
|
|
||||||
continue
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
|
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
msecs = (time.time() * 1000) - start
|
msecs = (time.time() * 1000) - start
|
||||||
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
|
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
|
||||||
|
@ -347,7 +337,7 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
start_time = time.time() * 1000
|
start_time = time.time() * 1000
|
||||||
|
|
||||||
def inner_func(txn, *args, **kwargs):
|
def inner_func(conn, *args, **kwargs):
|
||||||
with LoggingContext("runInteraction") as context:
|
with LoggingContext("runInteraction") as context:
|
||||||
current_context.copy_to(context)
|
current_context.copy_to(context)
|
||||||
start = time.time() * 1000
|
start = time.time() * 1000
|
||||||
|
@ -362,10 +352,24 @@ class SQLBaseStore(object):
|
||||||
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
|
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
|
||||||
transaction_logger.debug("[TXN START] {%s}", name)
|
transaction_logger.debug("[TXN START] {%s}", name)
|
||||||
try:
|
try:
|
||||||
return func(
|
i = 0
|
||||||
LoggingTransaction(txn, name, self.database_engine),
|
N = 5
|
||||||
*args, **kwargs
|
while True:
|
||||||
)
|
try:
|
||||||
|
txn = conn.cursor()
|
||||||
|
return func(
|
||||||
|
LoggingTransaction(txn, name, self.database_engine),
|
||||||
|
*args, **kwargs
|
||||||
|
)
|
||||||
|
except self.database_engine.module.DatabaseError as e:
|
||||||
|
logger.warn("[TXN DEADLOCK] {%s} %r, %r", name, e.errno, e.sqlstate)
|
||||||
|
if self.database_engine.is_deadlock(e):
|
||||||
|
logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
|
||||||
|
if i < N:
|
||||||
|
i += 1
|
||||||
|
conn.rollback()
|
||||||
|
continue
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("[TXN FAIL] {%s}", name, e)
|
logger.debug("[TXN FAIL] {%s}", name, e)
|
||||||
raise
|
raise
|
||||||
|
@ -380,7 +384,7 @@ class SQLBaseStore(object):
|
||||||
sql_txn_timer.inc_by(duration, desc)
|
sql_txn_timer.inc_by(duration, desc)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
result = yield self._db_pool.runInteraction(
|
result = yield self._db_pool.runWithConnection(
|
||||||
inner_func, *args, **kwargs
|
inner_func, *args, **kwargs
|
||||||
)
|
)
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
Loading…
Reference in New Issue