diff --git a/bgpranking/abstractmanager.py b/bgpranking/abstractmanager.py
new file mode 100644
index 0000000..d4fc730
--- /dev/null
+++ b/bgpranking/abstractmanager.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from abc import ABC, abstractmethod
+import logging
+
+from .libs.helpers import long_sleep, shutdown_requested
+
+
+class AbstractManager(ABC):
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger.setLevel(loglevel)
+        self.logger.info('Initializing {}'.format(self.__class__.__name__))
+
+    @abstractmethod
+    def _to_run_forever(self):
+        pass
+
+    def run(self, sleep_in_sec: int):
+        self.logger.info('Launching {}'.format(self.__class__.__name__))
+        while True:
+            if shutdown_requested():
+                break
+            self._to_run_forever()
+            if not long_sleep(sleep_in_sec):
+                break
+        self.logger.info('Shutting down {}'.format(self.__class__.__name__))
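Note (not part of the patch): every long-running script in bin/ now plugs into this base class. A subclass implements _to_run_forever() and AbstractManager.run() drives the loop, checking shutdown_requested() before each pass and stopping once long_sleep() signals that a shutdown was requested while sleeping; that behaviour is inferred from the calls above, the helpers themselves are not part of this diff. A minimal sketch of a subclass, for illustration only (DummyManager does not exist in the repository):

    import logging

    from bgpranking.abstractmanager import AbstractManager


    class DummyManager(AbstractManager):
        # Hypothetical service: one unit of work per wake-up.

        def __init__(self, loglevel: int=logging.INFO):
            super().__init__(loglevel)

        def _to_run_forever(self):
            # run() sleeps and re-checks for shutdown between calls,
            # so this method should return after a single batch of work.
            self.logger.info('Doing one batch of work.')


    if __name__ == '__main__':
        DummyManager().run(sleep_in_sec=120)
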
diff --git a/bgpranking/dbinsert.py b/bgpranking/dbinsert.py
index ada2fde..3f57edd 100644
--- a/bgpranking/dbinsert.py
+++ b/bgpranking/dbinsert.py
@@ -25,35 +25,53 @@ class DatabaseInsert():
         while True:
             if shutdown_requested():
                 break
-            uuid = self.redis_sanitized.spop('to_insert')
-            if not uuid:
+            uuids = self.redis_sanitized.spop('to_insert', 1000)
+            if not uuids:
                 break
-            data = self.redis_sanitized.hgetall(uuid)
-            # Data gathered from the RIS queries:
-            # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
-            # * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
-            # * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview
-            ris_entry = self.ris_cache.hgetall(data['ip'])
-            if not ris_entry:
-                # RIS data not available yet, retry later
-                self.redis_sanitized.sadd('to_insert', uuid)
-                # In case this IP is missing in the set to process
-                self.ris_cache.sadd('for_ris_lookup', data['ip'])
-                continue
-            # Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
-            self.ardb_storage.sadd('{}|sources'.format(data['date']), data['source'])
+            p = self.redis_sanitized.pipeline(transaction=False)
+            [p.hgetall(uuid) for uuid in uuids]
+            sanitized_data = p.execute()

-            # Format: <YYYY-MM-DD>|<source> -> set([<asn>, ...])
-            self.ardb_storage.sadd('{}|{}'.format(data['date'], data['source']),
-                                   ris_entry['asn'])
-            # Format: <YYYY-MM-DD>|<source>|<asn> -> set([<prefix>, ...])
-            self.ardb_storage.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
+            retry = []
+            done = []
+            prefix_missing = []
+            ardb_pipeline = self.ardb_storage.pipeline(transaction=False)
+            for i, uuid in enumerate(uuids):
+                data = sanitized_data[i]
+                if not data:
+                    self.logger.warning('No data for UUID {}. This should not happen, but lets move on.'.format(uuid))
+                    continue
+                # Data gathered from the RIS queries:
+                # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
+                # * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
+                # * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview
+                ris_entry = self.ris_cache.hgetall(data['ip'])
+                if not ris_entry:
+                    # RIS data not available yet, retry later
+                    retry.append(uuid)
+                    # In case this IP is missing in the set to process
+                    prefix_missing.append(data['ip'])
+                    continue
+                # Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
+                ardb_pipeline.sadd('{}|sources'.format(data['date']), data['source'])
+
+                # Format: <YYYY-MM-DD>|<source> -> set([<asn>, ...])
+                ardb_pipeline.sadd('{}|{}'.format(data['date'], data['source']), ris_entry['asn'])
+                # Format: <YYYY-MM-DD>|<source>|<asn> -> set([<prefix>, ...])
+                ardb_pipeline.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
                                    ris_entry['prefix'])
-            # Format: <YYYY-MM-DD>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
-            self.ardb_storage.sadd('{}|{}|{}|{}'.format(data['date'], data['source'],
-                                                        ris_entry['asn'],
-                                                        ris_entry['prefix']),
+                # Format: <YYYY-MM-DD>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
+                ardb_pipeline.sadd('{}|{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn'], ris_entry['prefix']),
                                    '{}|{}'.format(data['ip'], data['datetime']))
-            self.redis_sanitized.delete(uuid)
+                done.append(uuid)
+            ardb_pipeline.execute()
+            if prefix_missing:
+                self.ris_cache.sadd('for_ris_lookup', *prefix_missing)
+            p = self.redis_sanitized.pipeline(transaction=False)
+            if done:
+                p.delete(*done)
+            if retry:
+                p.sadd('to_insert', *retry)
+            p.execute()
         unset_running(self.__class__.__name__)
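Note (not part of the patch): the insert loop no longer pays one SPOP plus one HGETALL round trip per entry. It pops up to 1000 UUIDs at once, fetches the sanitized hashes through a non-transactional pipeline, and batches the ARDB writes the same way, so each iteration costs a handful of round trips instead of several per IP. A stripped-down sketch of that pattern follows; host, port and the hash layout are assumptions, only the 'to_insert' key name comes from the code above:

    from redis import Redis

    # Assumed connection parameters for the Redis instance holding sanitized data.
    r = Redis(host='127.0.0.1', port=6580, decode_responses=True)

    # One SPOP with a count replaces up to 1000 single-element SPOP calls.
    uuids = r.spop('to_insert', 1000)

    # A non-transactional pipeline sends all HGETALLs in one round trip;
    # results come back in the same order as the queued commands.
    p = r.pipeline(transaction=False)
    for uuid in uuids:
        p.hgetall(uuid)
    entries = p.execute()

    for uuid, entry in zip(uuids, entries):
        print(uuid, entry)

transaction=False is deliberate here: the commands are independent, so the pipeline is used purely to cut round trips, not for atomicity.
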
diff --git a/bin/dbinsert.py b/bin/dbinsert.py
index 9aca9fa..40f2e68 100755
--- a/bin/dbinsert.py
+++ b/bin/dbinsert.py
@@ -2,28 +2,23 @@
 # -*- coding: utf-8 -*-

 import logging
+from bgpranking.abstractmanager import AbstractManager
 from bgpranking.dbinsert import DatabaseInsert
-from bgpranking.libs.helpers import long_sleep, shutdown_requested

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')


-class DBInsertManager():
+class DBInsertManager(AbstractManager):

     def __init__(self, loglevel: int=logging.DEBUG):
-        self.loglevel = loglevel
+        super().__init__(loglevel)
         self.dbinsert = DatabaseInsert(loglevel)

-    def run_insert(self):
+    def _to_run_forever(self):
         self.dbinsert.insert()


 if __name__ == '__main__':
-    modules_manager = DBInsertManager()
-    while True:
-        if shutdown_requested():
-            break
-        modules_manager.run_insert()
-        if not long_sleep(120):
-            break
+    dbinsert = DBInsertManager()
+    dbinsert.run(sleep_in_sec=120)
diff --git a/bin/fetcher.py b/bin/fetcher.py
index beb3fcf..9ee38c7 100755
--- a/bin/fetcher.py
+++ b/bin/fetcher.py
@@ -4,21 +4,20 @@
 import logging
 import asyncio
 from pathlib import Path
-from bgpranking.libs.helpers import long_sleep, shutdown_requested

 import aiohttp

+from bgpranking.abstractmanager import AbstractManager
 from bgpranking.modulesfetcher import Fetcher
 from bgpranking.libs.helpers import get_config_path, get_list_storage_path

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')
-logger = logging.getLogger('Fetcher')
-

-class ModulesManager():
+class ModulesManager(AbstractManager):

     def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG):
+        super().__init__(loglevel)
         if not config_dir:
             config_dir = get_config_path()
         if not storage_directory:
@@ -27,23 +26,18 @@ class ModulesManager():
         modules_paths = [modulepath for modulepath in modules_config.glob('*.json')]
         self.modules = [Fetcher(path, storage_directory, loglevel) for path in modules_paths]

-    async def run_fetchers(self):
-        await asyncio.gather(
-            *[module.fetch_list() for module in self.modules if module.fetcher]
-        )
+    def _to_run_forever(self):
+        loop = asyncio.get_event_loop()
+        try:
+            loop.run_until_complete(asyncio.gather(
+                *[module.fetch_list() for module in self.modules if module.fetcher])
+            )
+        except aiohttp.client_exceptions.ClientConnectorError as e:
+            self.logger.critical('Exception while fetching lists: {}'.format(e))
+        finally:
+            loop.close()


 if __name__ == '__main__':
     modules_manager = ModulesManager()
-    loop = asyncio.get_event_loop()
-    while True:
-        if shutdown_requested():
-            break
-        try:
-            loop.run_until_complete(modules_manager.run_fetchers())
-        except aiohttp.client_exceptions.ClientConnectorError:
-            logger.critical('Exception while fetching lists.')
-            long_sleep(60)
-            continue
-        if not long_sleep(3600):
-            break
+    modules_manager.run(sleep_in_sec=3600)
diff --git a/bin/loadprefixes.py b/bin/loadprefixes.py
index f3980ab..46db4eb 100755
--- a/bin/loadprefixes.py
+++ b/bin/loadprefixes.py
@@ -2,39 +2,29 @@
 # -*- coding: utf-8 -*-

 import logging
-from bgpranking.prefixdb import PrefixDatabase
-from bgpranking.libs.helpers import long_sleep, shutdown_requested

 import requests

+from bgpranking.abstractmanager import AbstractManager
+from bgpranking.prefixdb import PrefixDatabase
+
 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')
-logger = logging.getLogger('PrefixDB Fetcher')
-

-class PrefixDBManager():
+class PrefixDBManager(AbstractManager):

     def __init__(self, loglevel: int=logging.DEBUG):
+        super().__init__(loglevel)
         self.prefix_db = PrefixDatabase(loglevel=loglevel)

-    def load_prefixes(self):
-        self.prefix_db.load_prefixes()
-
-    def needs_update(self):
-        return self.prefix_db.update_required()
+    def _to_run_forever(self):
+        try:
+            if self.prefix_db.update_required():
+                self.prefix_db.load_prefixes()
+        except requests.exceptions.ConnectionError as e:
+            self.logger.critical('Unable to download the prefix database: {}'.format(e))


 if __name__ == '__main__':
     p = PrefixDBManager()
-    while True:
-        if shutdown_requested():
-            break
-        try:
-            if p.needs_update():
-                p.load_prefixes()
-        except requests.exceptions.ConnectionError:
-            logger.critical('Unable to download the prefix database.')
-            long_sleep(60)
-            continue
-        if not long_sleep(3600):
-            break
+    p.run(sleep_in_sec=3600)
diff --git a/bin/parser.py b/bin/parser.py
index 61048fa..42a4431 100755
--- a/bin/parser.py
+++ b/bin/parser.py
@@ -3,17 +3,19 @@

 import logging
 from pathlib import Path
+
+from bgpranking.abstractmanager import AbstractManager
 from bgpranking.parser import RawFilesParser
 from bgpranking.libs.helpers import get_config_path, get_list_storage_path
-from bgpranking.libs.helpers import long_sleep, shutdown_requested

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')


-class ParserManager():
+class ParserManager(AbstractManager):

     def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG):
+        super().__init__(loglevel)
         if not config_dir:
             config_dir = get_config_path()
         if not storage_directory:
@@ -22,15 +24,10 @@ class ParserManager():
         modules_paths = [modulepath for modulepath in modules_config.glob('*.json')]
         self.modules = [RawFilesParser(path, storage_directory, loglevel) for path in modules_paths]

-    def run_intake(self):
+    def _to_run_forever(self):
         [module.parse_raw_files() for module in self.modules]


 if __name__ == '__main__':
     parser_manager = ParserManager()
-    while True:
-        if shutdown_requested():
-            break
-        parser_manager.run_intake()
-        if not long_sleep(120):
-            break
+    parser_manager.run(sleep_in_sec=120)
diff --git a/bin/rislookup.py b/bin/rislookup.py
index 8553fe9..39f4e91 100755
--- a/bin/rislookup.py
+++ b/bin/rislookup.py
@@ -2,27 +2,24 @@
 # -*- coding: utf-8 -*-

 import logging
+
+from bgpranking.abstractmanager import AbstractManager
 from bgpranking.risfetcher import RISPrefixLookup
-from bgpranking.libs.helpers import long_sleep, shutdown_requested

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')


-class RISLookupManager():
+class RISLookupManager(AbstractManager):

     def __init__(self, loglevel: int=logging.INFO):
+        super().__init__(loglevel)
         self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)

-    def run_fetcher(self):
+    def _to_run_forever(self):
         self.ris_fetcher.run()


 if __name__ == '__main__':
-    modules_manager = RISLookupManager()
-    while True:
-        if shutdown_requested():
-            break
-        modules_manager.run_fetcher()
-        if not long_sleep(120):
-            break
+    rislookup = RISLookupManager()
+    rislookup.run(120)
diff --git a/bin/run_backend.py b/bin/run_backend.py
index 68475fc..0a0e516 100755
--- a/bin/run_backend.py
+++ b/bin/run_backend.py
@@ -6,6 +6,7 @@ from subprocess import Popen
 import time
 from pathlib import Path
 from redis import Redis
+from redis.exceptions import ConnectionError

 import argparse

@@ -13,7 +14,8 @@ import argparse
 def launch_cache(storage_directory: Path=None):
     if not storage_directory:
         storage_directory = get_homedir()
-    Popen(["./run_redis.sh"], cwd=(storage_directory / 'cache'))
+    if not check_running('127.0.0.1', 6581) and not check_running('127.0.0.1', 6582):
+        Popen(["./run_redis.sh"], cwd=(storage_directory / 'cache'))


 def shutdown_cache(storage_directory: Path=None):
@@ -25,7 +27,8 @@ def shutdown_cache(storage_directory: Path=None):
 def launch_temp(storage_directory: Path=None):
     if not storage_directory:
         storage_directory = get_homedir()
-    Popen(["./run_redis.sh"], cwd=(storage_directory / 'temp'))
+    if not check_running('127.0.0.1', 6579) and not check_running('127.0.0.1', 6580):
+        Popen(["./run_redis.sh"], cwd=(storage_directory / 'temp'))


 def shutdown_temp(storage_directory: Path=None):
@@ -37,7 +40,8 @@ def shutdown_temp(storage_directory: Path=None):
 def launch_storage(storage_directory: Path=None):
     if not storage_directory:
         storage_directory = get_homedir()
-    Popen(["./run_ardb.sh"], cwd=(storage_directory / 'storage'))
+    if not check_running('127.0.0.1', 16579):
+        Popen(["./run_ardb.sh"], cwd=(storage_directory / 'storage'))


 def shutdown_storage(storage_directory: Path=None):
@@ -47,8 +51,11 @@ def shutdown_storage(storage_directory: Path=None):


 def check_running(host, port):
-    r = Redis(host=host, port=port)
-    return r.ping()
+    try:
+        r = Redis(host=host, port=port)
+        return r.ping()
+    except ConnectionError:
+        return False


 def launch_all():
diff --git a/bin/sanitizer.py b/bin/sanitizer.py
index e1a8036..21a6891 100755
--- a/bin/sanitizer.py
+++ b/bin/sanitizer.py
@@ -2,28 +2,24 @@
 # -*- coding: utf-8 -*-

 import logging
+
+from bgpranking.abstractmanager import AbstractManager
 from bgpranking.sanitizer import Sanitizer
-from bgpranking.libs.helpers import long_sleep, shutdown_requested

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.WARNING, datefmt='%I:%M:%S')


-class SanitizerManager():
+class SanitizerManager(AbstractManager):

     def __init__(self, loglevel: int=logging.WARNING):
-        self.loglevel = loglevel
+        super().__init__(loglevel)
         self.sanitizer = Sanitizer(loglevel)

-    def run_sanitizer(self):
+    def _to_run_forever(self):
         self.sanitizer.sanitize()


 if __name__ == '__main__':
-    modules_manager = SanitizerManager()
-    while True:
-        if shutdown_requested():
-            break
-        modules_manager.run_sanitizer()
-        if not long_sleep(120):
-            break
+    sanitizer = SanitizerManager()
+    sanitizer.run(sleep_in_sec=120)
diff --git a/bin/start.py b/bin/start.py
index ab4fc40..9705a3d 100755
--- a/bin/start.py
+++ b/bin/start.py
@@ -2,9 +2,12 @@
 # -*- coding: utf-8 -*-

 from subprocess import Popen
+from bgpranking.libs.helpers import get_homedir


 if __name__ == '__main__':
+    # Just fail if the env isn't set.
+    get_homedir()
     p = Popen(['run_backend.py', '--start'])
     p.wait()
     Popen(['loadprefixes.py'])
diff --git a/bin/stop.py b/bin/stop.py
index 07babce..c0c5a9c 100755
--- a/bin/stop.py
+++ b/bin/stop.py
@@ -2,9 +2,10 @@
 # -*- coding: utf-8 -*-

 from subprocess import Popen
-
+from bgpranking.libs.helpers import get_homedir

 if __name__ == '__main__':
+    get_homedir()
     p = Popen(['shutdown.py'])
     p.wait()
     Popen(['run_backend.py', '--stop'])
diff --git a/storage/ardb.conf b/storage/ardb.conf
index 5cbd01d..000d93e 100644
--- a/storage/ardb.conf
+++ b/storage/ardb.conf
@@ -58,7 +58,7 @@ rocksdb.scan-total-order false
 rocksdb.disableWAL false

 #rocksdb's options
-rocksdb.options write_buffer_size=512M;max_write_buffer_number=5;min_write_buffer_number_to_merge=3;compression=kSnappyCompression;\
+rocksdb.options write_buffer_size=1024M;max_write_buffer_number=5;min_write_buffer_number_to_merge=3;compression=kSnappyCompression;\
             bloom_locality=1;memtable_prefix_bloom_size_ratio=0.1;\
             block_based_table_factory={block_cache=512M;filter_policy=bloomfilter:10:true};\
             create_if_missing=true;max_open_files=10000;rate_limiter_bytes_per_sec=50M;\