fix: last commit.
parent 69ae93badb
commit 332a2aeed8
@@ -50,7 +50,7 @@ class PrefixDatabase():

     def _init_routes(self, address_family, root_url, path) -> bool:
         self.logger.debug(f'Loading {path}')
-        date = parse(re.findall('http://data.caida.org/datasets/routing/routeviews[6]?-prefix2as/(?:.*)/(?:.*)/routeviews-rv[2,6]-(.*)-(?:.*).pfx2as.gz', path)[0]).date().isoformat()
+        date = parse(re.findall('(?:.*)/(?:.*)/routeviews-rv[2,6]-(.*)-(?:.*).pfx2as.gz', path)[0]).date().isoformat()
         r = requests.get(root_url.format(path))
         to_import = defaultdict(lambda: {address_family: set(), 'ipcount': 0})
         with gzip.open(BytesIO(r.content), 'r') as f:
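
The pattern no longer pins the CAIDA hostname and directory layout; only the two trailing path components and the routeviews filename convention are matched, so mirrored or relocated datasets still parse. (Side note: `[2,6]` is a character class matching '2', ',' or '6', not "2 or 6"; harmless here.) A quick sketch of the date extraction under the new pattern, using a hypothetical path:

    import re
    from dateutil.parser import parse

    # Hypothetical mirror path; only the filename convention matters now.
    path = '2018/01/routeviews-rv2-20180110-1200.pfx2as.gz'
    found = re.findall('(?:.*)/(?:.*)/routeviews-rv[2,6]-(.*)-(?:.*).pfx2as.gz', path)
    print(parse(found[0]).date().isoformat())  # 2018-01-10
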
@@ -65,7 +65,7 @@ class PrefixDatabase():
         p = self.prefix_cache.pipeline()
         p_asn_meta = self.asn_meta.pipeline()
         p.sadd('asns', *to_import.keys())
-        p_asn_meta.sadd(f'{address_family}|last', date) # Not necessarely today
+        p_asn_meta.set(f'{address_family}|last', date) # Not necessarely today
         p_asn_meta.sadd(f'{date}|asns|{address_family}', *to_import.keys())
         for asn, data in to_import.items():
             p.sadd(f'{asn}|{address_family}', *data[address_family])
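
`{address_family}|last` switches from SADD to SET: the key now holds a single latest-dataset date rather than accumulating a set of dates. This also matters for type correctness, since the ranking side presumably reads the marker back with GET, and GET against a set type is a WRONGTYPE error in Redis. A minimal sketch, assuming a reachable local Redis:

    from redis import StrictRedis

    r = StrictRedis(decode_responses=True)
    r.delete('v4|last')
    r.sadd('v4|last', '2018-01-10')  # old behaviour: the key becomes a set
    try:
        r.get('v4|last')
    except Exception as e:
        print(e)  # WRONGTYPE Operation against a key holding the wrong kind of value
    r.delete('v4|last')
    r.set('v4|last', '2018-01-10')   # new behaviour: a plain string
    print(r.get('v4|last'))          # 2018-01-10
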
@@ -80,6 +80,8 @@ class PrefixDatabase():
     def load_prefixes(self):
         set_running(self.__class__.__name__)
         self.prefix_cache.delete('ready')
+        self.asn_meta.delete('v4|last')
+        self.asn_meta.delete('v6|last')
         self.logger.info('Prefix update starting in a few seconds.')
         time.sleep(15)
         v4_is_new, v4_path = self._has_new('v4', self.ipv4_url)
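
Clearing `v4|last` and `v6|last` before a reload pairs with the failsafe in the Ranking hunk below: while prefixes are being reimported, consumers see no "last" marker and bail out instead of ranking against a half-loaded database. Sketched from the consumer side, assuming a local Redis and the same db=2 keyspace:

    from redis import StrictRedis

    asn_meta = StrictRedis(db=2, decode_responses=True)

    def compute_ranking():
        v4_last = asn_meta.get('v4|last')  # None while load_prefixes() runs
        v6_last = asn_meta.get('v6|last')
        if not v4_last or not v6_last:
            return                         # failsafe: metadata not repopulated yet
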
@@ -16,7 +16,6 @@ class Ranking():
         self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)
         self.ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1, decode_responses=True)
         self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
-        self.prefix_cache = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=0, decode_responses=True)
         self.config_files = load_config_files(config_dir)

     def __init_logger(self, loglevel):

@@ -32,27 +31,33 @@ class Ranking():
         if not v4_last or not v6_last:
             '''Failsafe if asn_meta has not been populated yet'''
             return
-        for source in self.ardb_storage.smembers(f'{today}|sources'):
+        for source in self.storage.smembers(f'{today}|sources'):
             self.logger.info(f'{today} - Ranking source: {source}')
             r_pipeline = self.ranking.pipeline()
-            for asn in self.ardb_storage.smembers(f'{today}|{source}'):
+            for asn in self.storage.smembers(f'{today}|{source}'):
+                if asn == '0':
+                    # Default ASN when no matches. Probably spoofed.
+                    continue
                 self.logger.debug(f'{today} - Ranking source: {source} / ASN: {asn}')
                 asn_rank_v4 = 0.0
                 asn_rank_v6 = 0.0
-                for prefix in self.ardb_storage.smembers(f'{today}|{source}|{asn}'):
+                for prefix in self.storage.smembers(f'{today}|{source}|{asn}'):
                     ips = set([ip_ts.split('|')[0]
-                               for ip_ts in self.ardb_storage.smembers(f'{today}|{source}|{asn}|{prefix}')])
-                    prefix_rank = float(len(ips)) / ip_network(prefix).num_addresses
-                    r_pipeline.zadd(f'{today}|{source}|{asn}|rankv{prefix_rank.version}|prefixes', prefix, prefix_rank)
-                    if prefix_rank.version == 4:
+                               for ip_ts in self.storage.smembers(f'{today}|{source}|{asn}|{prefix}')])
+                    py_prefix = ip_network(prefix)
+                    prefix_rank = float(len(ips)) / py_prefix.num_addresses
+                    r_pipeline.zadd(f'{today}|{source}|{asn}|rankv{py_prefix.version}|prefixes', prefix_rank, prefix)
+                    if py_prefix.version == 4:
                         asn_rank_v4 += len(ips) * self.config_files[source]['impact']
                     else:
                         asn_rank_v6 += len(ips) * self.config_files[source]['impact']
-                asn_rank_v4 /= int(self.asn_meta.get(f'{v4_last}|{asn}|v4|ipcount'))
-                asn_rank_v6 /= int(self.asn_meta.get(f'{v6_last}|{asn}|v6|ipcount'))
-                if asn_rank_v4:
+                v4count = self.asn_meta.get(f'{v4_last}|{asn}|v4|ipcount')
+                v6count = self.asn_meta.get(f'{v6_last}|{asn}|v6|ipcount')
+                if v4count:
+                    asn_rank_v4 /= int(v4count)
                     r_pipeline.set(f'{today}|{source}|{asn}|rankv4', asn_rank_v4)
-                if asn_rank_v6:
+                if v6count:
+                    asn_rank_v6 /= int(v6count)
                     r_pipeline.set(f'{today}|{source}|{asn}|rankv6', asn_rank_v6)
             r_pipeline.execute()
         unset_running(self.__class__.__name__)
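
Several fixes land together in this hunk: `ardb_storage` is renamed to the `storage` attribute actually defined in `__init__`; the `ip_network` object is kept as `py_prefix`, so `.version` is no longer called on the float `prefix_rank`; `zadd` switches to score-first argument order; and the `ipcount` reads are guarded so a missing key no longer crashes on `int(None)`. A sketch of the last two points, with hypothetical key names and documentation ASN/prefix values, assuming a local Redis and the pre-3.0 redis-py API used here, where `StrictRedis.zadd(name, score, member)` expects the score first (redis-py 3.x takes a `{member: score}` mapping instead):

    from redis import StrictRedis

    asn_meta = StrictRedis(db=2, decode_responses=True)
    r_pipeline = StrictRedis(db=1, decode_responses=True).pipeline()

    # Score-first: the float ranks the member, not the other way around.
    r_pipeline.zadd('2018-01-10|somesource|64496|rankv4|prefixes',
                    0.0042, '203.0.113.0/24')

    # get() returns None when the key is absent; int(None) raises TypeError,
    # so only divide and store the rank when a count actually exists.
    asn_rank_v4 = 10.5
    v4count = asn_meta.get('2018-01-10|64496|v4|ipcount')
    if v4count:
        asn_rank_v4 /= int(v4count)
        r_pipeline.set('2018-01-10|somesource|64496|rankv4', asn_rank_v4)
    r_pipeline.execute()
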
@@ -19,7 +19,9 @@ class RISPrefixLookup():
         self.longest_prefix_matching = StrictRedis(unix_socket_path=get_socket_path('ris'), db=0, decode_responses=True)
         self.tree_v4 = pytricia.PyTricia()
         self.tree_v6 = pytricia.PyTricia(128)
-        self.init_tree()
+        self.force_init = True
+        self.current_v4 = None
+        self.current_v6 = None

     def __init_logger(self, loglevel):
         self.logger = logging.getLogger(f'{self.__class__.__name__}')
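
The constructor no longer calls `init_tree()` eagerly: building the pytricia trees needs a populated prefix database, which may not exist yet when the service starts. A `force_init` flag plus the `current_v4`/`current_v6` markers defer the build to the run loop (see the last `RISPrefixLookup` hunk), which then rebuilds only when the underlying dataset changes. The pattern, reduced to a standalone sketch with a hypothetical `dataset_version()` callable standing in for `prefix_db.get('current|v4')`:

    class LazyTree:
        def __init__(self, dataset_version):
            self.dataset_version = dataset_version
            self.force_init = True
            self.current = None

        def refresh_if_needed(self):
            latest = self.dataset_version()
            if self.force_init or self.current != latest:
                self.current = latest
                self.force_init = False
                return True   # caller rebuilds the tree
            return False
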
@@ -36,7 +38,9 @@ class RISPrefixLookup():
         for prefix in self.prefix_db.smembers(f'{asn}|v6'):
             self.tree_v6[prefix] = asn
         self.tree_v4['0.0.0.0/0'] = 0
-        self.tree_v4['::/0'] = 0
+        self.tree_v6['::/0'] = 0
+        self.current_v4 = self.prefix_db.get('current|v4')
+        self.current_v6 = self.prefix_db.get('current|v6')

     def run(self):
         set_running(self.__class__.__name__)
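
A one-character bug fix: the IPv6 default route was being inserted into the IPv4 tree, so v6 lookups had no catch-all entry. pytricia trees are per address family (the `128` selects 128-bit keys), as this short sketch shows:

    import pytricia

    tree_v4 = pytricia.PyTricia()      # 32-bit keys: IPv4
    tree_v6 = pytricia.PyTricia(128)   # 128-bit keys: IPv6
    tree_v4['0.0.0.0/0'] = 0           # IPv4 catch-all
    tree_v6['::/0'] = 0                # IPv6 catch-all, previously misplaced in tree_v4
    print(tree_v6.get('2001:db8::1'))  # 0 -- falls through to the default route
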
@@ -46,7 +50,14 @@ class RISPrefixLookup():
             if not self.prefix_db.get('ready'):
                 self.logger.debug('Prefix database not ready.')
                 time.sleep(5)
+                self.force_init = True
                 continue
+            if (self.force_init or
+                    (self.current_v4 != self.prefix_db.get('current|v4')) or
+                    (self.current_v6 != self.prefix_db.get('current|v6'))):
+                self.init_tree()
+                self.force_init = False
+
             ips = self.longest_prefix_matching.spop('for_ris_lookup', 100)
             if not ips: # TODO: add a check against something to stop the loop
                 self.logger.debug('Nothing to lookup')
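
Stitching the three `RISPrefixLookup` hunks together: when the prefix database is reloading, the trees are flagged for a rebuild, and they are rebuilt whenever the `current|v4`/`current|v6` markers move. A condensed sketch with hypothetical stand-ins for the Redis handles (the real loop also processes the popped IPs and keeps polling):

    import time

    def lookup_loop(prefix_db, longest_prefix_matching, state):
        # state: object with force_init, current_v4, current_v6, init_tree()
        while True:
            if not prefix_db.get('ready'):
                time.sleep(5)
                state.force_init = True   # dataset reloading: rebuild afterwards
                continue
            if (state.force_init or
                    state.current_v4 != prefix_db.get('current|v4') or
                    state.current_v6 != prefix_db.get('current|v6')):
                state.init_tree()         # refreshes trees and current_v4/v6
                state.force_init = False
            ips = longest_prefix_matching.spop('for_ris_lookup', 100)
            if not ips:
                break                     # sketch only; real loop keeps polling
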
@@ -12,7 +12,7 @@ logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',

 class RankingManager(AbstractManager):

-    def __init__(self, config_dir: Path=None, loglevel: int=logging.DEBUG):
+    def __init__(self, config_dir: Path=None, loglevel: int=logging.INFO):
         super().__init__(loglevel)
         self.ranking = Ranking(config_dir, loglevel)