_simple_upsert: retry on IntegrityError

wrap the call to _simple_upsert_txn in a loop so that we retry on an
integrityerror: this means we can avoid locking the table provided there is an
unique index.
pull/2684/head
Richard van der Hoff 2017-11-16 15:30:15 +00:00
parent cdc9e50a5d
commit 10aaa1bc15
1 changed files with 29 additions and 6 deletions

View File

@ -469,23 +469,46 @@ class SQLBaseStore(object):
txn.executemany(sql, vals) txn.executemany(sql, vals)
@defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values, def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True): insertion_values={}, desc="_simple_upsert", lock=True):
""" """
`lock` should generally be set to True (the default), but can be set
to False if either of the following are true:
* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
* we somehow know that we are the only thread which will be updating
this table.
Args: Args:
table (str): The table to upsert into table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values values (dict): The nonunique columns and their new values
insertion_values (dict): key/values to use when inserting insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
Returns: Returns:
Deferred(bool): True if a new entry was created, False if an Deferred(bool): True if a new entry was created, False if an
existing one was updated. existing one was updated.
""" """
return self.runInteraction( while True:
desc, try:
self._simple_upsert_txn, table, keyvalues, values, insertion_values, result = yield self.runInteraction(
lock desc,
) self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock=lock
)
defer.returnValue(result)
except self.database_engine.IntegrityError as e:
# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
table, e
)
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True): lock=True):