fix: Use ipcount from IPASN History

pull/12/head
Raphaël Vinot 2018-11-15 19:05:57 +01:00
parent 5ce2f91430
commit 4afd45356d
2 changed files with 23 additions and 12 deletions
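In short: rather than reading per-ASN IP counts out of a local asn_meta Redis database (db=2), Ranking now asks the IPASN History service directly through get_ipasn(), and DatabaseInsert skips entries for which the service returned None as ASN or prefix. A minimal sketch of the new lookup, assuming get_ipasn() returns a pyipasnhistory.IPASNHistory client; the instance URL, ASN, and date below are illustrative:

    from pyipasnhistory import IPASNHistory

    # Assumption: get_ipasn() wraps a client like this one.
    ipasn = IPASNHistory('https://ipasnhistory.circl.lu/')
    info = ipasn.asn_meta(asn='3303', source='caida', address_family='v4', date='2018-11-15')
    # Response layout as unpacked in the diff below:
    # {'response': {<date served>: {<asn>: {'ipcount': ...}}}}
    returned_date = list(info['response'].keys())[0]
    print(info['response'][returned_date]['3303']['ipcount'])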


@@ -47,7 +47,6 @@ class DatabaseInsert():
                 for_query.append({'ip': data['ip'], 'address_family': data['address_family'], 'source': 'caida',
                                   'date': data['datetime'], 'precision_delta': {'days': 3}})
             responses = self.ipasn.mass_query(for_query)
-
             retry = []
             done = []
             ardb_pipeline = self.ardb_storage.pipeline(transaction=False)
@@ -70,6 +69,12 @@ class DatabaseInsert():
                     # routing info is missing, need to try again later.
                     retry.append(uuid)
                     continue
+                if 'asn' in entry and entry['asn'] is None:
+                    self.logger.warning(f"Unable to find the AS number associated to {data['ip']} - {data['datetime']} (got None). This should not happen...")
+                    continue
+                if 'prefix' in entry and entry['prefix'] is None:
+                    self.logger.warning(f"Unable to find the prefix associated to {data['ip']} - {data['datetime']} (got None). This should not happen...")
+                    continue
 
                 # Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
                 ardb_pipeline.sadd(f"{data['date']}|sources", data['source'])
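
The two new guards above cover a quirk of the IPASN History mass_query responses: an entry can come back with 'asn' or 'prefix' explicitly set to None even though the query itself succeeded (a missing key, by contrast, is handled by the earlier retry branch). A self-contained sketch of the same check, with illustrative entries; the real ones come from self.ipasn.mass_query(for_query):

    import logging

    logger = logging.getLogger('DatabaseInsert')

    def usable(entry: dict, ip: str, when: str) -> bool:
        # Mirrors the guards above: a key present but set to None is rejected;
        # a missing key falls through (the retry branch deals with it earlier).
        if 'asn' in entry and entry['asn'] is None:
            logger.warning(f'Unable to find the AS number associated to {ip} - {when} (got None).')
            return False
        if 'prefix' in entry and entry['prefix'] is None:
            logger.warning(f'Unable to find the prefix associated to {ip} - {when} (got None).')
            return False
        return True

    print(usable({'asn': '3303', 'prefix': '194.247.205.0/24'}, '194.247.205.2', '2018-11-15'))  # True
    print(usable({'asn': None}, '192.0.2.1', '2018-11-15'))  # False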


@@ -3,7 +3,7 @@
 
 import logging
 from redis import StrictRedis
-from .libs.helpers import set_running, unset_running, get_socket_path, load_config_files
+from .libs.helpers import set_running, unset_running, get_socket_path, load_config_files, get_ipasn, sanity_check_ipasn
 from datetime import datetime, date, timedelta
 from ipaddress import ip_network
 from pathlib import Path
@@ -15,7 +15,7 @@ class Ranking():
         self.__init_logger(loglevel)
         self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)
         self.ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1, decode_responses=True)
-        self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
+        self.ipasn = get_ipasn()
         self.config_dir = config_dir
 
     def __init_logger(self, loglevel):
@@ -23,8 +23,6 @@ class Ranking():
         self.logger.setLevel(loglevel)
 
     def rank_a_day(self, day: str):
-        # FIXME: If we want to rank an older date, we need to have older datasets for the announces
-        v4_last, v6_last = self.asn_meta.mget('v4|last', 'v6|last')
         asns_aggregation_key_v4 = f'{day}|asns|v4'
         asns_aggregation_key_v6 = f'{day}|asns|v6'
         to_delete = set([asns_aggregation_key_v4, asns_aggregation_key_v6])
@@ -46,8 +44,8 @@ class Ranking():
             asn_rank_v6 = 0.0
             for prefix in self.storage.smembers(f'{day}|{source}|{asn}'):
                 if prefix == 'None':
-                    # FIXME, this should not happen
-                    self.logger.warning(f'Fucked up prefix in "{day}|{source}|{asn}"')
+                    # This should not happen and requires a DB cleanup.
+                    self.logger.critical(f'Fucked up prefix in "{day}|{source}|{asn}"')
                     continue
                 ips = set([ip_ts.split('|')[0]
                            for ip_ts in self.storage.smembers(f'{day}|{source}|{asn}|{prefix}')])
@@ -60,7 +58,12 @@ class Ranking():
                 else:
                     asn_rank_v6 += len(ips) * self.config_files[source]['impact']
                     r_pipeline.zincrby(prefixes_aggregation_key_v6, prefix, prefix_rank * self.config_files[source]['impact'])
-            v4count, v6count = self.asn_meta.mget(f'{v4_last}|{asn}|v4|ipcount', f'{v6_last}|{asn}|v6|ipcount')
+            v4info = self.ipasn.asn_meta(asn=asn, source='caida', address_family='v4', date=day)
+            v6info = self.ipasn.asn_meta(asn=asn, source='caida', address_family='v6', date=day)
+            ipasnhistory_date_v4 = list(v4info['response'].keys())[0]
+            v4count = v4info['response'][ipasnhistory_date_v4][asn]['ipcount']
+            ipasnhistory_date_v6 = list(v6info['response'].keys())[0]
+            v6count = v6info['response'][ipasnhistory_date_v6][asn]['ipcount']
             if v4count:
                 asn_rank_v4 /= float(v4count)
                 if asn_rank_v4:
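
Note that the date key inside 'response' is read back from the answer instead of reusing day, presumably because IPASN History serves the closest dataset it has rather than the exact date requested. A minimal sketch of the unpacking above, with an illustrative response (the ipcount value is made up):

    # Shape implied by the indexing in the hunk above.
    v4info = {'response': {'2018-11-15T12:00:00': {'3303': {'ipcount': 551424}}}}

    ipasnhistory_date_v4 = list(v4info['response'].keys())[0]  # date actually served
    v4count = v4info['response'][ipasnhistory_date_v4]['3303']['ipcount']
    assert v4count == 551424
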
@@ -78,12 +81,15 @@
 
     def compute(self):
         self.config_files = load_config_files(self.config_dir)
+        ready, message = sanity_check_ipasn(self.ipasn)
+        if not ready:
+            # Try again later.
+            self.logger.warning(message)
+            return
+        self.logger.debug(message)
+
         self.logger.info('Start ranking')
         set_running(self.__class__.__name__)
-        if not self.asn_meta.exists('v4|last') or not self.asn_meta.exists('v6|last'):
-            '''Failsafe if asn_meta has not been populated yet'''
-            unset_running(self.__class__.__name__)
-            return
         today = date.today()
         now = datetime.now()
         today12am = now.replace(hour=12, minute=0, second=0, microsecond=0)
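
With this change, compute() checks the IPASN History instance before set_running() is even called, so an unavailable service no longer needs the old unset_running() cleanup: the method just logs a warning and waits for the next run. A sketch of the helper's apparent contract, with a stand-in body (the real sanity_check_ipasn lives in .libs.helpers):

    def sanity_check_ipasn(ipasn):
        # Stand-in condition; the real helper inspects the service itself.
        # Contract inferred from compute(): returns (ready: bool, message: str).
        if ipasn is None:
            return False, 'IPASN History is not ready, trying again later.'
        return True, 'IPASN History is up.'

    ready, message = sanity_check_ipasn(None)
    assert ready is False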