diff --git a/bgpranking/dbinsert.py b/bgpranking/dbinsert.py index 08835b8..0e168fa 100644 --- a/bgpranking/dbinsert.py +++ b/bgpranking/dbinsert.py @@ -47,7 +47,6 @@ class DatabaseInsert(): for_query.append({'ip': data['ip'], 'address_family': data['address_family'], 'source': 'caida', 'date': data['datetime'], 'precision_delta': {'days': 3}}) responses = self.ipasn.mass_query(for_query) - retry = [] done = [] ardb_pipeline = self.ardb_storage.pipeline(transaction=False) @@ -70,6 +69,12 @@ class DatabaseInsert(): # routing info is missing, need to try again later. retry.append(uuid) continue + if 'asn' in entry and entry['asn'] is None: + self.logger.warning(f"Unable to find the AS number associated to {data['ip']} - {data['datetime']} (got None). This should not happen...") + continue + if 'prefix' in entry and entry['prefix'] is None: + self.logger.warning(f"Unable to find the prefix associated to {data['ip']} - {data['datetime']} (got None). This should not happen...") + continue # Format: |sources -> set([, ...]) ardb_pipeline.sadd(f"{data['date']}|sources", data['source']) diff --git a/bgpranking/ranking.py b/bgpranking/ranking.py index 6e5c5ce..c47f014 100644 --- a/bgpranking/ranking.py +++ b/bgpranking/ranking.py @@ -3,7 +3,7 @@ import logging from redis import StrictRedis -from .libs.helpers import set_running, unset_running, get_socket_path, load_config_files +from .libs.helpers import set_running, unset_running, get_socket_path, load_config_files, get_ipasn, sanity_check_ipasn from datetime import datetime, date, timedelta from ipaddress import ip_network from pathlib import Path @@ -15,7 +15,7 @@ class Ranking(): self.__init_logger(loglevel) self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True) self.ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1, decode_responses=True) - self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True) + self.ipasn = get_ipasn() self.config_dir = config_dir def __init_logger(self, loglevel): @@ -23,8 +23,6 @@ class Ranking(): self.logger.setLevel(loglevel) def rank_a_day(self, day: str): - # FIXME: If we want to rank an older date, we need to have older datasets for the announces - v4_last, v6_last = self.asn_meta.mget('v4|last', 'v6|last') asns_aggregation_key_v4 = f'{day}|asns|v4' asns_aggregation_key_v6 = f'{day}|asns|v6' to_delete = set([asns_aggregation_key_v4, asns_aggregation_key_v6]) @@ -46,8 +44,8 @@ class Ranking(): asn_rank_v6 = 0.0 for prefix in self.storage.smembers(f'{day}|{source}|{asn}'): if prefix == 'None': - # FIXME, this should not happen - self.logger.warning(f'Fucked up prefix in "{day}|{source}|{asn}"') + # This should not happen and requires a DB cleanup. + self.logger.critical(f'Fucked up prefix in "{day}|{source}|{asn}"') continue ips = set([ip_ts.split('|')[0] for ip_ts in self.storage.smembers(f'{day}|{source}|{asn}|{prefix}')]) @@ -60,7 +58,12 @@ class Ranking(): else: asn_rank_v6 += len(ips) * self.config_files[source]['impact'] r_pipeline.zincrby(prefixes_aggregation_key_v6, prefix, prefix_rank * self.config_files[source]['impact']) - v4count, v6count = self.asn_meta.mget(f'{v4_last}|{asn}|v4|ipcount', f'{v6_last}|{asn}|v6|ipcount') + v4info = self.ipasn.asn_meta(asn=asn, source='caida', address_family='v4', date=day) + v6info = self.ipasn.asn_meta(asn=asn, source='caida', address_family='v6', date=day) + ipasnhistory_date_v4 = list(v4info['response'].keys())[0] + v4count = v4info['response'][ipasnhistory_date_v4][asn]['ipcount'] + ipasnhistory_date_v6 = list(v6info['response'].keys())[0] + v6count = v6info['response'][ipasnhistory_date_v6][asn]['ipcount'] if v4count: asn_rank_v4 /= float(v4count) if asn_rank_v4: @@ -78,12 +81,15 @@ class Ranking(): def compute(self): self.config_files = load_config_files(self.config_dir) + ready, message = sanity_check_ipasn(self.ipasn) + if not ready: + # Try again later. + self.logger.warning(message) + return + self.logger.debug(message) + self.logger.info('Start ranking') set_running(self.__class__.__name__) - if not self.asn_meta.exists('v4|last') or not self.asn_meta.exists('v6|last'): - '''Failsafe if asn_meta has not been populated yet''' - unset_running(self.__class__.__name__) - return today = date.today() now = datetime.now() today12am = now.replace(hour=12, minute=0, second=0, microsecond=0)