#!/usr/bin/env python # -*- coding: utf-8 -*- import requests from enum import Enum from datetime import datetime from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network from typing import TypeVar IPTypes = TypeVar('IPTypes', IPv4Address, IPv6Address, 'str') PrefixTypes = TypeVar('PrefixTypes', IPv4Network, IPv6Network, 'str') TimeTypes = TypeVar('TimeTypes', datetime, 'str') class ASNsTypes(Enum): transiting = 't' originating = 'o' all_types = 't,o' undefined = '' class AddressFamilies(Enum): ipv4 = 'v4' ipv6 = 'v6' all_families = 'v4,v6' undefined = '' class Noise(Enum): keep = 'keep' remove = 'filter' class StatsRIPE(): def __init__(self, sourceapp='bgpranking-ng - CIRCL'): self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}" self.sourceapp = sourceapp def __time_to_text(self, query_time: TimeTypes) -> str: if isinstance(query_time, datetime): return query_time.isoformat() return query_time def _get(self, method: str, parameters: dict) -> dict: parameters['sourceapp'] = self.sourceapp url = self.url.format(method=method, parameters='&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()])) print(url) response = requests.get(url) return response.json() def network_info(self, ip: IPTypes) -> dict: parameters = {'resource': ip} return self._get('network-info', parameters) def prefix_overview(self, prefix: PrefixTypes, min_peers_seeing: int= 0, max_related: int=0, query_time: TimeTypes=None) -> dict: parameters = {'resource': prefix} if min_peers_seeing: parameters['min_peers_seeing'] = min_peers_seeing if max_related: parameters['max_related'] = max_related if query_time: parameters['query_time'] = self.__time_to_text(query_time) return self._get('prefix-overview', parameters) def ris_asns(self, query_time: TimeTypes=None, list_asns: bool=False, asn_types: ASNsTypes=ASNsTypes.undefined): parameters = {} if list_asns: parameters['list_asns'] = list_asns if asn_types: parameters['asn_types'] = asn_types.value if query_time: parameters['query_time'] = self.__time_to_text(query_time) return self._get('ris-asns', parameters) def ris_prefixes(self, asn: int, query_time: TimeTypes=None, list_prefixes: bool=False, types: ASNsTypes=ASNsTypes.undefined, af: AddressFamilies=AddressFamilies.undefined, noise: Noise=Noise.keep): parameters = {'resource': str(asn)} if query_time: parameters['query_time'] = self.__time_to_text(query_time) if list_prefixes: parameters['list_prefixes'] = list_prefixes if types: parameters['types'] = types.value if af: parameters['af'] = af.value if noise: parameters['noise'] = noise.value return self._get('ris-prefixes', parameters) def country_asns(self, country: str, details: int=0, query_time: TimeTypes=None): parameters = {'resource': country} if details: parameters['lod'] = details # FIXME: query_time makes the backend fail. # if query_time: # parameters['query_time'] = self.__time_to_text(query_time) return self._get('country-asns', parameters)