chg: Handle connecting to IPASN History safely

pull/12/head
Raphaël Vinot 2018-11-28 11:52:34 +01:00
parent 0fed442b1d
commit 5e89cae640
2 changed files with 17 additions and 5 deletions

View File

@ -29,7 +29,7 @@ class DatabaseInsert():
set_running(self.__class__.__name__) set_running(self.__class__.__name__)
while True: while True:
if shutdown_requested(): if shutdown_requested() or not self.ipasn.is_up:
break break
uuids = self.redis_sanitized.spop('to_insert', 100) uuids = self.redis_sanitized.spop('to_insert', 100)
if not uuids: if not uuids:
@ -46,7 +46,13 @@ class DatabaseInsert():
continue continue
for_query.append({'ip': data['ip'], 'address_family': data['address_family'], 'source': 'caida', for_query.append({'ip': data['ip'], 'address_family': data['address_family'], 'source': 'caida',
'date': data['datetime'], 'precision_delta': {'days': 3}}) 'date': data['datetime'], 'precision_delta': {'days': 3}})
try:
responses = self.ipasn.mass_query(for_query) responses = self.ipasn.mass_query(for_query)
except Exception:
self.logger.exception('Mass query in IPASN History failed, trying again later.')
# Rollback the spop
self.redis_sanitized.sadd('to_insert', *uuids)
break
retry = [] retry = []
done = [] done = []
ardb_pipeline = self.ardb_storage.pipeline(transaction=False) ardb_pipeline = self.ardb_storage.pipeline(transaction=False)

View File

@ -33,7 +33,7 @@ class Sanitizer():
set_running(self.__class__.__name__) set_running(self.__class__.__name__)
while True: while True:
if shutdown_requested(): if shutdown_requested() or not self.ipasn.is_up:
break break
uuids = self.redis_intake.spop('intake', 100) uuids = self.redis_intake.spop('intake', 100)
if not uuids: if not uuids:
@ -70,6 +70,12 @@ class Sanitizer():
pipeline.execute() pipeline.execute()
self.redis_intake.delete(*uuids) self.redis_intake.delete(*uuids)
try:
# Just cache everything so the lookup scripts can do their thing. # Just cache everything so the lookup scripts can do their thing.
self.ipasn.mass_cache(for_cache) self.ipasn.mass_cache(for_cache)
except Exception:
self.logger.exception('Mass cache in IPASN History failed, trying again later.')
# Rollback the spop
self.redis_intake.sadd('intake', *uuids)
break
unset_running(self.__class__.__name__) unset_running(self.__class__.__name__)