parent
79eded1ae4
commit
fdd1a62e8d
|
@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
|
||||
from ._base import SQLBaseStore, db_to_json
|
||||
|
||||
|
@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
|
|||
)
|
||||
)
|
||||
|
||||
SENTINEL = object()
|
||||
|
||||
|
||||
class TransactionStore(SQLBaseStore):
|
||||
"""A collection of queries for handling PDUs.
|
||||
|
@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore):
|
|||
|
||||
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
|
||||
|
||||
self._destination_retry_cache = ExpiringCache(
|
||||
cache_name="get_destination_retry_timings",
|
||||
clock=self._clock,
|
||||
expiry_ms=5 * 60 * 1000,
|
||||
)
|
||||
|
||||
def get_received_txn_response(self, transaction_id, origin):
|
||||
"""For an incoming transaction from a given origin, check if we have
|
||||
already responded to it. If so, return the response code and response
|
||||
|
@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore):
|
|||
"""
|
||||
pass
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_destination_retry_timings(self, destination):
|
||||
"""Gets the current retry timings (if any) for a given destination.
|
||||
|
||||
|
@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore):
|
|||
None if not retrying
|
||||
Otherwise a dict for the retry scheme
|
||||
"""
|
||||
return self.runInteraction(
|
||||
|
||||
result = self._destination_retry_cache.get(destination, SENTINEL)
|
||||
if result is not SENTINEL:
|
||||
defer.returnValue(result)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
"get_destination_retry_timings",
|
||||
self._get_destination_retry_timings, destination)
|
||||
|
||||
# We don't hugely care about race conditions between getting and
|
||||
# invalidating the cache, since we time out fairly quickly anyway.
|
||||
self._destination_retry_cache[destination] = result
|
||||
defer.returnValue(result)
|
||||
|
||||
def _get_destination_retry_timings(self, txn, destination):
|
||||
result = self._simple_select_one_txn(
|
||||
txn,
|
||||
|
@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore):
|
|||
retry_interval (int) - how long until next retry in ms
|
||||
"""
|
||||
|
||||
self._destination_retry_cache.pop(destination)
|
||||
return self.runInteraction(
|
||||
"set_destination_retry_timings",
|
||||
self._set_destination_retry_timings,
|
||||
|
|
|
@ -24,6 +24,9 @@ from synapse.util.caches import register_cache
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
SENTINEL = object()
|
||||
|
||||
|
||||
class ExpiringCache(object):
|
||||
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
|
||||
reset_expiry_on_get=False, iterable=False):
|
||||
|
@ -102,6 +105,16 @@ class ExpiringCache(object):
|
|||
|
||||
return entry.value
|
||||
|
||||
def pop(self, key, default=None):
|
||||
value = self._cache.pop(key, SENTINEL)
|
||||
if value is SENTINEL:
|
||||
return default
|
||||
|
||||
if self.iterable:
|
||||
self._size_estimate -= len(value.value)
|
||||
|
||||
return value
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self._cache
|
||||
|
||||
|
|
Loading…
Reference in New Issue