Add support for sending failures
parent
d7412c4df1
commit
440cbd5235
|
@ -143,6 +143,11 @@ class ReplicationLayer(object):
|
||||||
self._transaction_queue.enqueue_edu(edu)
|
self._transaction_queue.enqueue_edu(edu)
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
@log_function
|
||||||
|
def send_failure(self, failure, destination):
|
||||||
|
self._transaction_queue.enqueue_failure(failure, destination)
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def make_query(self, destination, query_type, args,
|
def make_query(self, destination, query_type, args,
|
||||||
retry_on_dns_fail=True):
|
retry_on_dns_fail=True):
|
||||||
|
@ -558,6 +563,9 @@ class _TransactionQueue(object):
|
||||||
# destination -> list of tuple(edu, deferred)
|
# destination -> list of tuple(edu, deferred)
|
||||||
self.pending_edus_by_dest = {}
|
self.pending_edus_by_dest = {}
|
||||||
|
|
||||||
|
# destination -> list of tuple(failure, deferred)
|
||||||
|
self.pending_failures_by_dest = {}
|
||||||
|
|
||||||
# HACK to get unique tx id
|
# HACK to get unique tx id
|
||||||
self._next_txn_id = int(self._clock.time_msec())
|
self._next_txn_id = int(self._clock.time_msec())
|
||||||
|
|
||||||
|
@ -610,6 +618,18 @@ class _TransactionQueue(object):
|
||||||
|
|
||||||
return deferred
|
return deferred
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def enqueue_failure(self, failure, destination):
|
||||||
|
deferred = defer.Deferred()
|
||||||
|
|
||||||
|
self.pending_failures_by_dest.setdefault(
|
||||||
|
destination, []
|
||||||
|
).append(
|
||||||
|
(failure, deferred)
|
||||||
|
)
|
||||||
|
|
||||||
|
yield deferred
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def _attempt_new_transaction(self, destination):
|
def _attempt_new_transaction(self, destination):
|
||||||
|
@ -619,8 +639,9 @@ class _TransactionQueue(object):
|
||||||
# list of (pending_pdu, deferred, order)
|
# list of (pending_pdu, deferred, order)
|
||||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
|
pending_failures = self.pending_failures_by_dest(destination, [])
|
||||||
|
|
||||||
if not pending_pdus and not pending_edus:
|
if not pending_pdus and not pending_edus and not pending_failures:
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("TX [%s] Attempting new transaction", destination)
|
logger.debug("TX [%s] Attempting new transaction", destination)
|
||||||
|
@ -630,7 +651,11 @@ class _TransactionQueue(object):
|
||||||
|
|
||||||
pdus = [x[0] for x in pending_pdus]
|
pdus = [x[0] for x in pending_pdus]
|
||||||
edus = [x[0] for x in pending_edus]
|
edus = [x[0] for x in pending_edus]
|
||||||
deferreds = [x[1] for x in pending_pdus + pending_edus]
|
failures = [x[0].get_dict() for x in pending_failures]
|
||||||
|
deferreds = [
|
||||||
|
x[1]
|
||||||
|
for x in pending_pdus + pending_edus + pending_failures
|
||||||
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.pending_transactions[destination] = 1
|
self.pending_transactions[destination] = 1
|
||||||
|
@ -644,6 +669,7 @@ class _TransactionQueue(object):
|
||||||
destination=destination,
|
destination=destination,
|
||||||
pdus=pdus,
|
pdus=pdus,
|
||||||
edus=edus,
|
edus=edus,
|
||||||
|
pdu_failures=failures,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._next_txn_id += 1
|
self._next_txn_id += 1
|
||||||
|
|
|
@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject):
|
||||||
"edus",
|
"edus",
|
||||||
"transaction_id",
|
"transaction_id",
|
||||||
"destination",
|
"destination",
|
||||||
|
"pdu_failures",
|
||||||
]
|
]
|
||||||
|
|
||||||
internal_keys = [
|
internal_keys = [
|
||||||
|
|
|
@ -128,3 +128,37 @@ class StreamToken(
|
||||||
d = self._asdict()
|
d = self._asdict()
|
||||||
d[key] = new_value
|
d[key] = new_value
|
||||||
return StreamToken(**d)
|
return StreamToken(**d)
|
||||||
|
|
||||||
|
|
||||||
|
class FederationError(RuntimeError):
|
||||||
|
""" This class is used to inform remote home servers about erroneous
|
||||||
|
PDUs they sent us.
|
||||||
|
|
||||||
|
FATAL: The remote server could not interpret the source event.
|
||||||
|
(e.g., it was missing a required field)
|
||||||
|
ERROR: The remote server interpreted the event, but it failed some other
|
||||||
|
check (e.g. auth)
|
||||||
|
WARN: The remote server accepted the event, but believes some part of it
|
||||||
|
is wrong (e.g., it referred to an invalid event)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, level, code, reason, affected, source=None):
|
||||||
|
if level not in ["FATAL", "ERROR", "WARN"]:
|
||||||
|
raise ValueError("Level is not valid: %s" % (level,))
|
||||||
|
self.level = level
|
||||||
|
self.code = code
|
||||||
|
self.reason = reason
|
||||||
|
self.affected = affected
|
||||||
|
self.source = source
|
||||||
|
|
||||||
|
msg = "%s %s: %s" % (level, code, reason,)
|
||||||
|
super(FederationError, self).__init__(msg)
|
||||||
|
|
||||||
|
def get_dict(self):
|
||||||
|
return {
|
||||||
|
"level": self.level,
|
||||||
|
"code": self.code,
|
||||||
|
"reason": self.reason,
|
||||||
|
"affected": self.affected,
|
||||||
|
"source": self.source if self.source else self.affected,
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue