From b4b012a430eb1d50bd9b8a55d44a75dc9fea9db5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?=
Date: Thu, 29 Mar 2018 23:05:07 +0200
Subject: [PATCH] chg: cleanup

---
 bgpranking/fetcher/__init__.py            |   0
 bgpranking/fetcher/simple_feed_fetcher.py | 427 ----------------------
 bgpranking/libs/exceptions.py             |   4 +
 bgpranking/libs/helpers.py                |   6 +-
 bin/.rislookup.py.swp                     | Bin 12288 -> 0 bytes
 bin/shutdown.py                           |   0
 bin/start.py                              |   0
 bin/stop.py                               |   0
 {bgpranking/libs => old}/StatsRipe.py     |   0
 {bgpranking/libs => old}/StatsRipeText.py |   0
 old/initranking_RIPE.py                   |  79 ++++
 old/risfetcher_RIPE.py                    |  61 ++++
 12 files changed, 149 insertions(+), 428 deletions(-)
 delete mode 100644 bgpranking/fetcher/__init__.py
 delete mode 100644 bgpranking/fetcher/simple_feed_fetcher.py
 delete mode 100644 bin/.rislookup.py.swp
 mode change 100644 => 100755 bin/shutdown.py
 mode change 100644 => 100755 bin/start.py
 mode change 100644 => 100755 bin/stop.py
 rename {bgpranking/libs => old}/StatsRipe.py (100%)
 rename {bgpranking/libs => old}/StatsRipeText.py (100%)
 create mode 100644 old/initranking_RIPE.py
 create mode 100644 old/risfetcher_RIPE.py

diff --git a/bgpranking/fetcher/__init__.py b/bgpranking/fetcher/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/bgpranking/fetcher/simple_feed_fetcher.py b/bgpranking/fetcher/simple_feed_fetcher.py
deleted file mode 100644
index 9e6d56e..0000000
--- a/bgpranking/fetcher/simple_feed_fetcher.py
+++ /dev/null
@@ -1,427 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import requests
-import os
-from dateutil import parser
-from datetime import datetime, date
-from hashlib import sha512  # Faster than sha256 on 64b machines.
-from pathlib import Path
-from dateutil.relativedelta import relativedelta
-from collections import defaultdict
-import zipfile
-import logging
-import asyncio
-from pid import PidFile, PidFileError
-import json
-import re
-from redis import Redis
-from redis import StrictRedis
-from uuid import uuid4
-from io import BytesIO
-import importlib
-
-from typing import List
-import types
-import ipaddress
-
-
-class BGPRankingException(Exception):
-    pass
-
-
-class FetcherException(BGPRankingException):
-    pass
-
-
-class ArchiveException(BGPRankingException):
-    pass
-
-
-class CreateDirectoryException(BGPRankingException):
-    pass
-
-
-"""
-Directory structure:
-storage_directory / vendor / listname -> files to import
-storage_directory / vendor / listname / meta -> last modified & pid
-storage_directory / vendor / listname / archive -> imported files <= 2 months old
-storage_directory / vendor / listname / archive / deep -> imported files > 2 months old (zipped)
-"""
-
-
-def safe_create_dir(to_create: Path):
-    if to_create.exists() and not to_create.is_dir():
-        raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create))
-    os.makedirs(to_create, exist_ok=True)
-
-
-class Fetcher():
-
-    def __init__(self, config_file: Path, storage_directory: Path,
-                 loglevel: int=logging.DEBUG):
-        '''Load `config_file`, and store the fetched data into `storage_directory`.
-        Note: if the `config_file` does not provide a URL (the file is
-        gathered by some other means), the fetcher is automatically stopped.'''
-        with open(config_file, 'r') as f:
-            module_parameters = json.load(f)
-        self.vendor = module_parameters['vendor']
-        self.listname = module_parameters['name']
-        self.__init_logger(loglevel)
-        self.fetcher = True
-        if 'url' not in module_parameters:
-            self.logger.info('No URL to fetch, breaking.')
-            self.fetcher = False
-            return
-        self.url = module_parameters['url']
-        self.logger.debug('Starting fetcher on {}'.format(self.url))
-        self.directory = storage_directory / self.vendor / self.listname
-        safe_create_dir(self.directory)
-        self.meta = self.directory / 'meta'
-        safe_create_dir(self.meta)
-        self.archive_dir = self.directory / 'archive'
-        safe_create_dir(self.archive_dir)
-        self.first_fetch = True
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
-                                                          self.vendor, self.listname))
-        self.logger.setLevel(loglevel)
-
-    def __get_last_modified(self):
-        r = requests.head(self.url)
-        if 'Last-Modified' in r.headers:
-            return parser.parse(r.headers['Last-Modified'])
-        return None
-
-    def __newer(self):
-        '''Check if the file available for download is newer than the one
-        already downloaded, by comparing the `Last-Modified` headers.
-        Note: returns True (i.e. fetch the file) if the file storing the last
-        header content does not exist yet, or if the header doesn't have this key.
-        '''
-        last_modified_path = self.meta / 'lastmodified'
-        if not last_modified_path.exists():
-            # The file doesn't exist
-            if not self.first_fetch:
-                # The URL has no Last-Modified header, we cannot use it.
-                self.logger.debug('No Last-Modified header available')
-                return True
-            self.first_fetch = False
-            last_modified = self.__get_last_modified()
-            if last_modified:
-                self.logger.debug('Last-Modified header available')
-                with last_modified_path.open('w') as f:
-                    f.write(last_modified.isoformat())
-            else:
-                self.logger.debug('No Last-Modified header available')
-            return True
-        with last_modified_path.open() as f:
-            last_modified_file = parser.parse(f.read())
-        last_modified = self.__get_last_modified()
-        if not last_modified:
-            # No more Last-Modified header Oo
-            self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname))
-            last_modified_path.unlink()
-            return True
-        if last_modified > last_modified_file:
-            self.logger.info('Got a new file.')
-            with last_modified_path.open('w') as f:
-                f.write(last_modified.isoformat())
-            return True
-        return False
-
-    def __same_as_last(self, downloaded):
-        '''Figure out the last downloaded file, check if it is the same as the
-        newly downloaded one. Returns True if both files have been downloaded the
-        same day.
-        Note: we check the new and the archive directories because we may have a backlog,
-        and the newest file is always the first one we process.
-        '''
-        to_check = []
-        to_check_new = sorted([f for f in self.directory.iterdir() if f.is_file()])
-        if to_check_new:
-            # we have files waiting to be processed
-            self.logger.debug('{} file(s) are waiting to be processed'.format(len(to_check_new)))
-            to_check.append(to_check_new[-1])
-        to_check_archive = sorted([f for f in self.archive_dir.iterdir() if f.is_file()])
-        if to_check_archive:
-            # we have files already processed, in the archive
-            self.logger.debug('{} file(s) have been processed'.format(len(to_check_archive)))
-            to_check.append(to_check_archive[-1])
-        if not to_check:
-            self.logger.debug('New list, no historical files')
-            # nothing has been downloaded ever, moving on
-            return False
-        for last_file in to_check:
-            with last_file.open('rb') as f:
-                last_hash = sha512(f.read())
-            dl_hash = sha512(downloaded)
-            if (dl_hash.digest() == last_hash.digest() and
-                    parser.parse(last_file.name.split('.')[0]).date() == date.today()):
-                self.logger.debug('Same file already downloaded today.')
-                return True
-        return False
-
-    @asyncio.coroutine
-    async def fetch_list(self):
-        '''Fetch & store the list'''
-        if not self.fetcher:
-            return
-        try:
-            with PidFile('{}.pid'.format(self.listname), piddir=self.meta):
-                if not self.__newer():
-                    return
-                r = requests.get(self.url)
-                if self.__same_as_last(r.content):
-                    return
-                self.logger.info('Got a new file \o/')
-                with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
-                    f.write(r.content)
-        except PidFileError:
-            self.logger.info('Fetcher already running')
-
-
-# get announcer: https://stat.ripe.net/data/network-info/data.json?resource=149.13.33.14
-
-class RawFilesParser():
-
-    def __init__(self, config_file: Path, storage_directory: Path,
-                 loglevel: int=logging.DEBUG):
-        with open(config_file, 'r') as f:
-            module_parameters = json.load(f)
-        self.vendor = module_parameters['vendor']
-        self.listname = module_parameters['name']
-        if 'parser' in module_parameters:
-            self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser']).parse_raw_file, self)
-        self.source = '{}-{}'.format(self.vendor, self.listname)
-        self.directory = storage_directory / self.vendor / self.listname
-        safe_create_dir(self.directory)
-        self.__init_logger(loglevel)
-        self.redis_intake = Redis(host='localhost', port=6379, db=0)
-        self.logger.debug('Starting intake on {}'.format(self.source))
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
-                                                          self.vendor, self.listname))
-        self.logger.setLevel(loglevel)
-
-    @property
-    def files_to_parse(self) -> List[Path]:
-        return sorted([f for f in self.directory.iterdir() if f.is_file()], reverse=True)
-
-    def extract_ipv4(self, bytestream: bytes) -> List[bytes]:
-        return re.findall(rb'[0-9]+(?:\.[0-9]+){3}', bytestream)
-
-    def parse_raw_file(self, f: BytesIO):
-        self.datetime = datetime.now()
-        return self.extract_ipv4(f.getvalue())
-
-    @asyncio.coroutine
-    async def parse_raw_files(self):
-        for filepath in self.files_to_parse:
-            self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1))
-            with open(filepath, 'rb') as f:
-                to_parse = BytesIO(f.read())
-            p = self.redis_intake.pipeline()
-            for ip in self.parse_raw_file(to_parse):
-                uuid = uuid4()
-                p.hmset(uuid, {'ip': ip, 'source': self.source,
-                               'datetime': self.datetime.isoformat()})
-                p.sadd('intake', uuid)
-            p.execute()
-            self._archive(filepath)
-
-    def _archive(self, filepath: Path):
-        '''After processing, move the file to the archive directory'''
-        filepath.rename(self.directory / 'archive' / filepath.name)
-
-
-class Sanitizer():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.__init_logger(loglevel)
-        self.redis_intake = Redis(host='localhost', port=6379, db=0, decode_responses=True)
-        self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True)
-        self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
-        self.logger.debug('Starting import')
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
-        self.logger.setLevel(loglevel)
-
-    async def sanitize(self):
-        while True:
-            uuid = self.redis_intake.spop('intake')
-            if not uuid:
-                break
-            data = self.redis_intake.hgetall(uuid)
-            try:
-                ip = ipaddress.ip_address(data['ip'])
-            except ValueError:
-                self.logger.info('Invalid IP address: {}'.format(data['ip']))
-                continue
-            if not ip.is_global:
-                self.logger.info('The IP address {} is not global'.format(data['ip']))
-                continue
-
-            date = parser.parse(data['datetime']).date().isoformat()
-            # NOTE: to consider: discard data with an old timestamp (define old)
-
-            # Add to temporary DB for further processing
-            self.ris_cache.sadd('for_ris_lookup', str(ip))
-            pipeline = self.redis_sanitized.pipeline()
-            pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'],
-                                  'date': date, 'datetime': data['datetime']})
-            pipeline.sadd('to_insert', uuid)
-            pipeline.execute()
-            self.redis_intake.delete(uuid)
-
-
-class DatabaseInsert():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.__init_logger(loglevel)
-        self.ardb_storage = StrictRedis(host='localhost', port=16379, decode_responses=True)
-        self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True)
-        self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
-        self.logger.debug('Starting import')
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
-        self.logger.setLevel(loglevel)
-
-    async def insert(self):
-        while True:
-            uuid = self.redis_sanitized.spop('to_insert')
-            if not uuid:
-                break
-            data = self.redis_sanitized.hgetall(uuid)
-            # Data gathered from the RIS queries:
-            #  * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
-            #  * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
-            #  * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview
-            ris_entry = self.ris_cache.hgetall(data['ip'])
-            if not ris_entry:
-                # RIS data not available yet, retry later
-                # FIXME: an IP can sometimes not be announced, we need to discard it
-                self.redis_sanitized.sadd('to_insert', uuid)
-                # In case this IP is missing in the set to process
-                self.ris_cache.sadd('for_ris_lookup', data['ip'])
-                continue
-            # Format: <date>|sources -> set([<source>, ...])
-            self.ardb_storage.sadd('{}|sources'.format(data['date']), data['source'])
-
-            # Format: <date>|<source> -> set([<asn>, ...])
-            self.ardb_storage.sadd('{}|{}'.format(data['date'], data['source']),
-                                   ris_entry['asn'])
-            # Format: <date>|<source>|<asn> -> set([<prefix>, ...])
-            self.ardb_storage.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
-                                   ris_entry['prefix'])
-
-            # Format: <date>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
-            self.ardb_storage.sadd('{}|{}|{}|{}'.format(data['date'], data['source'],
-                                                        ris_entry['asn'],
-                                                        ris_entry['prefix']),
-                                   '{}|{}'.format(data['ip'], data['datetime']))
-            self.redis_sanitized.delete(uuid)
-
-
-class StatsRIPE():
-
-    def __init__(self, sourceapp='bgpranking-ng - CIRCL'):
-        self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}"
-        self.url_parameters = {'sourceapp': sourceapp}
-
-    async def network_info(self, ip: str) -> dict:
-        method = 'network-info'
-        self.url_parameters['resource'] = ip
-        parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()])
-        url = self.url.format(method=method, parameters=parameters)
-        response = requests.get(url)
-        return response.json()
-
-    async def prefix_overview(self, prefix: str) -> dict:
-        method = 'prefix-overview'
-        self.url_parameters['resource'] = prefix
-        parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()])
-        url = self.url.format(method=method, parameters=parameters)
-        response = requests.get(url)
-        return response.json()
-
-
-class RoutingInformationServiceFetcher():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.__init_logger(loglevel)
-        self.ris_cache = Redis(host='localhost', port=6381, db=0)
-        self.logger.debug('Starting RIS fetcher')
-        self.ripe = StatsRIPE()
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
-        self.logger.setLevel(loglevel)
-
-    async def fetch(self):
-        while True:
-            ip = self.ris_cache.spop('for_ris_lookup')
-            if not ip:
-                break
-            ip = ip.decode()
-            network_info = await self.ripe.network_info(ip)
-            prefix = network_info['data']['prefix']
-            asns = network_info['data']['asns']
-            if not asns or not prefix:
-                self.logger.warning('The IP {} does not seem to be announced'.format(ip))
-                continue
-            prefix_overview = await self.ripe.prefix_overview(prefix)
-            description = prefix_overview['data']['block']['desc']
-            if not description:
-                description = prefix_overview['data']['block']['name']
-            for asn in asns:
-                self.ris_cache.hmset(ip, {'asn': asn, 'prefix': prefix,
-                                          'description': description})
-
-
-class DeepArchive():
-
-    def __init__(self, config_file: Path, storage_directory: Path,
-                 loglevel: int=logging.DEBUG):
-        '''Archive every file older than 2 months.'''
-        with open(config_file, 'r') as f:
-            module_parameters = json.load(f)
-        self.vendor = module_parameters['vendor']
-        self.listname = module_parameters['name']
-        self.directory = storage_directory / self.vendor / self.listname / 'archive'
-        safe_create_dir(self.directory)
-        self.deep_archive = self.directory / 'deep'
-        safe_create_dir(self.deep_archive)
-        self.__init_logger(loglevel)
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
-                                                          self.vendor, self.listname))
-        self.logger.setLevel(loglevel)
-
-    def archive(self):
-        to_archive = defaultdict(list)
-        today = date.today()
-        last_day_to_keep = date(today.year, today.month, 1) - relativedelta(months=2)
-        for p in self.directory.iterdir():
-            if not p.is_file():
-                continue
-            filedate = parser.parse(p.name.split('.')[0]).date()
-            if filedate >= last_day_to_keep:
-                continue
-            to_archive['{}.zip'.format(filedate.strftime('%Y%m'))].append(p)
-        if to_archive:
-            self.logger.info('Found old files. Archiving: {}'.format(', '.join(to_archive.keys())))
-        else:
-            self.logger.debug('No old files.')
-        for archivename, path_list in to_archive.items():
-            with zipfile.ZipFile(self.deep_archive / archivename, 'x', zipfile.ZIP_DEFLATED) as z:
-                for f in path_list:
-                    z.write(f, f.name)
-            # Delete all the files if the archiving worked out properly
-            [f.unlink() for f in path_list]
diff --git a/bgpranking/libs/exceptions.py b/bgpranking/libs/exceptions.py
index fa53aa6..d5ef0f1 100644
--- a/bgpranking/libs/exceptions.py
+++ b/bgpranking/libs/exceptions.py
@@ -16,3 +16,7 @@ class ArchiveException(BGPRankingException):
     pass
 
 class CreateDirectoryException(BGPRankingException):
     pass
+
+
+class MissingEnv(BGPRankingException):
+    pass
diff --git a/bgpranking/libs/helpers.py b/bgpranking/libs/helpers.py
index 1d1cd6b..bbfdfd3 100644
--- a/bgpranking/libs/helpers.py
+++ b/bgpranking/libs/helpers.py
@@ -4,7 +4,7 @@
 import os
 import sys
 from pathlib import Path
-from .exceptions import CreateDirectoryException
+from .exceptions import CreateDirectoryException, MissingEnv
 from redis import StrictRedis
 from redis.exceptions import ConnectionError
 from datetime import datetime, timedelta
@@ -16,10 +16,14 @@ def get_config_path():
 
 
 def get_list_storage_path():
+    if not os.environ.get('VIRTUAL_ENV'):
+        raise MissingEnv("VIRTUAL_ENV is missing. This project really wants to run from a virtual environment.")
     return Path(os.environ['VIRTUAL_ENV'])
 
 
 def get_homedir():
+    if not os.environ.get('BGPRANKING_HOME'):
+        raise MissingEnv("BGPRANKING_HOME is missing. Run the following from the home directory of the repository: export BGPRANKING_HOME='./'")
     return Path(os.environ['BGPRANKING_HOME'])
 
 
diff --git a/bin/.rislookup.py.swp b/bin/.rislookup.py.swp
deleted file mode 100644
index bf51047d2d5b888043976ef09c476f065d1d1a06..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

diff --git a/bin/shutdown.py b/bin/shutdown.py
old mode 100644
new mode 100755
diff --git a/bin/start.py b/bin/start.py
old mode 100644
new mode 100755
diff --git a/bin/stop.py b/bin/stop.py
old mode 100644
new mode 100755
diff --git a/bgpranking/libs/StatsRipe.py b/old/StatsRipe.py
similarity index 100%
rename from bgpranking/libs/StatsRipe.py
rename to old/StatsRipe.py
diff --git a/bgpranking/libs/StatsRipeText.py b/old/StatsRipeText.py
similarity index 100%
rename from bgpranking/libs/StatsRipeText.py
rename to old/StatsRipeText.py
diff --git a/old/initranking_RIPE.py b/old/initranking_RIPE.py
new file mode 100644
index 0000000..6ece6b6
--- /dev/null
+++ b/old/initranking_RIPE.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import json
+from redis import Redis
+import asyncio
+
+from .libs.StatsRipeText import RIPECaching
+from ipaddress import ip_network
+
+
+class ASNLookup(RIPECaching):
+
+    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
+        super().__init__(sourceapp, loglevel)
+        self.redis_cache = Redis(host='localhost', port=6382, db=0, decode_responses=True)
+        self.logger.debug('Starting ASN lookup cache')
+
+    async def get_all_asns(self):
+        reader, writer = await asyncio.open_connection(self.hostname, self.port)
+        to_send = '-d ris-asns list_asns=true asn_types=o sourceapp={}\n'.format(self.sourceapp)
+        writer.write(to_send.encode())
+        ris_asns = json.loads(await reader.read())
+        all_asns = ris_asns['asns']['originating']
+        if not all_asns:
+            self.logger.warning('No ASNs in ris-asns, something went wrong.')
+        else:
+            self.redis_cache.sadd('asns', *all_asns)
+            self.redis_cache.sadd('asns_to_lookup', *all_asns)
+
+    def fix_ipv4_networks(self, networks):
+        '''Because we can't have nice things.
+        Some networks come without the last byte(s) (i.e. 170.254.25/24)'''
+        to_return = []
+        for net in networks:
+            try:
+                to_return.append(ip_network(net))
+            except ValueError:
+                ip, mask = net.split('/')
+                iplist = ip.split('.')
+                iplist = iplist + ['0'] * (4 - len(iplist))
+                to_return.append(ip_network('{}/{}'.format('.'.join(iplist), mask)))
+        return to_return
+
+    async def get_originating_prefixes(self):
+        reader, writer = await asyncio.open_connection(self.hostname, self.port)
+        writer.write(b'-k\n')
+        while True:
+            asn = self.redis_cache.spop('asns_to_lookup')
+            if not asn:
+                break
+            self.logger.debug('ASN lookup: {}'.format(asn))
+            to_send = '-d ris-prefixes {} list_prefixes=true types=o af=v4,v6 noise=filter sourceapp={}\n'.format(asn, self.sourceapp)
+            writer.write(to_send.encode())
+            try:
+                data = await reader.readuntil(b'\n}\n')
+            except asyncio.streams.LimitOverrunError:
+                self.logger.debug('ASN lookup failed: {}'.format(asn))
+                self.redis_cache.sadd('asns_to_lookup', asn)
+                writer.close()
+                reader, writer = await asyncio.open_connection(self.hostname, self.port)
+
+            ris_prefixes = json.loads(data)
+            p = self.redis_cache.pipeline()
+            if ris_prefixes['prefixes']['v4']['originating']:
+                self.logger.debug('{} has ipv4'.format(asn))
+                fixed_networks = self.fix_ipv4_networks(ris_prefixes['prefixes']['v4']['originating'])
+                p.sadd('{}|v4'.format(asn), *[str(net) for net in fixed_networks])
+                total_ipv4 = sum([net.num_addresses for net in fixed_networks])
+                p.set('{}|v4|ipcount'.format(asn), total_ipv4)
+            if ris_prefixes['prefixes']['v6']['originating']:
+                self.logger.debug('{} has ipv6'.format(asn))
+                p.sadd('{}|v6'.format(asn), *ris_prefixes['prefixes']['v6']['originating'])
+                total_ipv6 = sum([ip_network(prefix).num_addresses for prefix in ris_prefixes['prefixes']['v6']['originating']])
+                p.set('{}|v6|ipcount'.format(asn), total_ipv6)
+            p.execute()
+            writer.write(b'-k\n')
+        writer.close()
diff --git a/old/risfetcher_RIPE.py b/old/risfetcher_RIPE.py
new file mode 100644
index 0000000..c838101
--- /dev/null
+++ b/old/risfetcher_RIPE.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import json
+from redis import Redis
+
+from .libs.StatsRipeText import RIPECaching
+import asyncio
+
+
+class RISPrefixLookup(RIPECaching):
+
+    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
+        super().__init__(sourceapp, loglevel)
+        self.logger.debug('Starting RIS Prefix fetcher')
+
+    def cache_prefix(self, redis_cache, ip, network_info, prefix_overview):
+        prefix = network_info['prefix']
+        asns = network_info['asns']
+        # description = prefix_overview['block']['desc']
+        # if not description:
+        #     description = prefix_overview['block']['name']
+        p = redis_cache.pipeline()
+        for asn in asns:
+            p.hmset(ip, {'asn': asn, 'prefix': prefix})  # , 'description': description})
+            p.expire(ip, 43200)  # 12H
+        p.execute()
+
+    async def run(self):
+        redis_cache = Redis(host='localhost', port=6581, db=0, decode_responses=True)
+        reader, writer = await asyncio.open_connection(self.hostname, self.port)
+
+        writer.write(b'-k\n')
+        while True:
+            ip = redis_cache.spop('for_ris_lookup')
+            if not ip:  # TODO: add a check against something to stop the loop
+                self.logger.debug('Nothing to lookup')
+                await asyncio.sleep(10)
+                continue
+            if redis_cache.exists(ip):
+                self.logger.debug('Already cached: {}'.format(ip))
+                continue
+            self.logger.debug('RIS lookup: {}'.format(ip))
+            to_send = '-d network-info {} sourceapp={}\n'.format(ip, self.sourceapp)
+            writer.write(to_send.encode())
+            data = await reader.readuntil(b'\n}\n')
+            network_info = json.loads(data)
+            if not network_info.get('prefix'):
+                self.logger.warning('The IP {} does not seem to be announced'.format(ip))
+                continue
+            # self.logger.debug('Prefix lookup: {}'.format(ip))
+            # to_send = '-d prefix-overview {} sourceapp={}\n'.format(network_info['prefix'], self.sourceapp)
+            # writer.write(to_send.encode())
+            # data = await reader.readuntil(b'\n}\n')
+            # prefix_overview = json.loads(data)
+            # self.logger.debug('RIS cache prefix info: {}'.format(ip))
+            # self.cache_prefix(redis_cache, ip, network_info, prefix_overview)
+            self.cache_prefix(redis_cache, ip, network_info, {})
+            writer.write(b'-k\n')
+        writer.close()
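
Side note (illustration only, not part of the patch): the guards added to bgpranking/libs/helpers.py raise the new MissingEnv exception when BGPRANKING_HOME or VIRTUAL_ENV is unset, instead of failing with a bare KeyError on os.environ. A minimal sketch of how this is expected to surface, assuming the repository is importable as the bgpranking package from the current checkout:

    import os

    from bgpranking.libs.exceptions import MissingEnv
    from bgpranking.libs.helpers import get_homedir

    # Without BGPRANKING_HOME, get_homedir() now fails with an explicit message.
    os.environ.pop('BGPRANKING_HOME', None)
    try:
        get_homedir()
    except MissingEnv as e:
        print(e)

    # Exporting the variable (as the message suggests) restores normal behaviour.
    os.environ['BGPRANKING_HOME'] = './'
    print(get_homedir())  # PosixPath('.')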