chg: cleanup

pull/12/head
Raphaël Vinot 2018-03-29 23:05:07 +02:00
parent b0550bc91c
commit b4b012a430
12 changed files with 149 additions and 428 deletions

View File

@@ -1,427 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import requests
import os
from dateutil import parser
from datetime import datetime, date
from hashlib import sha512  # Faster than sha256 on 64b machines.
from pathlib import Path
from dateutil.relativedelta import relativedelta
from collections import defaultdict
import zipfile
import logging
import asyncio
from pid import PidFile, PidFileError
import json
import re
from redis import Redis
from redis import StrictRedis
from uuid import uuid4
from io import BytesIO
import importlib
from typing import List
import types
import ipaddress


class BGPRankingException(Exception):
    pass


class FetcherException(BGPRankingException):
    pass


class ArchiveException(BGPRankingException):
    pass


class CreateDirectoryException(BGPRankingException):
    pass
"""
Directory structure:
storage_directory / vendor / listname -> files to import
storage_directory / vendor / listname / meta -> last modified & pid
storage_directory / vendor / listname / archive -> imported files <= 2 month old
storage_directory / vendor / listname / archive / deep -> imported files > 2 month old (zipped)
"""
def safe_create_dir(to_create: Path):
if to_create.exists() and not to_create.is_dir():
raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create))
os.makedirs(to_create, exist_ok=True)
class Fetcher():

    def __init__(self, config_file: Path, storage_directory: Path,
                 loglevel: int=logging.DEBUG):
        '''Load `config_file`, and store the fetched data into `storage_directory`
        Note: if the `config_file` does not provide a URL (the file is
              gathered by some other means), the fetcher is automatically stopped.'''
        with open(config_file, 'r') as f:
            module_parameters = json.load(f)
        self.vendor = module_parameters['vendor']
        self.listname = module_parameters['name']
        self.__init_logger(loglevel)
        self.fetcher = True
        if 'url' not in module_parameters:
            self.logger.info('No URL to fetch, breaking.')
            self.fetcher = False
            return
        self.url = module_parameters['url']
        self.logger.debug('Starting fetcher on {}'.format(self.url))
        self.directory = storage_directory / self.vendor / self.listname
        safe_create_dir(self.directory)
        self.meta = self.directory / 'meta'
        safe_create_dir(self.meta)
        self.archive_dir = self.directory / 'archive'
        safe_create_dir(self.archive_dir)
        self.first_fetch = True
    def __init_logger(self, loglevel):
        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
                                                          self.vendor, self.listname))
        self.logger.setLevel(loglevel)

    def __get_last_modified(self):
        r = requests.head(self.url)
        if 'Last-Modified' in r.headers:
            return parser.parse(r.headers['Last-Modified'])
        return None

    def __newer(self):
        '''Check if the file available for download is newer than the one
        already downloaded, by checking the `Last-Modified` header.
        Note: returns True (so the file gets fetched) if the stored header
              content does not exist yet, or if the header doesn't have this key.
        '''
        last_modified_path = self.meta / 'lastmodified'
        if not last_modified_path.exists():
            # The file doesn't exist
            if not self.first_fetch:
                # The URL has no Last-Modified header, we cannot use it.
                self.logger.debug('No Last-Modified header available')
                return True
            self.first_fetch = False
            last_modified = self.__get_last_modified()
            if last_modified:
                self.logger.debug('Last-Modified header available')
                with last_modified_path.open('w') as f:
                    f.write(last_modified.isoformat())
            else:
                self.logger.debug('No Last-Modified header available')
            return True
        with last_modified_path.open() as f:
            last_modified_file = parser.parse(f.read())
        last_modified = self.__get_last_modified()
        if not last_modified:
            # No more Last-Modified header Oo
            self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname))
            last_modified_path.unlink()
            return True
        if last_modified > last_modified_file:
            self.logger.info('Got a new file.')
            with last_modified_path.open('w') as f:
                f.write(last_modified.isoformat())
            return True
        return False
    def __same_as_last(self, downloaded):
        '''Figure out the last downloaded file and check if it is the same as the
        newly downloaded one. Returns True if both files have the same content
        and were downloaded the same day.
        Note: we check the new and the archive directory because we may have a backlog
              and the newest file is always the first one we process.
        '''
        to_check = []
        to_check_new = sorted([f for f in self.directory.iterdir() if f.is_file()])
        if to_check_new:
            # we have files waiting to be processed
            self.logger.debug('{} file(s) are waiting to be processed'.format(len(to_check_new)))
            to_check.append(to_check_new[-1])
        to_check_archive = sorted([f for f in self.archive_dir.iterdir() if f.is_file()])
        if to_check_archive:
            # we have files already processed, in the archive
            self.logger.debug('{} file(s) have been processed'.format(len(to_check_archive)))
            to_check.append(to_check_archive[-1])
        if not to_check:
            self.logger.debug('New list, no historical files')
            # nothing has been downloaded ever, moving on
            return False
        for last_file in to_check:
            with last_file.open('rb') as f:
                last_hash = sha512(f.read())
            dl_hash = sha512(downloaded)
            if (dl_hash.digest() == last_hash.digest() and
                    parser.parse(last_file.name.split('.')[0]).date() == date.today()):
                self.logger.debug('Same file already downloaded today.')
                return True
        return False
    @asyncio.coroutine
    async def fetch_list(self):
        '''Fetch & store the list'''
        if not self.fetcher:
            return
        try:
            with PidFile('{}.pid'.format(self.listname), piddir=self.meta):
                if not self.__newer():
                    return
                r = requests.get(self.url)
                if self.__same_as_last(r.content):
                    return
                self.logger.info('Got a new file \o/')
                with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
                    f.write(r.content)
        except PidFileError:
            self.logger.info('Fetcher already running')


# get announcer: https://stat.ripe.net/data/network-info/data.json?resource=149.13.33.14
class RawFilesParser():

    def __init__(self, config_file: Path, storage_directory: Path,
                 loglevel: int=logging.DEBUG):
        with open(config_file, 'r') as f:
            module_parameters = json.load(f)
        self.vendor = module_parameters['vendor']
        self.listname = module_parameters['name']
        if 'parser' in module_parameters:
            self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser']).parse_raw_file, self)
        self.source = '{}-{}'.format(self.vendor, self.listname)
        self.directory = storage_directory / self.vendor / self.listname
        safe_create_dir(self.directory)
        self.__init_logger(loglevel)
        self.redis_intake = Redis(host='localhost', port=6379, db=0)
        self.logger.debug('Starting intake on {}'.format(self.source))

    def __init_logger(self, loglevel):
        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
                                                          self.vendor, self.listname))
        self.logger.setLevel(loglevel)

    @property
    def files_to_parse(self) -> List[Path]:
        return sorted([f for f in self.directory.iterdir() if f.is_file()], reverse=True)

    def extract_ipv4(self, bytestream: bytes) -> List[bytes]:
        return re.findall(rb'[0-9]+(?:\.[0-9]+){3}', bytestream)

    def parse_raw_file(self, f: BytesIO):
        self.datetime = datetime.now()
        return self.extract_ipv4(f.getvalue())

    @asyncio.coroutine
    async def parse_raw_files(self):
        for filepath in self.files_to_parse:
            self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1))
            with open(filepath, 'rb') as f:
                to_parse = BytesIO(f.read())
            p = self.redis_intake.pipeline()
            for ip in self.parse_raw_file(to_parse):
                uuid = uuid4()
                p.hmset(uuid, {'ip': ip, 'source': self.source,
                               'datetime': self.datetime.isoformat()})
                p.sadd('intake', uuid)
            p.execute()
            self._archive(filepath)

    def _archive(self, filepath: Path):
        '''After processing, move the file to the archive directory'''
        filepath.rename(self.directory / 'archive' / filepath.name)
class Sanitizer():

    def __init__(self, loglevel: int=logging.DEBUG):
        self.__init_logger(loglevel)
        self.redis_intake = Redis(host='localhost', port=6379, db=0, decode_responses=True)
        self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True)
        self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
        self.logger.debug('Starting import')

    def __init_logger(self, loglevel):
        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
        self.logger.setLevel(loglevel)

    async def sanitize(self):
        while True:
            uuid = self.redis_intake.spop('intake')
            if not uuid:
                break
            data = self.redis_intake.hgetall(uuid)
            try:
                ip = ipaddress.ip_address(data['ip'])
            except ValueError:
                self.logger.info('Invalid IP address: {}'.format(data['ip']))
                continue
            if not ip.is_global:
                self.logger.info('The IP address {} is not global'.format(data['ip']))
                continue
            date = parser.parse(data['datetime']).date().isoformat()
            # NOTE: to consider: discard data with an old timestamp (define old)
            # Add to temporary DB for further processing
            self.ris_cache.sadd('for_ris_lookup', str(ip))
            pipeline = self.redis_sanitized.pipeline()
            pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'],
                                  'date': date, 'datetime': data['datetime']})
            pipeline.sadd('to_insert', uuid)
            pipeline.execute()
            self.redis_intake.delete(uuid)
class DatabaseInsert():

    def __init__(self, loglevel: int=logging.DEBUG):
        self.__init_logger(loglevel)
        self.ardb_storage = StrictRedis(host='localhost', port=16379, decode_responses=True)
        self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True)
        self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
        self.logger.debug('Starting import')

    def __init_logger(self, loglevel):
        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
        self.logger.setLevel(loglevel)

    async def insert(self):
        while True:
            uuid = self.redis_sanitized.spop('to_insert')
            if not uuid:
                break
            data = self.redis_sanitized.hgetall(uuid)
            # Data gathered from the RIS queries:
            # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
            # * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
            # * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview
            ris_entry = self.ris_cache.hgetall(data['ip'])
            if not ris_entry:
                # RIS data not available yet, retry later
                # FIXME: an IP can sometimes not be announced, we need to discard it
                self.redis_sanitized.sadd('to_insert', uuid)
                # In case this IP is missing in the set to process
                self.ris_cache.sadd('for_ris_lookup', data['ip'])
                continue
            # Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
            self.ardb_storage.sadd('{}|sources'.format(data['date']), data['source'])
            # Format: <YYYY-MM-DD>|<source> -> set([<asn>, ...])
            self.ardb_storage.sadd('{}|{}'.format(data['date'], data['source']),
                                   ris_entry['asn'])
            # Format: <YYYY-MM-DD>|<source>|<asn> -> set([<prefix>, ...])
            self.ardb_storage.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
                                   ris_entry['prefix'])
            # Format: <YYYY-MM-DD>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
            self.ardb_storage.sadd('{}|{}|{}|{}'.format(data['date'], data['source'],
                                                        ris_entry['asn'],
                                                        ris_entry['prefix']),
                                   '{}|{}'.format(data['ip'], data['datetime']))
            self.redis_sanitized.delete(uuid)
class StatsRIPE():

    def __init__(self, sourceapp='bgpranking-ng - CIRCL'):
        self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}"
        self.url_parameters = {'sourceapp': sourceapp}

    async def network_info(self, ip: str) -> dict:
        method = 'network-info'
        self.url_parameters['resource'] = ip
        parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()])
        url = self.url.format(method=method, parameters=parameters)
        response = requests.get(url)
        return response.json()

    async def prefix_overview(self, prefix: str) -> dict:
        method = 'prefix-overview'
        self.url_parameters['resource'] = prefix
        parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()])
        url = self.url.format(method=method, parameters=parameters)
        response = requests.get(url)
        return response.json()
class RoutingInformationServiceFetcher():

    def __init__(self, loglevel: int=logging.DEBUG):
        self.__init_logger(loglevel)
        self.ris_cache = Redis(host='localhost', port=6381, db=0)
        self.logger.debug('Starting RIS fetcher')
        self.ripe = StatsRIPE()

    def __init_logger(self, loglevel):
        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
        self.logger.setLevel(loglevel)

    async def fetch(self):
        while True:
            ip = self.ris_cache.spop('for_ris_lookup')
            if not ip:
                break
            ip = ip.decode()
            network_info = await self.ripe.network_info(ip)
            prefix = network_info['data']['prefix']
            asns = network_info['data']['asns']
            if not asns or not prefix:
                self.logger.warning('The IP {} does not seem to be announced'.format(ip))
                continue
            prefix_overview = await self.ripe.prefix_overview(prefix)
            description = prefix_overview['data']['block']['desc']
            if not description:
                description = prefix_overview['data']['block']['name']
            for asn in asns:
                self.ris_cache.hmset(ip, {'asn': asn, 'prefix': prefix,
                                          'description': description})
class DeepArchive():

    def __init__(self, config_file: Path, storage_directory: Path,
                 loglevel: int=logging.DEBUG):
        '''Archive every file older than 2 months.'''
        with open(config_file, 'r') as f:
            module_parameters = json.load(f)
        self.vendor = module_parameters['vendor']
        self.listname = module_parameters['name']
        self.directory = storage_directory / self.vendor / self.listname / 'archive'
        safe_create_dir(self.directory)
        self.deep_archive = self.directory / 'deep'
        safe_create_dir(self.deep_archive)
        self.__init_logger(loglevel)

    def __init_logger(self, loglevel):
        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
                                                          self.vendor, self.listname))
        self.logger.setLevel(loglevel)

    def archive(self):
        to_archive = defaultdict(list)
        today = date.today()
        last_day_to_keep = date(today.year, today.month, 1) - relativedelta(months=2)
        for p in self.directory.iterdir():
            if not p.is_file():
                continue
            filedate = parser.parse(p.name.split('.')[0]).date()
            if filedate >= last_day_to_keep:
                continue
            to_archive['{}.zip'.format(filedate.strftime('%Y%m'))].append(p)
        if to_archive:
            self.logger.info('Found old files. Archiving: {}'.format(', '.join(to_archive.keys())))
        else:
            self.logger.debug('No old files.')
        for archivename, path_list in to_archive.items():
            with zipfile.ZipFile(self.deep_archive / archivename, 'x', zipfile.ZIP_DEFLATED) as z:
                for f in path_list:
                    z.write(f, f.name)
            # Delete all the files if the archiving worked out properly
            [f.unlink() for f in path_list]
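
For reference, a minimal sketch of how the classes in this removed module chain together. The config file and storage paths below are hypothetical; only the class and coroutine names come from the code above.

# Minimal driver sketch; the paths are made up, the classes are the ones defined above.
import asyncio
from pathlib import Path

config_file = Path('modules_config/example_list.json')  # hypothetical per-list config
storage_directory = Path('rawdata')                      # hypothetical storage root

async def run_once():
    fetcher = Fetcher(config_file, storage_directory)
    await fetcher.fetch_list()                            # download the list if newer
    raw_parser = RawFilesParser(config_file, storage_directory)
    await raw_parser.parse_raw_files()                    # push raw IPs into the intake DB
    await Sanitizer().sanitize()                          # validate IPs, queue RIS lookups
    await RoutingInformationServiceFetcher().fetch()      # resolve ASN/prefix via RIPEstat
    await DatabaseInsert().insert()                       # store day|source|asn|prefix sets
    DeepArchive(config_file, storage_directory).archive() # zip files older than 2 months

asyncio.get_event_loop().run_until_complete(run_once())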

View File

@@ -16,3 +16,7 @@ class ArchiveException(BGPRankingException):
 class CreateDirectoryException(BGPRankingException):
     pass
+
+
+class MissingEnv(BGPRankingException):
+    pass

View File

@@ -4,7 +4,7 @@
 import os
 import sys
 from pathlib import Path
-from .exceptions import CreateDirectoryException
+from .exceptions import CreateDirectoryException, MissingEnv
 from redis import StrictRedis
 from redis.exceptions import ConnectionError
 from datetime import datetime, timedelta
@@ -16,10 +16,14 @@ def get_config_path():

 def get_list_storage_path():
+    if not os.environ.get('VIRTUAL_ENV'):
+        raise MissingEnv("VIRTUAL_ENV is missing. This project really wants to run from a virtual environment.")
     return Path(os.environ['VIRTUAL_ENV'])


 def get_homedir():
+    if not os.environ.get('BGPRANKING_HOME'):
+        raise MissingEnv("BGPRANKING_HOME is missing. Run the following from the home directory of the repository: export BGPRANKING_HOME='./'")
     return Path(os.environ['BGPRANKING_HOME'])
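
A hedged usage sketch of the new environment guard; the import paths and the fallback value are illustrative, not part of this commit.

# Illustrative only: how a caller might hit the new MissingEnv guard.
# The module paths below are assumptions, not shown in this diff.
import os
from bgpranking.libs.exceptions import MissingEnv   # assumed location of MissingEnv
from bgpranking.libs.helpers import get_homedir     # assumed location of get_homedir

os.environ.setdefault('BGPRANKING_HOME', './')      # normally exported in the shell
try:
    home = get_homedir()
except MissingEnv as e:
    raise SystemExit(e)
print(home)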

Binary file not shown.

bin/shutdown.py (Normal file → Executable file, 0 lines changed)
View File

bin/start.py (Normal file → Executable file, 0 lines changed)
View File

bin/stop.py (Normal file → Executable file, 0 lines changed)
View File

old/initranking_RIPE.py (Normal file, 79 lines added)
View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import json
from redis import Redis
import asyncio

from .libs.StatsRipeText import RIPECaching
from ipaddress import ip_network


class ASNLookup(RIPECaching):

    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
        super().__init__(sourceapp, loglevel)
        self.redis_cache = Redis(host='localhost', port=6382, db=0, decode_responses=True)
        self.logger.debug('Starting ASN lookup cache')

    async def get_all_asns(self):
        reader, writer = await asyncio.open_connection(self.hostname, self.port)
        to_send = '-d ris-asns list_asns=true asn_types=o sourceapp={}\n'.format(self.sourceapp)
        writer.write(to_send.encode())
        ris_asns = json.loads(await reader.read())
        all_asns = ris_asns['asns']['originating']
        if not all_asns:
            self.logger.warning('No ASNs in ris-asns, something went wrong.')
        else:
            self.redis_cache.sadd('asns', *all_asns)
            self.redis_cache.sadd('asns_to_lookup', *all_asns)
    def fix_ipv4_networks(self, networks):
        '''Because we can't have nice things.
        Some networks come without the last byte(s) (e.g. 170.254.25/24).'''
        to_return = []
        for net in networks:
            try:
                to_return.append(ip_network(net))
            except ValueError:
                ip, mask = net.split('/')
                iplist = ip.split('.')
                iplist = iplist + ['0'] * (4 - len(iplist))
                to_return.append(ip_network('{}/{}'.format('.'.join(iplist), mask)))
        return to_return
    async def get_originating_prefixes(self):
        reader, writer = await asyncio.open_connection(self.hostname, self.port)
        writer.write(b'-k\n')
        while True:
            asn = self.redis_cache.spop('asns_to_lookup')
            if not asn:
                break
            self.logger.debug('ASN lookup: {}'.format(asn))
            to_send = '-d ris-prefixes {} list_prefixes=true types=o af=v4,v6 noise=filter sourceapp={}\n'.format(asn, self.sourceapp)
            writer.write(to_send.encode())
            try:
                data = await reader.readuntil(b'\n}\n')
            except asyncio.streams.LimitOverrunError:
                self.logger.debug('ASN lookup failed: {}'.format(asn))
                self.redis_cache.sadd('asns_to_lookup', asn)
                writer.close()
                reader, writer = await asyncio.open_connection(self.hostname, self.port)
                # the failed ASN is back in 'asns_to_lookup', retry it later
                continue
            ris_prefixes = json.loads(data)
            p = self.redis_cache.pipeline()
            if ris_prefixes['prefixes']['v4']['originating']:
                self.logger.debug('{} has ipv4'.format(asn))
                fixed_networks = self.fix_ipv4_networks(ris_prefixes['prefixes']['v4']['originating'])
                p.sadd('{}|v4'.format(asn), *[str(net) for net in fixed_networks])
                total_ipv4 = sum([net.num_addresses for net in fixed_networks])
                p.set('{}|v4|ipcount'.format(asn), total_ipv4)
            if ris_prefixes['prefixes']['v6']['originating']:
                self.logger.debug('{} has ipv6'.format(asn))
                p.sadd('{}|v6'.format(asn), *ris_prefixes['prefixes']['v6']['originating'])
                total_ipv6 = sum([ip_network(prefix).num_addresses for prefix in ris_prefixes['prefixes']['v6']['originating']])
                p.set('{}|v6|ipcount'.format(asn), total_ipv6)
            p.execute()
        writer.write(b'-k\n')
        writer.close()
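
A quick illustration of the truncated-network case fix_ipv4_networks compensates for; the value is the one from its docstring.

# Illustration: padding a truncated IPv4 network before handing it to ip_network().
from ipaddress import ip_network

broken = '170.254.25/24'                   # ip_network(broken) raises ValueError
ip, mask = broken.split('/')
octets = ip.split('.')
octets = octets + ['0'] * (4 - len(octets))
print(ip_network('{}/{}'.format('.'.join(octets), mask)))   # -> 170.254.25.0/24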

old/risfetcher_RIPE.py (Normal file, 61 lines added)
View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import json
from redis import Redis
import asyncio

from .libs.StatsRipeText import RIPECaching


class RISPrefixLookup(RIPECaching):

    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
        super().__init__(sourceapp, loglevel)
        self.logger.debug('Starting RIS Prefix fetcher')

    def cache_prefix(self, redis_cache, ip, network_info, prefix_overview):
        prefix = network_info['prefix']
        asns = network_info['asns']
        # description = prefix_overview['block']['desc']
        # if not description:
        #     description = prefix_overview['block']['name']
        p = redis_cache.pipeline()
        for asn in asns:
            p.hmset(ip, {'asn': asn, 'prefix': prefix})  # , 'description': description})
            p.expire(ip, 43200)  # 12H
        p.execute()

    async def run(self):
        redis_cache = Redis(host='localhost', port=6581, db=0, decode_responses=True)
        reader, writer = await asyncio.open_connection(self.hostname, self.port)
        writer.write(b'-k\n')
        while True:
            ip = redis_cache.spop('for_ris_lookup')
            if not ip:  # TODO: add a check against something to stop the loop
                self.logger.debug('Nothing to lookup')
                await asyncio.sleep(10)
                continue
            if redis_cache.exists(ip):
                self.logger.debug('Already cached: {}'.format(ip))
                continue
            self.logger.debug('RIS lookup: {}'.format(ip))
            to_send = '-d network-info {} sourceapp={}\n'.format(ip, self.sourceapp)
            writer.write(to_send.encode())
            data = await reader.readuntil(b'\n}\n')
            network_info = json.loads(data)
            if not network_info.get('prefix'):
                self.logger.warning('The IP {} does not seem to be announced'.format(ip))
                continue
            # self.logger.debug('Prefix lookup: {}'.format(ip))
            # to_send = '-d prefix-overview {} sourceapp={}\n'.format(network_info['prefix'], self.sourceapp)
            # writer.write(to_send.encode())
            # data = await reader.readuntil(b'\n}\n')
            # prefix_overview = json.loads(data)
            # self.logger.debug('RIS cache prefix info: {}'.format(ip))
            # self.cache_prefix(redis_cache, ip, network_info, prefix_overview)
            self.cache_prefix(redis_cache, ip, network_info, {})
        writer.write(b'-k\n')
        writer.close()
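
For reference, a sketch of the data shapes involved here; the values are made up, and only the fields actually read by run() and cache_prefix() are shown.

# Made-up values; run() only reads 'prefix' and 'asns' from the network-info answer.
example_network_info = {
    'prefix': '198.51.100.0/24',
    'asns': ['64496'],
}
# cache_prefix() then stores, per IP, a hash with a 12h TTL, e.g.:
# '198.51.100.7' -> {'asn': '64496', 'prefix': '198.51.100.0/24'}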