chg: Reduce the queries against the ranking db
parent
f65228a2df
commit
ce96ad9825
|
@ -26,28 +26,21 @@ class Ranking():
|
||||||
self.logger.info('Start ranking')
|
self.logger.info('Start ranking')
|
||||||
set_running(self.__class__.__name__)
|
set_running(self.__class__.__name__)
|
||||||
today = date.today().isoformat()
|
today = date.today().isoformat()
|
||||||
v4_last = self.asn_meta.get('v4|last')
|
v4_last, v6_last = self.asn_meta.mget('v4|last', 'v6|last')
|
||||||
v6_last = self.asn_meta.get('v6|last')
|
|
||||||
if not v4_last or not v6_last:
|
if not v4_last or not v6_last:
|
||||||
'''Failsafe if asn_meta has not been populated yet'''
|
'''Failsafe if asn_meta has not been populated yet'''
|
||||||
unset_running(self.__class__.__name__)
|
unset_running(self.__class__.__name__)
|
||||||
return
|
return
|
||||||
asns_aggregation_key_v4 = f'{today}|asns|v4'
|
asns_aggregation_key_v4 = f'{today}|asns|v4'
|
||||||
asns_aggregation_key_v6 = f'{today}|asns|v6'
|
asns_aggregation_key_v6 = f'{today}|asns|v6'
|
||||||
self.ranking.delete(asns_aggregation_key_v4, asns_aggregation_key_v6)
|
to_delete = set([asns_aggregation_key_v4, asns_aggregation_key_v6])
|
||||||
cleaned_up = set()
|
r_pipeline = self.ranking.pipeline()
|
||||||
for source in self.storage.smembers(f'{today}|sources'):
|
for source in self.storage.smembers(f'{today}|sources'):
|
||||||
self.logger.info(f'{today} - Ranking source: {source}')
|
self.logger.info(f'{today} - Ranking source: {source}')
|
||||||
r_pipeline = self.ranking.pipeline()
|
|
||||||
for asn in self.storage.smembers(f'{today}|{source}'):
|
for asn in self.storage.smembers(f'{today}|{source}'):
|
||||||
prefixes_aggregation_key_v4 = f'{today}|{asn}|v4'
|
prefixes_aggregation_key_v4 = f'{today}|{asn}|v4'
|
||||||
prefixes_aggregation_key_v6 = f'{today}|{asn}|v6'
|
prefixes_aggregation_key_v6 = f'{today}|{asn}|v6'
|
||||||
if prefixes_aggregation_key_v4 not in cleaned_up:
|
to_delete.update([prefixes_aggregation_key_v4, prefixes_aggregation_key_v6])
|
||||||
self.ranking.delete(prefixes_aggregation_key_v4)
|
|
||||||
cleaned_up.add(prefixes_aggregation_key_v4)
|
|
||||||
if prefixes_aggregation_key_v6 not in cleaned_up:
|
|
||||||
self.ranking.delete(prefixes_aggregation_key_v6)
|
|
||||||
cleaned_up.add(prefixes_aggregation_key_v6)
|
|
||||||
if asn == '0':
|
if asn == '0':
|
||||||
# Default ASN when no matches. Probably spoofed.
|
# Default ASN when no matches. Probably spoofed.
|
||||||
continue
|
continue
|
||||||
|
@ -66,8 +59,7 @@ class Ranking():
|
||||||
else:
|
else:
|
||||||
asn_rank_v6 += len(ips) * self.config_files[source]['impact']
|
asn_rank_v6 += len(ips) * self.config_files[source]['impact']
|
||||||
r_pipeline.zincrby(prefixes_aggregation_key_v6, prefix, prefix_rank * self.config_files[source]['impact'])
|
r_pipeline.zincrby(prefixes_aggregation_key_v6, prefix, prefix_rank * self.config_files[source]['impact'])
|
||||||
v4count = self.asn_meta.get(f'{v4_last}|{asn}|v4|ipcount')
|
v4count, v6count = self.asn_meta.mget(f'{v4_last}|{asn}|v4|ipcount', f'{v6_last}|{asn}|v6|ipcount')
|
||||||
v6count = self.asn_meta.get(f'{v6_last}|{asn}|v6|ipcount')
|
|
||||||
if v4count:
|
if v4count:
|
||||||
asn_rank_v4 /= float(v4count)
|
asn_rank_v4 /= float(v4count)
|
||||||
if asn_rank_v4:
|
if asn_rank_v4:
|
||||||
|
@ -78,7 +70,8 @@ class Ranking():
|
||||||
if asn_rank_v6:
|
if asn_rank_v6:
|
||||||
r_pipeline.set(f'{today}|{source}|{asn}|rankv6', asn_rank_v6)
|
r_pipeline.set(f'{today}|{source}|{asn}|rankv6', asn_rank_v6)
|
||||||
r_pipeline.zincrby(asns_aggregation_key_v6, asn, asn_rank_v6)
|
r_pipeline.zincrby(asns_aggregation_key_v6, asn, asn_rank_v6)
|
||||||
r_pipeline.execute()
|
self.ranking.delete(*to_delete)
|
||||||
|
r_pipeline.execute()
|
||||||
|
|
||||||
unset_running(self.__class__.__name__)
|
unset_running(self.__class__.__name__)
|
||||||
self.logger.info('Ranking done.')
|
self.logger.info('Ranking done.')
|
||||||
|
|
Loading…
Reference in New Issue