diff --git a/bgpranking/abstractmanager.py b/bgpranking/abstractmanager.py
index f2ae4e6..3b9f91b 100644
--- a/bgpranking/abstractmanager.py
+++ b/bgpranking/abstractmanager.py
@@ -10,23 +10,23 @@ from .libs.helpers import long_sleep, shutdown_requested
 class AbstractManager(ABC):
 
     def __init__(self, loglevel: int=logging.DEBUG):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}')
         self.logger.setLevel(loglevel)
-        self.logger.info('Initializing {}'.format(self.__class__.__name__))
+        self.logger.info(f'Initializing {self.__class__.__name__}')
 
     @abstractmethod
     def _to_run_forever(self):
         pass
 
     def run(self, sleep_in_sec: int):
-        self.logger.info('Launching {}'.format(self.__class__.__name__))
+        self.logger.info(f'Launching {self.__class__.__name__}')
         while True:
             if shutdown_requested():
                 break
             try:
                 self._to_run_forever()
             except Exception:
-                self.logger.exception('Something went terribly wrong in {}.'.format(self.__class__.__name__))
+                self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
             if not long_sleep(sleep_in_sec):
                 break
-        self.logger.info('Shutting down {}'.format(self.__class__.__name__))
+        self.logger.info(f'Shutting down {self.__class__.__name__}')
diff --git a/bgpranking/archive.py b/bgpranking/archive.py
index c9bd955..ad3e042 100644
--- a/bgpranking/archive.py
+++ b/bgpranking/archive.py
@@ -29,8 +29,7 @@ class DeepArchive():
         self.__init_logger(loglevel)
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
-                                                          self.vendor, self.listname))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
         self.logger.setLevel(loglevel)
 
     def archive(self):
diff --git a/bgpranking/dbinsert.py b/bgpranking/dbinsert.py
index e51bc86..12350ff 100644
--- a/bgpranking/dbinsert.py
+++ b/bgpranking/dbinsert.py
@@ -16,7 +16,7 @@ class DatabaseInsert():
         self.logger.debug('Starting import')
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}')
         self.logger.setLevel(loglevel)
 
     def insert(self):
@@ -38,7 +38,7 @@ class DatabaseInsert():
         for i, uuid in enumerate(uuids):
             data = sanitized_data[i]
             if not data:
-                self.logger.warning('No data for UUID {}. This should not happen, but lets move on.'.format(uuid))
+                self.logger.warning(f"No data for UUID {uuid}. This should not happen, but let's move on.")
                 continue
             # Data gathered from the RIS queries:
             # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
@@ -52,17 +52,16 @@ class DatabaseInsert():
                 prefix_missing.append(data['ip'])
                 continue
             # Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
-            ardb_pipeline.sadd('{}|sources'.format(data['date']), data['source'])
+            ardb_pipeline.sadd(f"{data['date']}|sources", data['source'])
             # Format: <YYYY-MM-DD>|<source> -> set([<asn>, ...])
-            ardb_pipeline.sadd('{}|{}'.format(data['date'], data['source']), ris_entry['asn'])
+            ardb_pipeline.sadd(f"{data['date']}|{data['source']}", ris_entry['asn'])
             # Format: <YYYY-MM-DD>|<source>|<asn> -> set([<prefix>, ...])
-            ardb_pipeline.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
-                               ris_entry['prefix'])
+            ardb_pipeline.sadd(f"{data['date']}|{data['source']}|{ris_entry['asn']}", ris_entry['prefix'])
             # Format: <YYYY-MM-DD>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
-            ardb_pipeline.sadd('{}|{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn'], ris_entry['prefix']),
-                               '{}|{}'.format(data['ip'], data['datetime']))
+            ardb_pipeline.sadd(f"{data['date']}|{data['source']}|{ris_entry['asn']}|{ris_entry['prefix']}",
+                               f"{data['ip']}|{data['datetime']}")
             done.append(uuid)
         ardb_pipeline.execute()
         if prefix_missing:
diff --git a/bgpranking/libs/helpers.py b/bgpranking/libs/helpers.py
index 2b65f30..bd43899 100644
--- a/bgpranking/libs/helpers.py
+++ b/bgpranking/libs/helpers.py
@@ -9,6 +9,20 @@ from redis import StrictRedis
 from redis.exceptions import ConnectionError
 from datetime import datetime, timedelta
 import time
+import json
+
+
+def load_config_files(config_dir: Path=None) -> dict:
+    if not config_dir:
+        config_dir = get_config_path()
+    modules_config = config_dir / 'modules'
+    modules_paths = [modulepath for modulepath in modules_config.glob('*.json')]
+    configs = {}
+    for p in modules_paths:
+        with open(p, 'r') as f:
+            j = json.load(f)
+            configs[f"{j['vendor']}-{j['name']}"] = j
+    return configs
 
 
 def get_config_path():
@@ -32,7 +46,7 @@ def get_homedir():
 
 def safe_create_dir(to_create: Path):
     if to_create.exists() and not to_create.is_dir():
-        raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create))
+        raise CreateDirectoryException(f'The path {to_create} already exists and is not a directory')
     os.makedirs(to_create, exist_ok=True)
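Note: the new load_config_files helper keys each module configuration by vendor-name, which is the same source identifier used by the parser and by Ranking.compute below. A minimal sketch of how it is consumed; the file name and field values here are hypothetical, and only 'vendor', 'name' and 'impact' are actually exercised by this diff:

    # Assuming config/modules/example-vendor-example-list.json contains something like:
    #   {"vendor": "example-vendor", "name": "example-list",
    #    "url": "https://example.com/list.txt", "impact": 5}
    from bgpranking.libs.helpers import load_config_files

    configs = load_config_files()
    impact = configs['example-vendor-example-list']['impact']  # per-IP weight used by Ranking.compute
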
diff --git a/bgpranking/modulesfetcher.py b/bgpranking/modulesfetcher.py
index 9b96472..b159500 100644
--- a/bgpranking/modulesfetcher.py
+++ b/bgpranking/modulesfetcher.py
@@ -31,7 +31,7 @@ class Fetcher():
             self.fetcher = False
             return
         self.url = module_parameters['url']
-        self.logger.debug('Starting fetcher on {}'.format(self.url))
+        self.logger.debug(f'Starting fetcher on {self.url}')
         self.directory = storage_directory / self.vendor / self.listname
         safe_create_dir(self.directory)
         self.meta = self.directory / 'meta'
@@ -41,8 +41,7 @@ class Fetcher():
         self.first_fetch = True
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
-                                                          self.vendor, self.listname))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
         self.logger.setLevel(loglevel)
 
     async def __get_last_modified(self):
@@ -81,7 +80,7 @@ class Fetcher():
             last_modified = await self.__get_last_modified()
             if not last_modified:
                 # No more Last-Modified header Oo
-                self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname))
+                self.logger.warning(f'{self.listname}: Last-Modified header was present, isn\'t anymore!')
                 last_modified_path.unlink()
                 return True
             if last_modified > last_modified_file:
@@ -127,11 +126,11 @@ class Fetcher():
         '''Fetch & store the list'''
         if not self.fetcher:
             return
-        set_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname))
+        set_running(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
         try:
-            with PidFile('{}.pid'.format(self.listname), piddir=self.meta):
+            with PidFile(f'{self.listname}.pid', piddir=self.meta):
                 if not await self.__newer():
-                    unset_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname))
+                    unset_running(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
                     return
                 async with aiohttp.ClientSession() as session:
                     async with session.get(self.url) as r:
@@ -141,8 +140,8 @@ class Fetcher():
                 self.logger.info('Got a new file \o/')
                 with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
                     f.write(content)
-                unset_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname))
+                unset_running(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
         except PidFileError:
             self.logger.info('Fetcher already running')
         finally:
-            unset_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname))
+            unset_running(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
diff --git a/bgpranking/parser.py b/bgpranking/parser.py
index 56e4f16..15c2cf1 100644
--- a/bgpranking/parser.py
+++ b/bgpranking/parser.py
@@ -27,16 +27,15 @@ class RawFilesParser():
         self.listname = module_parameters['name']
         if 'parser' in module_parameters:
             self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser'], 'bgpranking').parse_raw_file, self)
-        self.source = '{}-{}'.format(self.vendor, self.listname)
+        self.source = f'{self.vendor}-{self.listname}'
         self.directory = storage_directory / self.vendor / self.listname
         safe_create_dir(self.directory)
         self.__init_logger(loglevel)
         self.redis_intake = StrictRedis(unix_socket_path=get_socket_path('intake'), db=0)
-        self.logger.debug('Starting intake on {}'.format(self.source))
+        self.logger.debug(f'Starting intake on {self.source}')
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
-                                                          self.vendor, self.listname))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}-{self.vendor}-{self.listname}')
         self.logger.setLevel(loglevel)
 
     @property
@@ -56,7 +55,7 @@ class RawFilesParser():
         return self.extract_ipv4(f.getvalue())
 
     def parse_raw_files(self):
-        set_running('{}-{}'.format(self.__class__.__name__, self.source))
+        set_running(f'{self.__class__.__name__}-{self.source}')
         for filepath in self.files_to_parse:
             self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1))
             with open(filepath, 'rb') as f:
@@ -69,7 +68,7 @@ class RawFilesParser():
                 p.sadd('intake', uuid)
             p.execute()
             self._archive(filepath)
-        unset_running('{}-{}'.format(self.__class__.__name__, self.source))
+        unset_running(f'{self.__class__.__name__}-{self.source}')
 
     def _archive(self, filepath: Path):
         '''After processing, move file to the archive directory'''
diff --git a/bgpranking/prefixdb.py b/bgpranking/prefixdb.py
index a15745b..6612f4f 100644
--- a/bgpranking/prefixdb.py
+++ b/bgpranking/prefixdb.py
@@ -11,7 +11,7 @@ from collections import defaultdict
 import re
 import time
 from .libs.helpers import set_running, unset_running, get_socket_path
-
+from dateutil.parser import parse
 
 # Dataset source: Routeviews Prefix to AS mappings Dataset for IPv4 and IPv6
 # http://www.caida.org/data/routing/routeviews-prefix2as.xml
@@ -22,11 +22,12 @@ class PrefixDatabase():
     def __init__(self, loglevel: int=logging.DEBUG):
         self.__init_logger(loglevel)
         self.prefix_cache = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=0, decode_responses=True)
+        self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
         self.ipv6_url = 'http://data.caida.org/datasets/routing/routeviews6-prefix2as/{}'
         self.ipv4_url = 'http://data.caida.org/datasets/routing/routeviews-prefix2as/{}'
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}')
         self.logger.setLevel(loglevel)
 
     def update_required(self):
@@ -42,13 +43,14 @@ class PrefixDatabase():
         r = requests.get(root_url.format('pfx2as-creation.log'))
         last_entry = r.text.split('\n')[-2]
         path = last_entry.split('\t')[-1]
-        if path == self.prefix_cache.get('current|{}'.format(address_family)):
-            self.logger.debug('Same file already loaded: {}'.format(path))
+        if path == self.prefix_cache.get(f'current|{address_family}'):
+            self.logger.debug(f'Same file already loaded: {path}')
             return False, path
         return True, path
 
-    def _init_routes(self, address_family, root_url, path):
-        self.logger.debug('Loading {}'.format(path))
+    def _init_routes(self, address_family, root_url, path) -> bool:
+        self.logger.debug(f'Loading {path}')
+        date = parse(re.findall('http://data.caida.org/datasets/routing/routeviews[6]?-prefix2as/(?:.*)/(?:.*)/routeviews-rv[26]-(.*)-(?:.*).pfx2as.gz', path)[0]).date().isoformat()
         r = requests.get(root_url.format(path))
         to_import = defaultdict(lambda: {address_family: set(), 'ipcount': 0})
         with gzip.open(BytesIO(r.content), 'r') as f:
@@ -56,17 +58,23 @@ class PrefixDatabase():
                 prefix, length, asns = line.decode().strip().split('\t')
                 # The meaning of AS set and multi-origin AS in unclear. Taking the first ASN in the list only.
                 asn = re.split('[,_]', asns)[0]
-                network = ip_network('{}/{}'.format(prefix, length))
+                network = ip_network(f'{prefix}/{length}')
                 to_import[asn][address_family].add(str(network))
                 to_import[asn]['ipcount'] += network.num_addresses
 
         p = self.prefix_cache.pipeline()
+        p_asn_meta = self.asn_meta.pipeline()
         p.sadd('asns', *to_import.keys())
+        p_asn_meta.set(f'{address_family}|last', date)  # Not necessarily today
+        p_asn_meta.sadd(f'{date}|asns|{address_family}', *to_import.keys())
         for asn, data in to_import.items():
-            p.sadd('{}|{}'.format(asn, address_family), *data[address_family])
-            p.set('{}|{}|ipcount'.format(asn, address_family), data['ipcount'])
-        p.set('current|{}'.format(address_family), path)
+            p.sadd(f'{asn}|{address_family}', *data[address_family])
+            p.set(f'{asn}|{address_family}|ipcount', data['ipcount'])
+            p_asn_meta.sadd(f'{date}|{asn}|{address_family}', *data[address_family])
+            p_asn_meta.set(f'{date}|{asn}|{address_family}|ipcount', data['ipcount'])
+        p.set(f'current|{address_family}', path)
         p.execute()
+        p_asn_meta.execute()
         return True
 
     def load_prefixes(self):
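Note: each pfx2as line is prefix<TAB>length<TAB>asn(s), with AS sets and multi-origin entries separated by '_' or ',', and _init_routes now mirrors each dump into the asn_meta database (db=2) keyed by the dump date. A minimal read-back sketch, assuming a v4 dump has already been loaded; the ASN used here is made up:

    from redis import StrictRedis
    from bgpranking.libs.helpers import get_socket_path

    asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
    dump_date = asn_meta.get('v4|last')                     # e.g. '2018-03-15'
    asns = asn_meta.smembers(f'{dump_date}|asns|v4')        # ASNs present in that dump
    prefixes = asn_meta.smembers(f'{dump_date}|1234|v4')    # prefixes announced by AS1234
    ipcount = asn_meta.get(f'{dump_date}|1234|v4|ipcount')  # total IPv4 addresses for AS1234
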
diff --git a/bgpranking/ranking.py b/bgpranking/ranking.py
new file mode 100644
index 0000000..ebdfea9
--- /dev/null
+++ b/bgpranking/ranking.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from redis import StrictRedis
+from .libs.helpers import set_running, unset_running, get_socket_path, load_config_files
+from datetime import date
+from ipaddress import ip_network
+from pathlib import Path
+
+
+class Ranking():
+
+    def __init__(self, config_dir: Path=None, loglevel: int=logging.DEBUG):
+        self.__init_logger(loglevel)
+        self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)
+        self.ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1, decode_responses=True)
+        self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
+        self.prefix_cache = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=0, decode_responses=True)
+        self.config_files = load_config_files(config_dir)
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger(f'{self.__class__.__name__}')
+        self.logger.setLevel(loglevel)
+
+    def compute(self):
+        self.logger.info('Start ranking')
+        set_running(self.__class__.__name__)
+        today = date.today().isoformat()
+        v4_last = self.asn_meta.get('v4|last')
+        v6_last = self.asn_meta.get('v6|last')
+        if not v4_last or not v6_last:
+            # Failsafe if asn_meta has not been populated yet
+            return
+        for source in self.storage.smembers(f'{today}|sources'):
+            self.logger.info(f'{today} - Ranking source: {source}')
+            r_pipeline = self.ranking.pipeline()
+            for asn in self.storage.smembers(f'{today}|{source}'):
+                self.logger.debug(f'{today} - Ranking source: {source} / ASN: {asn}')
+                asn_rank_v4 = 0.0
+                asn_rank_v6 = 0.0
+                for prefix in self.storage.smembers(f'{today}|{source}|{asn}'):
+                    ips = set([ip_ts.split('|')[0]
+                               for ip_ts in self.storage.smembers(f'{today}|{source}|{asn}|{prefix}')])
+                    network = ip_network(prefix)
+                    prefix_rank = float(len(ips)) / network.num_addresses
+                    r_pipeline.zadd(f'{today}|{source}|{asn}|rankv{network.version}|prefixes', prefix_rank, prefix)
+                    if network.version == 4:
+                        asn_rank_v4 += len(ips) * self.config_files[source]['impact']
+                    else:
+                        asn_rank_v6 += len(ips) * self.config_files[source]['impact']
+                asn_rank_v4 /= int(self.asn_meta.get(f'{v4_last}|{asn}|v4|ipcount'))
+                asn_rank_v6 /= int(self.asn_meta.get(f'{v6_last}|{asn}|v6|ipcount'))
+                if asn_rank_v4:
+                    r_pipeline.set(f'{today}|{source}|{asn}|rankv4', asn_rank_v4)
+                if asn_rank_v6:
+                    r_pipeline.set(f'{today}|{source}|{asn}|rankv6', asn_rank_v6)
+            r_pipeline.execute()
+        unset_running(self.__class__.__name__)
+        self.logger.info('Ranking done.')
diff --git a/bgpranking/risfetcher.py b/bgpranking/risfetcher.py
index d895c00..5052a1b 100644
--- a/bgpranking/risfetcher.py
+++ b/bgpranking/risfetcher.py
@@ -22,7 +22,7 @@ class RISPrefixLookup():
         self.init_tree()
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}')
         self.logger.setLevel(loglevel)
 
     def cache_prefix(self, pipe, ip, prefix, asns):
@@ -31,9 +31,9 @@
 
     def init_tree(self):
         for asn in self.prefix_db.smembers('asns'):
-            for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v4')):
+            for prefix in self.prefix_db.smembers(f'{asn}|v4'):
                 self.tree_v4[prefix] = asn
-            for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v6')):
+            for prefix in self.prefix_db.smembers(f'{asn}|v6'):
                 self.tree_v6[prefix] = asn
         self.tree_v4['0.0.0.0/0'] = 0
         self.tree_v4['::/0'] = 0
@@ -54,7 +54,7 @@ class RISPrefixLookup():
             pipe = self.longest_prefix_matching.pipeline(transaction=False)
             for ip in ips:
                 if self.longest_prefix_matching.exists(ip):
-                    self.logger.debug('Already cached: {}'.format(ip))
+                    self.logger.debug(f'Already cached: {ip}')
                     continue
                 ip = ipaddress.ip_address(ip)
                 if ip.version == 4:
@@ -64,7 +64,7 @@ class RISPrefixLookup():
                     prefix = self.tree_v6.get_key(ip)
                     asns = self.tree_v6.get(ip)
                 if not prefix:
-                    self.logger.warning('The IP {} does not seem to be announced'.format(ip))
+                    self.logger.warning(f'The IP {ip} does not seem to be announced')
                     continue
                 self.cache_prefix(pipe, ip, prefix, asns)
             pipe.execute()
diff --git a/bgpranking/sanitizer.py b/bgpranking/sanitizer.py
index be7709f..f5d05e0 100644
--- a/bgpranking/sanitizer.py
+++ b/bgpranking/sanitizer.py
@@ -19,7 +19,7 @@ class Sanitizer():
         self.logger.debug('Starting import')
 
     def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger = logging.getLogger(f'{self.__class__.__name__}')
         self.logger.setLevel(loglevel)
 
     def sanitize(self):
@@ -37,10 +37,10 @@ class Sanitizer():
             try:
                 ip = ipaddress.ip_address(data['ip'])
             except ValueError:
-                self.logger.info('Invalid IP address: {}'.format(data['ip']))
+                self.logger.info(f"Invalid IP address: {data['ip']}")
                 continue
             if not ip.is_global:
-                self.logger.info('The IP address {} is not global'.format(data['ip']))
+                self.logger.info(f"The IP address {data['ip']} is not global")
                 continue
 
             date = parser.parse(data['datetime']).date().isoformat()
diff --git a/bin/fetcher.py b/bin/fetcher.py
index c18671f..c87a1ef 100755
--- a/bin/fetcher.py
+++ b/bin/fetcher.py
@@ -34,7 +34,7 @@ class ModulesManager(AbstractManager):
                 return_exceptions=True)
             )
         except aiohttp.client_exceptions.ClientConnectorError as e:
-            self.logger.critical('Exception while fetching lists: {}'.format(e))
+            self.logger.critical(f'Exception while fetching lists: {e}')
 
 
 if __name__ == '__main__':
diff --git a/bin/loadprefixes.py b/bin/loadprefixes.py
index 46db4eb..5208b2a 100755
--- a/bin/loadprefixes.py
+++ b/bin/loadprefixes.py
@@ -22,7 +22,7 @@ class PrefixDBManager(AbstractManager):
             if self.prefix_db.update_required():
                 self.prefix_db.load_prefixes()
         except requests.exceptions.ConnectionError as e:
-            self.logger.critical('Unable to download the prefix database: {}'.format(e))
+            self.logger.critical(f'Unable to download the prefix database: {e}')
 
 
 if __name__ == '__main__':
diff --git a/bin/ranking.py b/bin/ranking.py
new file mode 100755
index 0000000..61ba662
--- /dev/null
+++ b/bin/ranking.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from bgpranking.abstractmanager import AbstractManager
+from bgpranking.ranking import Ranking
+from pathlib import Path
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class RankingManager(AbstractManager):
+
+    def __init__(self, config_dir: Path=None, loglevel: int=logging.DEBUG):
+        super().__init__(loglevel)
+        self.ranking = Ranking(config_dir, loglevel)
+
+    def _to_run_forever(self):
+        self.ranking.compute()
+
+
+if __name__ == '__main__':
+    ranking_manager = RankingManager()
+    ranking_manager.run(sleep_in_sec=3600)
diff --git a/bin/run_backend.py b/bin/run_backend.py
index 54e86d0..75ed348 100755
--- a/bin/run_backend.py
+++ b/bin/run_backend.py
@@ -71,9 +71,9 @@ def check_all(stop=False):
             break
         for b in backends:
             if not stop and not b[1]:
-                print('Waiting on {}'.format(b[0]))
+                print(f"Waiting on {b[0]}")
             if stop and b[1]:
-                print('Waiting on {}'.format(b[0]))
+                print(f"Waiting on {b[0]}")
         time.sleep(1)
diff --git a/bin/start.py b/bin/start.py
index 9705a3d..b82967b 100755
--- a/bin/start.py
+++ b/bin/start.py
@@ -16,3 +16,4 @@ if __name__ == '__main__':
     Popen(['parser.py'])
     Popen(['sanitizer.py'])
     Popen(['dbinsert.py'])
+    Popen(['ranking.py'])
diff --git a/setup.py b/setup.py
index d80b69c..b0c8bae 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@ setup(
     packages=['bgpranking'],
     scripts=['bin/archiver.py', 'bin/dbinsert.py', 'bin/fetcher.py', 'bin/parser.py',
              'bin/loadprefixes.py', 'bin/rislookup.py', 'bin/sanitizer.py', 'bin/run_backend.py',
-             'bin/monitor.py', 'bin/start.py', 'bin/stop.py', 'bin/shutdown.py'],
+             'bin/monitor.py', 'bin/ranking.py', 'bin/start.py', 'bin/stop.py', 'bin/shutdown.py'],
     classifiers=[
         'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
         'Development Status :: 3 - Alpha',
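Note: the scoring in Ranking.compute reduces to two quantities per day and source: a per-prefix rank (unique reported IPs divided by the prefix size) stored in a sorted set, and a per-ASN rank (reported IPs weighted by the source's 'impact', normalised by the total address space the ASN announces). A worked sketch with made-up numbers:

    from ipaddress import ip_network

    impact = 5              # from the source's module config (hypothetical value)
    ips_seen = 10           # unique IPs reported today inside this prefix
    prefix = ip_network('192.0.2.0/24')
    asn_v4_ipcount = 65536  # addresses announced by the ASN, from asn_meta

    prefix_rank = ips_seen / prefix.num_addresses     # 10 / 256, about 0.039
    asn_rank_v4 = ips_seen * impact / asn_v4_ipcount  # 50 / 65536, about 0.00076
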
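Note: the daily results land in db=1 of the storage instance, so they can be read back with plain Redis commands. A minimal sketch, assuming compute has run today; the source name and ASN below are hypothetical:

    from datetime import date
    from redis import StrictRedis
    from bgpranking.libs.helpers import get_socket_path

    ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1, decode_responses=True)
    today = date.today().isoformat()
    asn_rank = ranking.get(f'{today}|example-vendor-example-list|1234|rankv4')
    worst_prefixes = ranking.zrevrange(f'{today}|example-vendor-example-list|1234|rankv4|prefixes',
                                       0, 4, withscores=True)  # five highest-ranked prefixes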