new: Add daily aggregated rankings

pull/12/head
Raphaël Vinot 2018-04-10 11:21:03 +02:00
parent b60c70a3c5
commit 53b08e6181
1 changed files with 17 additions and 0 deletions

View File

@ -32,10 +32,22 @@ class Ranking():
'''Failsafe if asn_meta has not been populated yet''' '''Failsafe if asn_meta has not been populated yet'''
unset_running(self.__class__.__name__) unset_running(self.__class__.__name__)
return return
asns_aggregation_key_v4 = f'{today}|asns|v4'
asns_aggregation_key_v6 = f'{today}|asns|v6'
self.ranking.delete(asns_aggregation_key_v4, asns_aggregation_key_v6)
cleaned_up = set()
for source in self.storage.smembers(f'{today}|sources'): for source in self.storage.smembers(f'{today}|sources'):
self.logger.info(f'{today} - Ranking source: {source}') self.logger.info(f'{today} - Ranking source: {source}')
r_pipeline = self.ranking.pipeline() r_pipeline = self.ranking.pipeline()
for asn in self.storage.smembers(f'{today}|{source}'): for asn in self.storage.smembers(f'{today}|{source}'):
prefixes_aggregation_key_v4 = f'{today}|{asn}|v4'
prefixes_aggregation_key_v6 = f'{today}|{asn}|v6'
if prefixes_aggregation_key_v4 not in cleaned_up:
self.ranking.delete(prefixes_aggregation_key_v4)
cleaned_up.add(prefixes_aggregation_key_v4)
if prefixes_aggregation_key_v6 not in cleaned_up:
self.ranking.delete(prefixes_aggregation_key_v6)
cleaned_up.add(prefixes_aggregation_key_v6)
if asn == '0': if asn == '0':
# Default ASN when no matches. Probably spoofed. # Default ASN when no matches. Probably spoofed.
continue continue
@ -50,16 +62,21 @@ class Ranking():
r_pipeline.zadd(f'{today}|{source}|{asn}|rankv{py_prefix.version}|prefixes', prefix_rank, prefix) r_pipeline.zadd(f'{today}|{source}|{asn}|rankv{py_prefix.version}|prefixes', prefix_rank, prefix)
if py_prefix.version == 4: if py_prefix.version == 4:
asn_rank_v4 += len(ips) * self.config_files[source]['impact'] asn_rank_v4 += len(ips) * self.config_files[source]['impact']
r_pipeline.zincrby(prefixes_aggregation_key_v4, prefix_rank * self.config_files[source]['impact'], prefix)
else: else:
asn_rank_v6 += len(ips) * self.config_files[source]['impact'] asn_rank_v6 += len(ips) * self.config_files[source]['impact']
r_pipeline.zincrby(prefixes_aggregation_key_v6, prefix_rank * self.config_files[source]['impact'], prefix)
v4count = self.asn_meta.get(f'{v4_last}|{asn}|v4|ipcount') v4count = self.asn_meta.get(f'{v4_last}|{asn}|v4|ipcount')
v6count = self.asn_meta.get(f'{v6_last}|{asn}|v6|ipcount') v6count = self.asn_meta.get(f'{v6_last}|{asn}|v6|ipcount')
if v4count: if v4count:
asn_rank_v4 /= int(v4count) asn_rank_v4 /= int(v4count)
r_pipeline.set(f'{today}|{source}|{asn}|rankv4', asn_rank_v4) r_pipeline.set(f'{today}|{source}|{asn}|rankv4', asn_rank_v4)
r_pipeline.zincrby(asns_aggregation_key_v4, asn_rank_v4, asn)
if v6count: if v6count:
asn_rank_v6 /= int(v6count) asn_rank_v6 /= int(v6count)
r_pipeline.set(f'{today}|{source}|{asn}|rankv6', asn_rank_v6) r_pipeline.set(f'{today}|{source}|{asn}|rankv6', asn_rank_v6)
r_pipeline.zincrby(asns_aggregation_key_v6, asn_rank_v6, asn)
r_pipeline.execute() r_pipeline.execute()
unset_running(self.__class__.__name__) unset_running(self.__class__.__name__)
self.logger.info('Ranking done.') self.logger.info('Ranking done.')