new: Add caching to statsripe
parent 09ac601acb
commit dd1e3ea447
@@ -3,9 +3,13 @@
 import requests
 from enum import Enum
-from datetime import datetime
+from datetime import datetime, timedelta
 from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network
 from typing import TypeVar
+from .helpers import get_homedir, safe_create_dir
+import json
+from dateutil.parser import parse
+import copy
 
 IPTypes = TypeVar('IPTypes', IPv4Address, IPv6Address, 'str')
 PrefixTypes = TypeVar('PrefixTypes', IPv4Network, IPv6Network, 'str')
 
@@ -36,18 +40,57 @@ class StatsRIPE():
     def __init__(self, sourceapp='bgpranking-ng - CIRCL'):
         self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}"
         self.sourceapp = sourceapp
+        self.cache_dir = get_homedir() / 'rawdata' / 'stats_ripe'
 
     def __time_to_text(self, query_time: TimeTypes) -> str:
         if isinstance(query_time, datetime):
             return query_time.isoformat()
         return query_time
 
+    def _get_cache(self, method, parameters):
+        '''The dataset is updated every 8 hours (at midnight, 8:00 and 16:00).
+        If parameters has a 'query_time' key on one of these hours, try to get that dataset.
+        If it falls between them, try to get the closest one.
+        If there is no 'query_time' at all, assume none was requested and use the closest timestamp.
+        When caching, take query_time from response['data']['query_time'].
+        '''
+        parameters = copy.copy(parameters)
+        if not parameters.get('query_time'):
+            # Subtract 8 hours because generating the newest dataset takes a while.
+            parameters['query_time'] = (datetime.now() - timedelta(hours=8)).isoformat()
+
+        # Snap the query time to the nearest dataset boundary (0, 8 or 16 hours);
+        # a timestamp already on a boundary maps to itself.
+        d = parse(parameters['query_time'])
+        d = d.replace(hour=min([0, 8, 16], key=lambda x: abs(x - d.hour)),
+                      minute=0, second=0, microsecond=0)
+        parameters['query_time'] = d.isoformat()
+        cache_filename = '&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()])
+        c_path = self.cache_dir / method / cache_filename
+        if c_path.exists():
+            with open(c_path, 'r') as f:
+                return json.load(f)
+        return False
+
+    def _save_cache(self, method, parameters, response):
+        parameters['query_time'] = response['data']['query_time']
+        cache_filename = '&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()])
+        safe_create_dir(self.cache_dir / method)
+        c_path = self.cache_dir / method / cache_filename
+        with open(c_path, 'w') as f:
+            json.dump(response, f, indent=2)
+
     def _get(self, method: str, parameters: dict) -> dict:
         parameters['sourceapp'] = self.sourceapp
+        cached = self._get_cache(method, parameters)
+        if cached:
+            return cached
         url = self.url.format(method=method, parameters='&'.join(['{}={}'.format(k, str(v).lower()) for k, v in parameters.items()]))
         print(url)
         response = requests.get(url)
-        return response.json()
+        j_content = response.json()
+        self._save_cache(method, parameters, j_content)
+        return j_content
 
     def network_info(self, ip: IPTypes) -> dict:
         parameters = {'resource': ip}
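For reference, a minimal standalone sketch of the boundary-snapping behaviour that _get_cache implements; the helper name round_to_dataset_hour is invented for illustration, the patch inlines this logic:

from datetime import datetime

def round_to_dataset_hour(d: datetime) -> datetime:
    # Datasets exist at 00:00, 08:00 and 16:00; snap to the nearest of those hours.
    nearest = min([0, 8, 16], key=lambda x: abs(x - d.hour))
    return d.replace(hour=nearest, minute=0, second=0, microsecond=0)

print(round_to_dataset_hour(datetime(2018, 3, 6, 11, 27)))  # 2018-03-06 08:00:00
print(round_to_dataset_hour(datetime(2018, 3, 6, 13, 1)))   # 2018-03-06 16:00:00
print(round_to_dataset_hour(datetime(2018, 3, 6, 23, 59)))  # 2018-03-06 16:00:00, never rolls into the next day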
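The cache key is simply the lower-cased, &-joined parameter list, used as a file name under cache_dir/method. A sketch of what that yields, with illustrative parameter values:

parameters = {'resource': '8.8.8.8', 'sourceapp': 'bgpranking-ng - CIRCL',
              'query_time': '2018-03-06T08:00:00'}
cache_filename = '&'.join('{}={}'.format(k, str(v).lower()) for k, v in parameters.items())
print(cache_filename)
# resource=8.8.8.8&sourceapp=bgpranking-ng - circl&query_time=2018-03-06t08:00:00

Because str(v).lower() is applied to every value, lookups are case-insensitive with respect to parameter values; the resulting file name still contains spaces and colons, which is fine on Linux but would need escaping on Windows.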
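Finally, a short end-to-end usage sketch of the cached client; the import path is an assumption, and get_homedir() must resolve to a writable directory:

from statsripe import StatsRIPE  # hypothetical import path

ripe = StatsRIPE()
r1 = ripe.network_info('8.8.8.8')  # likely a cache miss: fetched from stat.ripe.net, then written under rawdata/stats_ripe/
r2 = ripe.network_info('8.8.8.8')  # served from the cache file when the rounded query_time matches the one stored by _save_cache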