BGP-Ranking/bgpranking/helpers.py

67 lines
1.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from functools import lru_cache
from pathlib import Path
from typing import Dict, List
import requests
from pyipasnhistory import IPASNHistory
from .default import get_homedir, get_config, ThirdPartyUnreachable, safe_create_dir
@lru_cache(64)
def get_data_dir() -> Path:
capture_dir = get_homedir() / 'rawdata'
safe_create_dir(capture_dir)
return capture_dir
@lru_cache(64)
def get_modules_dir() -> Path:
modules_dir = get_homedir() / 'config' / 'modules'
safe_create_dir(modules_dir)
return modules_dir
@lru_cache(64)
def get_modules() -> List[Path]:
return [modulepath for modulepath in get_modules_dir().glob('*.json')]
def load_all_modules_configs() -> Dict[str, Dict]:
configs = {}
for p in get_modules():
with p.open() as f:
j = json.load(f)
configs[f"{j['vendor']}-{j['name']}"] = j
return configs
def get_ipasn():
ipasnhistory_url = get_config('generic', 'ipasnhistory_url')
ipasn = IPASNHistory(ipasnhistory_url)
if not ipasn.is_up:
raise ThirdPartyUnreachable(f"Unable to reach IPASNHistory on {ipasnhistory_url}")
return ipasn
def sanity_check_ipasn(ipasn):
try:
meta = ipasn.meta()
except requests.exceptions.ConnectionError:
return False, "IP ASN History is not reachable, try again later."
if 'error' in meta:
raise ThirdPartyUnreachable(f'IP ASN History has a problem: {meta["error"]}')
v4_percent = meta['cached_dates']['caida']['v4']['percent']
v6_percent = meta['cached_dates']['caida']['v6']['percent']
if v4_percent < 90 or v6_percent < 90: # (this way it works if we only load 10 days)
# Try again later.
return False, f"IP ASN History is not ready: v4 {v4_percent}% / v6 {v6_percent}% loaded"
return True, f"IP ASN History is ready: v4 {v4_percent}% / v6 {v6_percent}% loaded"