Fix 'age' key to update on retries
parent
34d7896b06
commit
6ac0b4ade8
|
@ -292,8 +292,8 @@ class ReplicationLayer(object):
|
|||
transaction = Transaction(**transaction_data)
|
||||
|
||||
for p in transaction.pdus:
|
||||
if "age" in p:
|
||||
p["age_ts"] = int(self.clock.time_msec()) - int(p["age"])
|
||||
if "age_ts" in p:
|
||||
p["age"] = int(self._clock.time_msec()) - int(p["age_ts"])
|
||||
|
||||
pdu_list = [Pdu(**p) for p in transaction.pdus]
|
||||
|
||||
|
@ -602,8 +602,21 @@ class _TransactionQueue(object):
|
|||
logger.debug("TX [%s] Sending transaction...", destination)
|
||||
|
||||
# Actually send the transaction
|
||||
|
||||
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
||||
# keys work
|
||||
def cb(transaction):
|
||||
now = int(self._clock.time_msec())
|
||||
if "pdus" in transaction:
|
||||
for p in transaction["pdus"]:
|
||||
if "age_ts" in p:
|
||||
p["age"] = now - int(p["age_ts"])
|
||||
|
||||
return transaction
|
||||
|
||||
code, response = yield self.transport_layer.send_transaction(
|
||||
transaction
|
||||
transaction,
|
||||
on_send_callback=cb,
|
||||
)
|
||||
|
||||
logger.debug("TX [%s] Sent transaction", destination)
|
||||
|
|
|
@ -144,7 +144,7 @@ class TransportLayer(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_transaction(self, transaction):
|
||||
def send_transaction(self, transaction, on_send_callback=None):
|
||||
""" Sends the given Transaction to it's destination
|
||||
|
||||
Args:
|
||||
|
@ -165,10 +165,23 @@ class TransportLayer(object):
|
|||
|
||||
data = transaction.get_dict()
|
||||
|
||||
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
||||
# keys work
|
||||
def cb(destination, method, path_bytes, producer):
|
||||
if not on_send_callback:
|
||||
return
|
||||
|
||||
transaction = json.loads(producer.body)
|
||||
|
||||
new_transaction = on_send_callback(transaction)
|
||||
|
||||
producer.reset(new_transaction)
|
||||
|
||||
code, response = yield self.client.put_json(
|
||||
transaction.destination,
|
||||
path=PREFIX + "/send/%s/" % transaction.transaction_id,
|
||||
data=data
|
||||
data=data,
|
||||
on_send_callback=cb,
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
|
|
|
@ -122,7 +122,7 @@ class TwistedHttpClient(HttpClient):
|
|||
self.hs = hs
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def put_json(self, destination, path, data):
|
||||
def put_json(self, destination, path, data, on_send_callback=None):
|
||||
if destination in _destination_mappings:
|
||||
destination = _destination_mappings[destination]
|
||||
|
||||
|
@ -131,7 +131,8 @@ class TwistedHttpClient(HttpClient):
|
|||
"PUT",
|
||||
path.encode("ascii"),
|
||||
producer=_JsonProducer(data),
|
||||
headers_dict={"Content-Type": ["application/json"]}
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
on_send_callback=on_send_callback,
|
||||
)
|
||||
|
||||
logger.debug("Getting resp body")
|
||||
|
@ -218,7 +219,7 @@ class TwistedHttpClient(HttpClient):
|
|||
@defer.inlineCallbacks
|
||||
def _create_request(self, destination, method, path_bytes, param_bytes=b"",
|
||||
query_bytes=b"", producer=None, headers_dict={},
|
||||
retry_on_dns_fail=True):
|
||||
retry_on_dns_fail=True, on_send_callback=None):
|
||||
""" Creates and sends a request to the given url
|
||||
"""
|
||||
headers_dict[b"User-Agent"] = [b"Synapse"]
|
||||
|
@ -242,6 +243,9 @@ class TwistedHttpClient(HttpClient):
|
|||
endpoint = self._getEndpoint(reactor, destination);
|
||||
|
||||
while True:
|
||||
if on_send_callback:
|
||||
on_send_callback(destination, method, path_bytes, producer)
|
||||
|
||||
try:
|
||||
response = yield self.agent.request(
|
||||
destination,
|
||||
|
@ -310,6 +314,9 @@ class _JsonProducer(object):
|
|||
""" Used by the twisted http client to create the HTTP body from json
|
||||
"""
|
||||
def __init__(self, jsn):
|
||||
self.reset(jsn)
|
||||
|
||||
def reset(self, jsn):
|
||||
self.body = encode_canonical_json(jsn)
|
||||
self.length = len(self.body)
|
||||
|
||||
|
|
Loading…
Reference in New Issue