BGP-Ranking/bgpranking/querying.py

266 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2018-04-10 21:27:35 +02:00
# -*- coding: utf-8 -*-
from typing import TypeVar, Union
2018-04-10 23:22:32 +02:00
import datetime
2018-05-31 15:48:11 +02:00
from datetime import timedelta
2018-04-10 21:27:35 +02:00
from dateutil.parser import parse
2018-06-01 17:13:56 +02:00
from collections import defaultdict
2018-04-10 21:27:35 +02:00
import logging
import json
2018-04-10 21:27:35 +02:00
from redis import StrictRedis
from .libs.helpers import get_socket_path, get_config_path
from .libs.exceptions import InvalidDateFormat
2018-07-27 14:33:25 +02:00
from .libs.statsripe import StatsRIPE
2018-04-10 21:27:35 +02:00
2018-04-10 23:22:32 +02:00
Dates = TypeVar('Dates', datetime.datetime, datetime.date, str)
2018-04-10 21:27:35 +02:00
class Querying():
2018-04-10 21:27:35 +02:00
def __init__(self, loglevel: int=logging.DEBUG):
self.__init_logger(loglevel)
self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)
self.ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1)
2018-04-10 21:27:35 +02:00
self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
self.cache = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
2018-04-10 21:27:35 +02:00
def __init_logger(self, loglevel: int):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(loglevel)
def __normalize_date(self, date: Dates):
2018-04-10 23:22:32 +02:00
if isinstance(date, datetime.datetime):
2018-04-10 21:27:35 +02:00
return date.date().isoformat()
2018-04-10 23:22:32 +02:00
elif isinstance(date, datetime.date):
2018-04-10 21:27:35 +02:00
return date.isoformat()
elif isinstance(date, str):
try:
return parse(date).date().isoformat()
except ValueError:
raise InvalidDateFormat('Unable to parse the date. Should be YYYY-MM-DD.')
def _ranking_cache_wrapper(self, key):
if not self.cache.exists(key):
if self.ranking.exists(key):
key_dump = self.ranking.dump(key)
# Cache for 10 hours
self.cache.restore(key, 36000, key_dump, True)
def asns_global_ranking(self, date: Dates=datetime.date.today(), source: Union[list, str]='',
ipversion: str='v4', limit: int=100):
2018-04-10 21:27:35 +02:00
'''Aggregated ranking of all the ASNs known in the system, weighted by source.'''
to_return = {'meta': {'ipversion': ipversion, 'limit': limit}, 'source': source,
'response': set()}
2018-04-10 21:27:35 +02:00
d = self.__normalize_date(date)
to_return['meta']['date'] = d
2018-04-12 18:09:04 +02:00
if source:
if isinstance(source, list):
keys = []
for s in source:
key = f'{d}|{s}|asns|{ipversion}'
self._ranking_cache_wrapper(key)
keys.append(key)
# union the ranked sets
key = '|'.join(sorted(source)) + f'|{d}|asns|{ipversion}'
if not self.cache.exists(key):
self.cache.zunionstore(key, keys)
else:
key = f'{d}|{source}|asns|{ipversion}'
2018-04-12 18:09:04 +02:00
else:
key = f'{d}|asns|{ipversion}'
self._ranking_cache_wrapper(key)
to_return['response'] = self.cache.zrevrange(key, start=0, end=limit, withscores=True)
return to_return
2018-04-10 21:27:35 +02:00
def asn_details(self, asn: int, date: Dates= datetime.date.today(), source: Union[list, str]='',
ipversion: str='v4'):
2018-04-10 21:27:35 +02:00
'''Aggregated ranking of all the prefixes anounced by the given ASN, weighted by source.'''
to_return = {'meta': {'asn': asn, 'ipversion': ipversion, 'source': source},
'response': set()}
2018-04-10 21:27:35 +02:00
d = self.__normalize_date(date)
to_return['meta']['date'] = d
2018-04-12 18:09:04 +02:00
if source:
if isinstance(source, list):
keys = []
for s in source:
key = f'{d}|{s}|{asn}|{ipversion}|prefixes'
self._ranking_cache_wrapper(key)
keys.append(key)
# union the ranked sets
key = '|'.join(sorted(source)) + f'|{d}|{asn}|{ipversion}'
if not self.cache.exists(key):
self.cache.zunionstore(key, keys)
else:
key = f'{d}|{source}|{asn}|{ipversion}|prefixes'
2018-04-12 18:09:04 +02:00
else:
key = f'{d}|{asn}|{ipversion}'
self._ranking_cache_wrapper(key)
to_return['response'] = self.cache.zrevrange(key, start=0, end=-1, withscores=True)
return to_return
2018-04-10 21:27:35 +02:00
def asn_rank(self, asn: int, date: Dates=datetime.date.today(), source: Union[list, str]='',
ipversion: str='v4', with_position: bool=False):
2018-04-10 21:27:35 +02:00
'''Get the rank of a single ASN, weighted by source.'''
to_return = {'meta': {'asn': asn, 'ipversion': ipversion,
'source': source, 'with_position': with_position},
'response': 0.0}
2018-04-10 21:27:35 +02:00
d = self.__normalize_date(date)
to_return['meta']['date'] = d
2018-04-12 18:09:04 +02:00
if source:
to_return['meta']['source'] = source
if isinstance(source, list):
keys = []
for s in source:
key = f'{d}|{s}|{asn}|{ipversion}'
self._ranking_cache_wrapper(key)
keys.append(key)
r = sum(float(self.cache.get(key)) for key in keys if self.cache.exists(key))
else:
key = f'{d}|{source}|{asn}|{ipversion}'
self._ranking_cache_wrapper(key)
r = self.cache.get(key)
2018-04-12 18:09:04 +02:00
else:
key = f'{d}|asns|{ipversion}'
self._ranking_cache_wrapper(key)
r = self.cache.zscore(key, asn)
if not r:
r = 0
if with_position and not source:
2019-05-21 12:30:28 +02:00
position = self.cache.zrevrank(key, asn)
if position is not None:
position += 1
to_return['response'] = {'rank': float(r), 'position': position,
'total_known_asns': self.cache.zcard(key)}
else:
to_return['response'] = float(r)
return to_return
2018-04-10 21:27:35 +02:00
2018-05-31 15:48:11 +02:00
def get_sources(self, date: Dates=datetime.date.today()):
2018-04-12 18:09:04 +02:00
'''Get the sources availables for a specific day (default: today).'''
to_return = {'meta': {}, 'response': set()}
2018-04-10 21:27:35 +02:00
d = self.__normalize_date(date)
to_return['meta']['date'] = d
2018-04-12 18:09:04 +02:00
key = f'{d}|sources'
to_return['response'] = self.storage.smembers(key)
return to_return
2018-04-13 18:02:44 +02:00
def get_asn_descriptions(self, asn: int, all_descriptions=False):
to_return = {'meta': {'asn': asn, 'all_descriptions': all_descriptions},
'response': []}
2018-04-13 18:02:44 +02:00
descriptions = self.asn_meta.hgetall(f'{asn}|descriptions')
if all_descriptions or not descriptions:
to_return['response'] = descriptions
else:
to_return['response'] = descriptions[sorted(descriptions.keys(), reverse=True)[0]]
return to_return
2018-05-31 15:48:11 +02:00
def get_prefix_ips(self, asn: int, prefix: str, date: Dates=datetime.date.today(),
source: Union[list, str]='', ipversion: str='v4'):
to_return = {'meta': {'asn': asn, 'prefix': prefix, 'ipversion': ipversion,
'source': source},
'response': defaultdict(list)}
d = self.__normalize_date(date)
to_return['meta']['date'] = d
2018-06-01 17:13:56 +02:00
if source:
to_return['meta']['source'] = source
if isinstance(source, list):
sources = source
else:
sources = [source]
2018-06-01 17:13:56 +02:00
else:
sources = self.get_sources(d)['response']
2018-06-01 17:13:56 +02:00
for source in sources:
ips = set([ip_ts.split('|')[0]
for ip_ts in self.storage.smembers(f'{d}|{source}|{asn}|{prefix}')])
[to_return['response'][ip].append(source) for ip in ips]
return to_return
2018-06-01 17:13:56 +02:00
def get_asn_history(self, asn: int, period: int=100, source: Union[list, str]='',
ipversion: str='v4', date: Dates=datetime.date.today()):
to_return = {'meta': {'asn': asn, 'period': period, 'ipversion': ipversion,
'source': source},
'response': []}
2018-06-07 16:18:50 +02:00
if isinstance(date, str):
date = parse(date).date()
if date + timedelta(days=period / 3) > datetime.date.today():
# the period to display will be around the date passed at least 2/3 before the date, at most 1/3 after
# FIXME: That is not doing what it is supposed to...
2018-06-07 16:18:50 +02:00
date = datetime.date.today()
to_return['meta']['date'] = date.isoformat()
2018-05-31 15:48:11 +02:00
for i in range(period):
2018-06-07 16:18:50 +02:00
d = date - timedelta(days=i)
rank = self.asn_rank(asn, d, source, ipversion)
if 'response' not in rank:
2018-05-31 17:37:35 +02:00
rank = 0
to_return['response'].insert(0, (d.isoformat(), rank['response']))
2018-05-31 15:48:11 +02:00
return to_return
2018-07-27 14:33:25 +02:00
def country_rank(self, country: str, date: Dates=datetime.date.today(), source: Union[list, str]='',
ipversion: str='v4'):
to_return = {'meta': {'country': country, 'ipversion': ipversion,
'source': source},
'response': []}
2018-07-27 14:33:25 +02:00
d = self.__normalize_date(date)
to_return['meta']['date'] = d
ripe = StatsRIPE()
2018-07-27 14:33:25 +02:00
response = ripe.country_asns(country, query_time=d, details=1)
if (not response.get('data') or not response['data'].get('countries') or not
response['data']['countries'][0].get('routed')):
logging.warning(f'Invalid response: {response}')
# FIXME: return something
2018-07-31 10:54:38 +02:00
return 0, [(0, 0)]
routed_asns = response['data']['countries'][0]['routed']
ranks = [self.asn_rank(asn, d, source, ipversion)['response'] for asn in routed_asns]
to_return['response'] = [sum(ranks), zip(routed_asns, ranks)]
return to_return
2018-07-27 14:33:25 +02:00
def country_history(self, country: Union[list, str], period: int=30, source: Union[list, str]='',
ipversion: str='v4', date: Dates=datetime.date.today()):
to_return = {}
to_return = {'meta': {'country': country, 'ipversion': ipversion,
'source': source},
'response': defaultdict(list)}
2018-07-27 14:33:25 +02:00
if isinstance(date, str):
date = parse(date).date()
if date + timedelta(days=period / 3) > datetime.date.today():
# the period to display will be around the date passed at least 2/3 before the date, at most 1/3 after
date = datetime.date.today()
if isinstance(country, str):
country = [country]
for c in country:
for i in range(period):
d = date - timedelta(days=i)
rank, details = self.country_rank(c, d, source, ipversion)['response']
if rank is None:
rank = 0
to_return['response'][c].insert(0, (d.isoformat(), rank, list(details)))
2018-07-27 14:33:25 +02:00
return to_return
def get_source_config(self):
pass
def get_sources_configs(self):
config_dir = get_config_path() / 'modules'
loaded = []
for modulepath in config_dir.glob('*.json'):
with open(modulepath) as f:
loaded.append(json.load(f))
return {'{}-{}'.format(config['vendor'], config['name']): config for config in loaded}