Merge pull request #1188 from matrix-org/erikj/sent_transactions
Remove sent_transactions table.pull/1190/head
commit
2746e805fe
|
@ -16,13 +16,12 @@
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
|
||||||
import itertools
|
|
||||||
import logging
|
import logging
|
||||||
import ujson as json
|
import ujson as json
|
||||||
|
|
||||||
|
@ -50,20 +49,6 @@ class TransactionStore(SQLBaseStore):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(TransactionStore, self).__init__(hs)
|
super(TransactionStore, self).__init__(hs)
|
||||||
|
|
||||||
# New transactions that are currently in flights
|
|
||||||
self.inflight_transactions = {}
|
|
||||||
|
|
||||||
# Newly delievered transactions that *weren't* persisted while in flight
|
|
||||||
self.new_delivered_transactions = {}
|
|
||||||
|
|
||||||
# Newly delivered transactions that *were* persisted while in flight
|
|
||||||
self.update_delivered_transactions = {}
|
|
||||||
|
|
||||||
self.last_transaction = {}
|
|
||||||
|
|
||||||
reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
|
|
||||||
self._clock.looping_call(self._persist_in_mem_txns, 1000)
|
|
||||||
|
|
||||||
self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
|
self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
|
||||||
|
|
||||||
def get_received_txn_response(self, transaction_id, origin):
|
def get_received_txn_response(self, transaction_id, origin):
|
||||||
|
@ -148,46 +133,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
list: A list of previous transaction ids.
|
list: A list of previous transaction ids.
|
||||||
"""
|
"""
|
||||||
|
return defer.succeed([])
|
||||||
auto_id = self._transaction_id_gen.get_next()
|
|
||||||
|
|
||||||
txn_row = _TransactionRow(
|
|
||||||
id=auto_id,
|
|
||||||
transaction_id=transaction_id,
|
|
||||||
destination=destination,
|
|
||||||
ts=origin_server_ts,
|
|
||||||
response_code=0,
|
|
||||||
response_json=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
|
|
||||||
|
|
||||||
prev_txn = self.last_transaction.get(destination)
|
|
||||||
if prev_txn:
|
|
||||||
return defer.succeed(prev_txn)
|
|
||||||
else:
|
|
||||||
return self.runInteraction(
|
|
||||||
"_get_prevs_txn",
|
|
||||||
self._get_prevs_txn,
|
|
||||||
destination,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_prevs_txn(self, txn, destination):
|
|
||||||
# First we find out what the prev_txns should be.
|
|
||||||
# Since we know that we are only sending one transaction at a time,
|
|
||||||
# we can simply take the last one.
|
|
||||||
query = (
|
|
||||||
"SELECT * FROM sent_transactions"
|
|
||||||
" WHERE destination = ?"
|
|
||||||
" ORDER BY id DESC LIMIT 1"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(query, (destination,))
|
|
||||||
results = self.cursor_to_dict(txn)
|
|
||||||
|
|
||||||
prev_txns = [r["transaction_id"] for r in results]
|
|
||||||
|
|
||||||
return prev_txns
|
|
||||||
|
|
||||||
def delivered_txn(self, transaction_id, destination, code, response_dict):
|
def delivered_txn(self, transaction_id, destination, code, response_dict):
|
||||||
"""Persists the response for an outgoing transaction.
|
"""Persists the response for an outgoing transaction.
|
||||||
|
@ -198,52 +144,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
code (int)
|
code (int)
|
||||||
response_json (str)
|
response_json (str)
|
||||||
"""
|
"""
|
||||||
|
pass
|
||||||
txn_row = self.inflight_transactions.get(
|
|
||||||
destination, {}
|
|
||||||
).pop(transaction_id, None)
|
|
||||||
|
|
||||||
self.last_transaction[destination] = transaction_id
|
|
||||||
|
|
||||||
if txn_row:
|
|
||||||
d = self.new_delivered_transactions.setdefault(destination, {})
|
|
||||||
d[transaction_id] = txn_row._replace(
|
|
||||||
response_code=code,
|
|
||||||
response_json=None, # For now, don't persist response
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
d = self.update_delivered_transactions.setdefault(destination, {})
|
|
||||||
# For now, don't persist response
|
|
||||||
d[transaction_id] = _UpdateTransactionRow(code, None)
|
|
||||||
|
|
||||||
def get_transactions_after(self, transaction_id, destination):
|
|
||||||
"""Get all transactions after a given local transaction_id.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
transaction_id (str)
|
|
||||||
destination (str)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: A list of dicts
|
|
||||||
"""
|
|
||||||
return self.runInteraction(
|
|
||||||
"get_transactions_after",
|
|
||||||
self._get_transactions_after, transaction_id, destination
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_transactions_after(self, txn, transaction_id, destination):
|
|
||||||
query = (
|
|
||||||
"SELECT * FROM sent_transactions"
|
|
||||||
" WHERE destination = ? AND id >"
|
|
||||||
" ("
|
|
||||||
" SELECT id FROM sent_transactions"
|
|
||||||
" WHERE transaction_id = ? AND destination = ?"
|
|
||||||
" )"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(query, (destination, transaction_id, destination))
|
|
||||||
|
|
||||||
return self.cursor_to_dict(txn)
|
|
||||||
|
|
||||||
@cached(max_entries=10000)
|
@cached(max_entries=10000)
|
||||||
def get_destination_retry_timings(self, destination):
|
def get_destination_retry_timings(self, destination):
|
||||||
|
@ -339,58 +240,11 @@ class TransactionStore(SQLBaseStore):
|
||||||
txn.execute(query, (self._clock.time_msec(),))
|
txn.execute(query, (self._clock.time_msec(),))
|
||||||
return self.cursor_to_dict(txn)
|
return self.cursor_to_dict(txn)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _persist_in_mem_txns(self):
|
|
||||||
try:
|
|
||||||
inflight = self.inflight_transactions
|
|
||||||
new_delivered = self.new_delivered_transactions
|
|
||||||
update_delivered = self.update_delivered_transactions
|
|
||||||
|
|
||||||
self.inflight_transactions = {}
|
|
||||||
self.new_delivered_transactions = {}
|
|
||||||
self.update_delivered_transactions = {}
|
|
||||||
|
|
||||||
full_rows = [
|
|
||||||
row._asdict()
|
|
||||||
for txn_map in itertools.chain(inflight.values(), new_delivered.values())
|
|
||||||
for row in txn_map.values()
|
|
||||||
]
|
|
||||||
|
|
||||||
def f(txn):
|
|
||||||
if full_rows:
|
|
||||||
self._simple_insert_many_txn(
|
|
||||||
txn=txn,
|
|
||||||
table="sent_transactions",
|
|
||||||
values=full_rows
|
|
||||||
)
|
|
||||||
|
|
||||||
for dest, txn_map in update_delivered.items():
|
|
||||||
for txn_id, update_row in txn_map.items():
|
|
||||||
self._simple_update_one_txn(
|
|
||||||
txn,
|
|
||||||
table="sent_transactions",
|
|
||||||
keyvalues={
|
|
||||||
"transaction_id": txn_id,
|
|
||||||
"destination": dest,
|
|
||||||
},
|
|
||||||
updatevalues={
|
|
||||||
"response_code": update_row.response_code,
|
|
||||||
"response_json": None, # For now, don't persist response
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if full_rows or update_delivered:
|
|
||||||
yield self.runInteraction("_persist_in_mem_txns", f)
|
|
||||||
except:
|
|
||||||
logger.exception("Failed to persist transactions!")
|
|
||||||
|
|
||||||
def _cleanup_transactions(self):
|
def _cleanup_transactions(self):
|
||||||
now = self._clock.time_msec()
|
now = self._clock.time_msec()
|
||||||
month_ago = now - 30 * 24 * 60 * 60 * 1000
|
month_ago = now - 30 * 24 * 60 * 60 * 1000
|
||||||
six_hours_ago = now - 6 * 60 * 60 * 1000
|
|
||||||
|
|
||||||
def _cleanup_transactions_txn(txn):
|
def _cleanup_transactions_txn(txn):
|
||||||
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
|
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
|
||||||
txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,))
|
|
||||||
|
|
||||||
return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn)
|
return self.runInteraction("_cleanup_transactions", _cleanup_transactions_txn)
|
||||||
|
|
Loading…
Reference in New Issue