BGP-Ranking/bgpranking/libs/statsripe.py

146 lines
5.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
from enum import Enum
2018-07-27 18:38:30 +02:00
from datetime import datetime, timedelta
from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network
from typing import TypeVar
2018-07-27 18:38:30 +02:00
from .helpers import get_homedir, safe_create_dir
try:
import simplejson as json
except ImportError:
import json
2018-07-27 18:38:30 +02:00
from dateutil.parser import parse
import copy
IPTypes = TypeVar('IPTypes', IPv4Address, IPv6Address, 'str')
PrefixTypes = TypeVar('PrefixTypes', IPv4Network, IPv6Network, 'str')
TimeTypes = TypeVar('TimeTypes', datetime, 'str')
class ASNsTypes(Enum):
transiting = 't'
originating = 'o'
all_types = 't,o'
undefined = ''
class AddressFamilies(Enum):
ipv4 = 'v4'
ipv6 = 'v6'
all_families = 'v4,v6'
undefined = ''
class Noise(Enum):
keep = 'keep'
remove = 'filter'
class StatsRIPE():
def __init__(self, sourceapp='bgpranking-ng - CIRCL'):
self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}"
self.sourceapp = sourceapp
2018-07-27 18:38:30 +02:00
self.cache_dir = get_homedir() / 'rawdata' / 'stats_ripe'
def __time_to_text(self, query_time: TimeTypes) -> str:
2018-07-27 14:33:25 +02:00
if isinstance(query_time, datetime):
return query_time.isoformat()
return query_time
2018-07-27 18:38:30 +02:00
def _get_cache(self, method, parameters):
'''The dataset is updated every 8 hours (midnight, 8, 16).
If parameters has a key 'query_time' on any of these hours, try to get it.
If not, try to get the closest one.
If it has nothing, assume non and try to get the closest timestamp
When caching, get query_time from response['data']['query_time']
'''
parameters = copy.copy(parameters)
if not parameters.get('query_time'):
# use timedelta because the generation of the new dataset takes a while.
parameters['query_time'] = (datetime.now() - timedelta(hours=8)).isoformat()
d = parse(parameters['query_time'])
if d.hour == 8 and d.minute == 0 and d.second == 0:
pass
else:
d = d.replace(hour=min([0, 8, 16], key=lambda x: abs(x - d.hour)),
minute=0, second=0, microsecond=0)
parameters['query_time'] = d.isoformat()
cache_filename = '&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()])
c_path = self.cache_dir / method / cache_filename
if c_path.exists():
with open(c_path, 'r') as f:
return json.load(f)
return False
def _save_cache(self, method, parameters, response):
parameters['query_time'] = response['data']['query_time']
cache_filename = '&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()])
safe_create_dir(self.cache_dir / method)
c_path = self.cache_dir / method / cache_filename
with open(c_path, 'w') as f:
json.dump(response, f, indent=2)
def _get(self, method: str, parameters: dict) -> dict:
parameters['sourceapp'] = self.sourceapp
2018-07-27 18:38:30 +02:00
cached = self._get_cache(method, parameters)
if cached:
return cached
url = self.url.format(method=method, parameters='&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()]))
response = requests.get(url)
2018-07-27 18:38:30 +02:00
j_content = response.json()
self._save_cache(method, parameters, j_content)
return j_content
2018-07-27 14:33:25 +02:00
def network_info(self, ip: IPTypes) -> dict:
parameters = {'resource': ip}
return self._get('network-info', parameters)
2018-07-27 14:33:25 +02:00
def prefix_overview(self, prefix: PrefixTypes, min_peers_seeing: int= 0,
max_related: int=0, query_time: TimeTypes=None) -> dict:
parameters = {'resource': prefix}
if min_peers_seeing:
parameters['min_peers_seeing'] = min_peers_seeing
if max_related:
parameters['max_related'] = max_related
if query_time:
parameters['query_time'] = self.__time_to_text(query_time)
return self._get('prefix-overview', parameters)
2018-07-27 14:33:25 +02:00
def ris_asns(self, query_time: TimeTypes=None, list_asns: bool=False, asn_types: ASNsTypes=ASNsTypes.undefined):
parameters = {}
if list_asns:
parameters['list_asns'] = list_asns
if asn_types:
parameters['asn_types'] = asn_types.value
if query_time:
2018-07-27 14:33:25 +02:00
parameters['query_time'] = self.__time_to_text(query_time)
return self._get('ris-asns', parameters)
2018-07-27 14:33:25 +02:00
def ris_prefixes(self, asn: int, query_time: TimeTypes=None,
list_prefixes: bool=False, types: ASNsTypes=ASNsTypes.undefined,
af: AddressFamilies=AddressFamilies.undefined, noise: Noise=Noise.keep):
parameters = {'resource': str(asn)}
if query_time:
parameters['query_time'] = self.__time_to_text(query_time)
if list_prefixes:
parameters['list_prefixes'] = list_prefixes
if types:
parameters['types'] = types.value
if af:
parameters['af'] = af.value
if noise:
parameters['noise'] = noise.value
return self._get('ris-prefixes', parameters)
2018-07-27 14:33:25 +02:00
def country_asns(self, country: str, details: int=0, query_time: TimeTypes=None):
parameters = {'resource': country}
if details:
parameters['lod'] = details
2018-07-31 11:57:49 +02:00
if query_time:
parameters['query_time'] = self.__time_to_text(query_time)
2018-07-27 14:33:25 +02:00
return self._get('country-asns', parameters)