Merge pull request #1090 from matrix-org/erikj/transaction_queue_check

Fix tightloop on sending transaction
pull/1093/head
Erik Johnston 2016-09-09 13:19:44 +01:00 committed by GitHub
commit 0b6b999e7b
1 changed files with 132 additions and 120 deletions

View File

@ -209,11 +209,13 @@ class TransactionQueue(object):
) )
return return
yield self._send_new_transaction( success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures, destination, pending_pdus, pending_edus, pending_failures,
device_stream_id, device_stream_id,
should_delete_from_device_stream=bool(device_message_edus) should_delete_from_device_stream=bool(device_message_edus)
) )
if not success:
break
finally: finally:
# We want to be *very* sure we delete this after we stop processing # We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None) self.pending_transactions.pop(destination, None)
@ -248,6 +250,8 @@ class TransactionQueue(object):
edus = pending_edus edus = pending_edus
failures = [x.get_dict() for x in pending_failures] failures = [x.get_dict() for x in pending_failures]
success = True
try: try:
logger.debug("TX [%s] _attempt_new_transaction", destination) logger.debug("TX [%s] _attempt_new_transaction", destination)
@ -347,6 +351,7 @@ class TransactionQueue(object):
logger.info( logger.info(
"Failed to send event %s to %s", p.event_id, destination "Failed to send event %s to %s", p.event_id, destination
) )
success = False
else: else:
# Remove the acknowledged device messages from the database # Remove the acknowledged device messages from the database
if should_delete_from_device_stream: if should_delete_from_device_stream:
@ -360,6 +365,7 @@ class TransactionQueue(object):
"dropping transaction for now", "dropping transaction for now",
destination, destination,
) )
success = False
except RuntimeError as e: except RuntimeError as e:
# We capture this here as there as nothing actually listens # We capture this here as there as nothing actually listens
# for this finishing functions deferred. # for this finishing functions deferred.
@ -369,6 +375,8 @@ class TransactionQueue(object):
e, e,
) )
success = False
for p in pdus: for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination) logger.info("Failed to send event %s to %s", p.event_id, destination)
except Exception as e: except Exception as e:
@ -380,5 +388,9 @@ class TransactionQueue(object):
e, e,
) )
success = False
for p in pdus: for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination) logger.info("Failed to send event %s to %s", p.event_id, destination)
defer.returnValue(success)