Make _check_sigs_and_hash_and_fetch use batch api for verify keys

erikj/persist_event_perf
Erik Johnston 2015-06-24 11:28:07 +01:00
parent a29319fefa
commit 44e3bed2a1
2 changed files with 45 additions and 39 deletions

View File

@ -243,14 +243,17 @@ class Keyring(object):
type(e).__name__, str(e.message), type(e).__name__, str(e.message),
) )
results = yield defer.gatherResults([ results = yield defer.gatherResults(
get_key(p_name, p_keys) [
for p_name, p_keys in self.perspective_servers.items() get_key(p_name, p_keys)
]) for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
union_of_keys = {} union_of_keys = {}
for result in results: for result in results:
for server_name, keys in results.items(): for server_name, keys in result.items():
union_of_keys.setdefault(server_name, {}).update(keys) union_of_keys.setdefault(server_name, {}).update(keys)
defer.returnValue(union_of_keys) defer.returnValue(union_of_keys)

View File

@ -53,47 +53,50 @@ class FederationBase(object):
signed_pdus = [] signed_pdus = []
@defer.inlineCallbacks deferreds = self._check_sigs_and_hashes(pdus)
def do(pdu):
try: def callback(pdu):
new_pdu = yield self._check_sigs_and_hash(pdu) signed_pdus.append(pdu)
def errback(failure, pdu):
failure.trap(SynapseError)
# Check local db.
new_pdu = yield self.store.get_event(
pdu.event_id,
allow_rejected=True,
allow_none=True,
)
if new_pdu:
signed_pdus.append(new_pdu) signed_pdus.append(new_pdu)
except SynapseError: return
# FIXME: We should handle signature failures more gracefully.
# Check local db. # Check pdu.origin
new_pdu = yield self.store.get_event( if pdu.origin != origin:
pdu.event_id, try:
allow_rejected=True, new_pdu = yield self.get_pdu(
allow_none=True, destinations=[pdu.origin],
) event_id=pdu.event_id,
if new_pdu: outlier=outlier,
signed_pdus.append(new_pdu) timeout=10000,
return )
# Check pdu.origin if new_pdu:
if pdu.origin != origin: signed_pdus.append(new_pdu)
try: return
new_pdu = yield self.get_pdu( except:
destinations=[pdu.origin], pass
event_id=pdu.event_id,
outlier=outlier,
timeout=10000,
)
if new_pdu: logger.warn(
signed_pdus.append(new_pdu) "Failed to find copy of %s with valid signature",
return pdu.event_id,
except: )
pass
logger.warn( for pdu, deferred in zip(pdus, deferreds):
"Failed to find copy of %s with valid signature", deferred.addCallbacks(callback, errback, errbackArgs=[pdu])
pdu.event_id,
)
yield defer.gatherResults( yield defer.gatherResults(
[do(pdu) for pdu in pdus], deferreds,
consumeErrors=True consumeErrors=True
).addErrback(unwrapFirstError) ).addErrback(unwrapFirstError)