chg: cleanup and improvements

pull/12/head
Raphaël Vinot 2018-03-30 14:33:33 +02:00
parent b4b012a430
commit 28ef7b2ecc
12 changed files with 143 additions and 116 deletions

View File

@ -0,0 +1,29 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
import logging
from .libs.helpers import long_sleep, shutdown_requested
class AbstractManager(ABC):
def __init__(self, loglevel: int=logging.DEBUG):
self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
self.logger.setLevel(loglevel)
self.logger.info('Initializing {}'.format(self.__class__.__name__))
@abstractmethod
def _to_run_forever(self):
pass
def run(self, sleep_in_sec: int):
self.logger.info('Launching {}'.format(self.__class__.__name__))
while True:
if shutdown_requested():
break
self._to_run_forever()
if not long_sleep(sleep_in_sec):
break
self.logger.info('Shutting down {}'.format(self.__class__.__name__))

View File

@ -25,10 +25,22 @@ class DatabaseInsert():
while True:
if shutdown_requested():
break
uuid = self.redis_sanitized.spop('to_insert')
if not uuid:
uuids = self.redis_sanitized.spop('to_insert', 1000)
if not uuids:
break
data = self.redis_sanitized.hgetall(uuid)
p = self.redis_sanitized.pipeline(transaction=False)
[p.hgetall(uuid) for uuid in uuids]
sanitized_data = p.execute()
retry = []
done = []
prefix_missing = []
ardb_pipeline = self.ardb_storage.pipeline(transaction=False)
for i, uuid in enumerate(uuids):
data = sanitized_data[i]
if not data:
self.logger.warning('No data for UUID {}. This should not happen, but lets move on.'.format(uuid))
continue
# Data gathered from the RIS queries:
# * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
# * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
@ -36,24 +48,30 @@ class DatabaseInsert():
ris_entry = self.ris_cache.hgetall(data['ip'])
if not ris_entry:
# RIS data not available yet, retry later
self.redis_sanitized.sadd('to_insert', uuid)
retry.append(uuid)
# In case this IP is missing in the set to process
self.ris_cache.sadd('for_ris_lookup', data['ip'])
prefix_missing.append(data['ip'])
continue
# Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
self.ardb_storage.sadd('{}|sources'.format(data['date']), data['source'])
ardb_pipeline.sadd('{}|sources'.format(data['date']), data['source'])
# Format: <YYYY-MM-DD>|<source> -> set([<asn>, ...])
self.ardb_storage.sadd('{}|{}'.format(data['date'], data['source']),
ris_entry['asn'])
ardb_pipeline.sadd('{}|{}'.format(data['date'], data['source']), ris_entry['asn'])
# Format: <YYYY-MM-DD>|<source>|<asn> -> set([<prefix>, ...])
self.ardb_storage.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
ardb_pipeline.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
ris_entry['prefix'])
# Format: <YYYY-MM-DD>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
self.ardb_storage.sadd('{}|{}|{}|{}'.format(data['date'], data['source'],
ris_entry['asn'],
ris_entry['prefix']),
ardb_pipeline.sadd('{}|{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn'], ris_entry['prefix']),
'{}|{}'.format(data['ip'], data['datetime']))
self.redis_sanitized.delete(uuid)
done.append(uuid)
ardb_pipeline.execute()
if prefix_missing:
self.ris_cache.sadd('for_ris_lookup', *prefix_missing)
p = self.redis_sanitized.pipeline(transaction=False)
if done:
p.delete(*done)
if retry:
p.sadd('to_insert', *retry)
p.execute()
unset_running(self.__class__.__name__)

View File

@ -2,28 +2,23 @@
# -*- coding: utf-8 -*-
import logging
from bgpranking.abstractmanager import AbstractManager
from bgpranking.dbinsert import DatabaseInsert
from bgpranking.libs.helpers import long_sleep, shutdown_requested
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
class DBInsertManager():
class DBInsertManager(AbstractManager):
def __init__(self, loglevel: int=logging.DEBUG):
self.loglevel = loglevel
super().__init__(loglevel)
self.dbinsert = DatabaseInsert(loglevel)
def run_insert(self):
def _to_run_forever(self):
self.dbinsert.insert()
if __name__ == '__main__':
modules_manager = DBInsertManager()
while True:
if shutdown_requested():
break
modules_manager.run_insert()
if not long_sleep(120):
break
dbinsert = DBInsertManager()
dbinsert.run(sleep_in_sec=120)

View File

@ -4,21 +4,20 @@
import logging
import asyncio
from pathlib import Path
from bgpranking.libs.helpers import long_sleep, shutdown_requested
import aiohttp
from bgpranking.abstractmanager import AbstractManager
from bgpranking.modulesfetcher import Fetcher
from bgpranking.libs.helpers import get_config_path, get_list_storage_path
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
logger = logging.getLogger('Fetcher')
class ModulesManager():
class ModulesManager(AbstractManager):
def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG):
super().__init__(loglevel)
if not config_dir:
config_dir = get_config_path()
if not storage_directory:
@ -27,23 +26,18 @@ class ModulesManager():
modules_paths = [modulepath for modulepath in modules_config.glob('*.json')]
self.modules = [Fetcher(path, storage_directory, loglevel) for path in modules_paths]
async def run_fetchers(self):
await asyncio.gather(
*[module.fetch_list() for module in self.modules if module.fetcher]
def _to_run_forever(self):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.gather(
*[module.fetch_list() for module in self.modules if module.fetcher])
)
except aiohttp.client_exceptions.ClientConnectorError as e:
self.logger.critical('Exception while fetching lists: {}'.format(e))
finally:
loop.close()
if __name__ == '__main__':
modules_manager = ModulesManager()
loop = asyncio.get_event_loop()
while True:
if shutdown_requested():
break
try:
loop.run_until_complete(modules_manager.run_fetchers())
except aiohttp.client_exceptions.ClientConnectorError:
logger.critical('Exception while fetching lists.')
long_sleep(60)
continue
if not long_sleep(3600):
break
modules_manager.run(sleep_in_sec=3600)

View File

@ -2,39 +2,29 @@
# -*- coding: utf-8 -*-
import logging
from bgpranking.prefixdb import PrefixDatabase
from bgpranking.libs.helpers import long_sleep, shutdown_requested
import requests
from bgpranking.abstractmanager import AbstractManager
from bgpranking.prefixdb import PrefixDatabase
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
logger = logging.getLogger('PrefixDB Fetcher')
class PrefixDBManager():
class PrefixDBManager(AbstractManager):
def __init__(self, loglevel: int=logging.DEBUG):
super().__init__(loglevel)
self.prefix_db = PrefixDatabase(loglevel=loglevel)
def load_prefixes(self):
def _to_run_forever(self):
try:
if self.prefix_db.update_required():
self.prefix_db.load_prefixes()
def needs_update(self):
return self.prefix_db.update_required()
except requests.exceptions.ConnectionError as e:
self.logger.critical('Unable to download the prefix database: {}'.format(e))
if __name__ == '__main__':
p = PrefixDBManager()
while True:
if shutdown_requested():
break
try:
if p.needs_update():
p.load_prefixes()
except requests.exceptions.ConnectionError:
logger.critical('Unable to download the prefix database.')
long_sleep(60)
continue
if not long_sleep(3600):
break
p.run(sleep_in_sec=3600)

View File

@ -3,17 +3,19 @@
import logging
from pathlib import Path
from bgpranking.abstractmanager import AbstractManager
from bgpranking.parser import RawFilesParser
from bgpranking.libs.helpers import get_config_path, get_list_storage_path
from bgpranking.libs.helpers import long_sleep, shutdown_requested
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
class ParserManager():
class ParserManager(AbstractManager):
def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG):
super().__init__(loglevel)
if not config_dir:
config_dir = get_config_path()
if not storage_directory:
@ -22,15 +24,10 @@ class ParserManager():
modules_paths = [modulepath for modulepath in modules_config.glob('*.json')]
self.modules = [RawFilesParser(path, storage_directory, loglevel) for path in modules_paths]
def run_intake(self):
def _to_run_forever(self):
[module.parse_raw_files() for module in self.modules]
if __name__ == '__main__':
parser_manager = ParserManager()
while True:
if shutdown_requested():
break
parser_manager.run_intake()
if not long_sleep(120):
break
parser_manager.run(sleep_in_sec=120)

View File

@ -2,27 +2,24 @@
# -*- coding: utf-8 -*-
import logging
from bgpranking.abstractmanager import AbstractManager
from bgpranking.risfetcher import RISPrefixLookup
from bgpranking.libs.helpers import long_sleep, shutdown_requested
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
class RISLookupManager():
class RISLookupManager(AbstractManager):
def __init__(self, loglevel: int=logging.INFO):
super().__init__(loglevel)
self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)
def run_fetcher(self):
def _to_run_forever(self):
self.ris_fetcher.run()
if __name__ == '__main__':
modules_manager = RISLookupManager()
while True:
if shutdown_requested():
break
modules_manager.run_fetcher()
if not long_sleep(120):
break
rislookup = RISLookupManager()
rislookup.run(120)

View File

@ -6,6 +6,7 @@ from subprocess import Popen
import time
from pathlib import Path
from redis import Redis
from redis.exceptions import ConnectionError
import argparse
@ -13,6 +14,7 @@ import argparse
def launch_cache(storage_directory: Path=None):
if not storage_directory:
storage_directory = get_homedir()
if not check_running('127.0.0.1', 6581) and not check_running('127.0.0.1', 6582):
Popen(["./run_redis.sh"], cwd=(storage_directory / 'cache'))
@ -25,6 +27,7 @@ def shutdown_cache(storage_directory: Path=None):
def launch_temp(storage_directory: Path=None):
if not storage_directory:
storage_directory = get_homedir()
if not check_running('127.0.0.1', 6579) and not check_running('127.0.0.1', 6580):
Popen(["./run_redis.sh"], cwd=(storage_directory / 'temp'))
@ -37,6 +40,7 @@ def shutdown_temp(storage_directory: Path=None):
def launch_storage(storage_directory: Path=None):
if not storage_directory:
storage_directory = get_homedir()
if not check_running('127.0.0.1', 16579):
Popen(["./run_ardb.sh"], cwd=(storage_directory / 'storage'))
@ -47,8 +51,11 @@ def shutdown_storage(storage_directory: Path=None):
def check_running(host, port):
try:
r = Redis(host=host, port=port)
return r.ping()
except ConnectionError:
return False
def launch_all():

View File

@ -2,28 +2,24 @@
# -*- coding: utf-8 -*-
import logging
from bgpranking.abstractmanager import AbstractManager
from bgpranking.sanitizer import Sanitizer
from bgpranking.libs.helpers import long_sleep, shutdown_requested
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.WARNING, datefmt='%I:%M:%S')
class SanitizerManager():
class SanitizerManager(AbstractManager):
def __init__(self, loglevel: int=logging.WARNING):
self.loglevel = loglevel
super().__init__(loglevel)
self.sanitizer = Sanitizer(loglevel)
def run_sanitizer(self):
def _to_run_forever(self):
self.sanitizer.sanitize()
if __name__ == '__main__':
modules_manager = SanitizerManager()
while True:
if shutdown_requested():
break
modules_manager.run_sanitizer()
if not long_sleep(120):
break
sanitizer = SanitizerManager()
sanitizer.run(sleep_in_sec=120)

View File

@ -2,9 +2,12 @@
# -*- coding: utf-8 -*-
from subprocess import Popen
from bgpranking.libs.helpers import get_homedir
if __name__ == '__main__':
# Just fail if the env isn't set.
get_homedir()
p = Popen(['run_backend.py', '--start'])
p.wait()
Popen(['loadprefixes.py'])

View File

@ -2,9 +2,10 @@
# -*- coding: utf-8 -*-
from subprocess import Popen
from bgpranking.libs.helpers import get_homedir
if __name__ == '__main__':
get_homedir()
p = Popen(['shutdown.py'])
p.wait()
Popen(['run_backend.py', '--stop'])

View File

@ -58,7 +58,7 @@ rocksdb.scan-total-order false
rocksdb.disableWAL false
#rocksdb's options
rocksdb.options write_buffer_size=512M;max_write_buffer_number=5;min_write_buffer_number_to_merge=3;compression=kSnappyCompression;\
rocksdb.options write_buffer_size=1024M;max_write_buffer_number=5;min_write_buffer_number_to_merge=3;compression=kSnappyCompression;\
bloom_locality=1;memtable_prefix_bloom_size_ratio=0.1;\
block_based_table_factory={block_cache=512M;filter_policy=bloomfilter:10:true};\
create_if_missing=true;max_open_files=10000;rate_limiter_bytes_per_sec=50M;\