From adf2f1e157947fdd49b3d266031dda8ecb1184c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 29 Mar 2018 22:37:28 +0200 Subject: [PATCH] new: major refactoring --- archive.py | 30 -- __init__.py => bgpranking/__init__.py | 0 {listimport => bgpranking}/archive.py | 5 +- .../config/modules}/Alienvault.json | 0 .../config/modules}/BlocklistDeApache.json | 0 .../config/modules}/BlocklistDeBots.json | 0 .../config/modules}/BlocklistDeFTP.json | 0 .../config/modules}/BlocklistDeIMAP.json | 0 .../config/modules}/BlocklistDeMail.json | 0 .../config/modules}/BlocklistDeSIP.json | 0 .../config/modules}/BlocklistDeSSH.json | 0 .../config/modules}/BlocklistDeStrong.json | 0 .../config/modules}/CIArmy.json | 0 .../config/modules}/CleanMXMalwares.json | 0 .../config/modules}/CleanMXPhishing.json | 0 .../config/modules}/CleanMXPortals.json | 0 .../config/modules}/DshieldDaily.json | 0 .../config/modules}/DshieldTopIPs.json | 0 .../modules}/EmergingThreatsCompromized.json | 0 .../modules}/FeodotrackerIPBlockList.json | 0 .../config/modules}/Malc0de.json | 0 .../config/modules}/MalwareDomainListIP.json | 0 .../config/modules}/NothinkSNMP.json | 0 .../config/modules}/NothinkSSH.json | 0 .../config/modules}/NothinkTelnet.json | 0 .../modules}/PalevotrackerIPBlockList.json | 0 .../modules}/RansomwareIPBlockList.json | 0 .../config/modules}/SSHBlackListBase.json | 0 .../modules}/ZeustrackerIPBlockList.json | 0 .../config/modules}/jq_all_the_things.sh | 0 .../config/modules}/module.schema | 0 .../config/modules}/validate_all.sh | 0 {listimport => bgpranking}/dbinsert.py | 8 +- .../fetcher}/__init__.py | 0 bgpranking/fetcher/simple_feed_fetcher.py | 427 ++++++++++++++++++ {listimport => bgpranking}/libs/StatsRipe.py | 0 .../libs/StatsRipeText.py | 0 {listimport => bgpranking}/libs/__init__.py | 0 {listimport => bgpranking}/libs/exceptions.py | 0 bgpranking/libs/helpers.py | 63 +++ {listimport => bgpranking}/modulesfetcher.py | 8 +- bgpranking/monitor.py | 38 ++ {listimport => bgpranking}/parser.py | 8 +- .../parsers/__init__.py | 0 {listimport => bgpranking}/parsers/abusech.py | 0 {listimport => bgpranking}/parsers/default.py | 0 {listimport => bgpranking}/parsers/dshield.py | 0 {listimport => bgpranking}/parsers/malc0de.py | 0 {listimport => bgpranking}/parsers/nothink.py | 0 .../initranking.py => bgpranking/prefixdb.py | 35 +- bgpranking/risfetcher.py | 71 +++ bgpranking/sanitizer.py | 57 +++ bin/.rislookup.py.swp | Bin 0 -> 12288 bytes bin/archiver.py | 42 ++ dbinsert.py => bin/dbinsert.py | 16 +- bin/fetcher.py | 49 ++ bin/loadprefixes.py | 40 ++ bin/monitor.py | 25 + bin/parser.py | 36 ++ bin/rislookup.py | 28 ++ bin/run_backend.py | 102 +++++ sanitize.py => bin/sanitizer.py | 16 +- bin/shutdown.py | 16 + bin/start.py | 15 + bin/stop.py | 10 + fetcher.py | 36 -- intake.py | 34 -- listimport/libs/helpers.py | 12 - listimport/risfetcher.py | 60 --- listimport/sanitizer.py | 49 -- ranking.py | 23 - requirements.txt | 4 + ris.py | 22 - setup.py | 33 ++ storage/ardb.conf | 0 75 files changed, 1124 insertions(+), 294 deletions(-) delete mode 100755 archive.py rename __init__.py => bgpranking/__init__.py (100%) rename {listimport => bgpranking}/archive.py (92%) rename {listimport/modules_config => bgpranking/config/modules}/Alienvault.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeApache.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeBots.json (100%) rename {listimport/modules_config => 
bgpranking/config/modules}/BlocklistDeFTP.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeIMAP.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeMail.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeSIP.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeSSH.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/BlocklistDeStrong.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/CIArmy.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/CleanMXMalwares.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/CleanMXPhishing.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/CleanMXPortals.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/DshieldDaily.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/DshieldTopIPs.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/EmergingThreatsCompromized.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/FeodotrackerIPBlockList.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/Malc0de.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/MalwareDomainListIP.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/NothinkSNMP.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/NothinkSSH.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/NothinkTelnet.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/PalevotrackerIPBlockList.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/RansomwareIPBlockList.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/SSHBlackListBase.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/ZeustrackerIPBlockList.json (100%) rename {listimport/modules_config => bgpranking/config/modules}/jq_all_the_things.sh (100%) rename {listimport/modules_config => bgpranking/config/modules}/module.schema (100%) rename {listimport/modules_config => bgpranking/config/modules}/validate_all.sh (100%) rename {listimport => bgpranking}/dbinsert.py (91%) rename {listimport => bgpranking/fetcher}/__init__.py (100%) create mode 100644 bgpranking/fetcher/simple_feed_fetcher.py rename {listimport => bgpranking}/libs/StatsRipe.py (100%) rename {listimport => bgpranking}/libs/StatsRipeText.py (100%) rename {listimport => bgpranking}/libs/__init__.py (100%) rename {listimport => bgpranking}/libs/exceptions.py (100%) create mode 100644 bgpranking/libs/helpers.py rename {listimport => bgpranking}/modulesfetcher.py (92%) create mode 100644 bgpranking/monitor.py rename {listimport => bgpranking}/parser.py (92%) rename {listimport => bgpranking}/parsers/__init__.py (100%) rename {listimport => bgpranking}/parsers/abusech.py (100%) rename {listimport => bgpranking}/parsers/default.py (100%) rename {listimport => bgpranking}/parsers/dshield.py (100%) rename {listimport => bgpranking}/parsers/malc0de.py (100%) rename {listimport => bgpranking}/parsers/nothink.py (100%) rename listimport/initranking.py => bgpranking/prefixdb.py (64%) create mode 100644 bgpranking/risfetcher.py create mode 100644 bgpranking/sanitizer.py create mode 100644 bin/.rislookup.py.swp create mode 100755 bin/archiver.py 
rename dbinsert.py => bin/dbinsert.py (57%) create mode 100755 bin/fetcher.py create mode 100755 bin/loadprefixes.py create mode 100755 bin/monitor.py create mode 100755 bin/parser.py create mode 100755 bin/rislookup.py create mode 100755 bin/run_backend.py rename sanitize.py => bin/sanitizer.py (56%) create mode 100644 bin/shutdown.py create mode 100644 bin/start.py create mode 100644 bin/stop.py delete mode 100755 fetcher.py delete mode 100755 intake.py delete mode 100644 listimport/libs/helpers.py delete mode 100644 listimport/risfetcher.py delete mode 100644 listimport/sanitizer.py delete mode 100755 ranking.py create mode 100644 requirements.txt delete mode 100755 ris.py create mode 100644 setup.py mode change 100755 => 100644 storage/ardb.conf diff --git a/archive.py b/archive.py deleted file mode 100755 index 8fa76c3..0000000 --- a/archive.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from listimport.archive import DeepArchive -import logging -from pathlib import Path - -logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', - level=logging.INFO, datefmt='%I:%M:%S') - - -class ModulesArchiver(): - - def __init__(self, config_dir: Path=Path('listimport', 'modules_config'), - storage_directory: Path=Path('rawdata'), - loglevel: int=logging.INFO): - self.config_dir = config_dir - self.storage_directory = storage_directory - self.loglevel = loglevel - self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')] - self.modules = [DeepArchive(path, self.storage_directory, self.loglevel) - for path in self.modules_paths] - - def archive(self): - [module.archive() for module in self.modules] - - -if __name__ == '__main__': - archiver = ModulesArchiver() - archiver.archive() diff --git a/__init__.py b/bgpranking/__init__.py similarity index 100% rename from __init__.py rename to bgpranking/__init__.py diff --git a/listimport/archive.py b/bgpranking/archive.py similarity index 92% rename from listimport/archive.py rename to bgpranking/archive.py index 6a0f49b..c9bd955 100644 --- a/listimport/archive.py +++ b/bgpranking/archive.py @@ -10,7 +10,7 @@ import zipfile import logging import json -from .libs.helpers import safe_create_dir +from .libs.helpers import safe_create_dir, set_running, unset_running class DeepArchive(): @@ -34,6 +34,8 @@ class DeepArchive(): self.logger.setLevel(loglevel) def archive(self): + set_running(self.__class__.__name__) + to_archive = defaultdict(list) today = date.today() last_day_to_keep = date(today.year, today.month, 1) - relativedelta(months=2) @@ -54,3 +56,4 @@ class DeepArchive(): z.write(f, f.name) # Delete all the files if the archiving worked out properly [f.unlink() for f in path_list] + unset_running(self.__class__.__name__) diff --git a/listimport/modules_config/Alienvault.json b/bgpranking/config/modules/Alienvault.json similarity index 100% rename from listimport/modules_config/Alienvault.json rename to bgpranking/config/modules/Alienvault.json diff --git a/listimport/modules_config/BlocklistDeApache.json b/bgpranking/config/modules/BlocklistDeApache.json similarity index 100% rename from listimport/modules_config/BlocklistDeApache.json rename to bgpranking/config/modules/BlocklistDeApache.json diff --git a/listimport/modules_config/BlocklistDeBots.json b/bgpranking/config/modules/BlocklistDeBots.json similarity index 100% rename from listimport/modules_config/BlocklistDeBots.json rename to bgpranking/config/modules/BlocklistDeBots.json diff --git 
a/listimport/modules_config/BlocklistDeFTP.json b/bgpranking/config/modules/BlocklistDeFTP.json similarity index 100% rename from listimport/modules_config/BlocklistDeFTP.json rename to bgpranking/config/modules/BlocklistDeFTP.json diff --git a/listimport/modules_config/BlocklistDeIMAP.json b/bgpranking/config/modules/BlocklistDeIMAP.json similarity index 100% rename from listimport/modules_config/BlocklistDeIMAP.json rename to bgpranking/config/modules/BlocklistDeIMAP.json diff --git a/listimport/modules_config/BlocklistDeMail.json b/bgpranking/config/modules/BlocklistDeMail.json similarity index 100% rename from listimport/modules_config/BlocklistDeMail.json rename to bgpranking/config/modules/BlocklistDeMail.json diff --git a/listimport/modules_config/BlocklistDeSIP.json b/bgpranking/config/modules/BlocklistDeSIP.json similarity index 100% rename from listimport/modules_config/BlocklistDeSIP.json rename to bgpranking/config/modules/BlocklistDeSIP.json diff --git a/listimport/modules_config/BlocklistDeSSH.json b/bgpranking/config/modules/BlocklistDeSSH.json similarity index 100% rename from listimport/modules_config/BlocklistDeSSH.json rename to bgpranking/config/modules/BlocklistDeSSH.json diff --git a/listimport/modules_config/BlocklistDeStrong.json b/bgpranking/config/modules/BlocklistDeStrong.json similarity index 100% rename from listimport/modules_config/BlocklistDeStrong.json rename to bgpranking/config/modules/BlocklistDeStrong.json diff --git a/listimport/modules_config/CIArmy.json b/bgpranking/config/modules/CIArmy.json similarity index 100% rename from listimport/modules_config/CIArmy.json rename to bgpranking/config/modules/CIArmy.json diff --git a/listimport/modules_config/CleanMXMalwares.json b/bgpranking/config/modules/CleanMXMalwares.json similarity index 100% rename from listimport/modules_config/CleanMXMalwares.json rename to bgpranking/config/modules/CleanMXMalwares.json diff --git a/listimport/modules_config/CleanMXPhishing.json b/bgpranking/config/modules/CleanMXPhishing.json similarity index 100% rename from listimport/modules_config/CleanMXPhishing.json rename to bgpranking/config/modules/CleanMXPhishing.json diff --git a/listimport/modules_config/CleanMXPortals.json b/bgpranking/config/modules/CleanMXPortals.json similarity index 100% rename from listimport/modules_config/CleanMXPortals.json rename to bgpranking/config/modules/CleanMXPortals.json diff --git a/listimport/modules_config/DshieldDaily.json b/bgpranking/config/modules/DshieldDaily.json similarity index 100% rename from listimport/modules_config/DshieldDaily.json rename to bgpranking/config/modules/DshieldDaily.json diff --git a/listimport/modules_config/DshieldTopIPs.json b/bgpranking/config/modules/DshieldTopIPs.json similarity index 100% rename from listimport/modules_config/DshieldTopIPs.json rename to bgpranking/config/modules/DshieldTopIPs.json diff --git a/listimport/modules_config/EmergingThreatsCompromized.json b/bgpranking/config/modules/EmergingThreatsCompromized.json similarity index 100% rename from listimport/modules_config/EmergingThreatsCompromized.json rename to bgpranking/config/modules/EmergingThreatsCompromized.json diff --git a/listimport/modules_config/FeodotrackerIPBlockList.json b/bgpranking/config/modules/FeodotrackerIPBlockList.json similarity index 100% rename from listimport/modules_config/FeodotrackerIPBlockList.json rename to bgpranking/config/modules/FeodotrackerIPBlockList.json diff --git a/listimport/modules_config/Malc0de.json b/bgpranking/config/modules/Malc0de.json 
similarity index 100% rename from listimport/modules_config/Malc0de.json rename to bgpranking/config/modules/Malc0de.json diff --git a/listimport/modules_config/MalwareDomainListIP.json b/bgpranking/config/modules/MalwareDomainListIP.json similarity index 100% rename from listimport/modules_config/MalwareDomainListIP.json rename to bgpranking/config/modules/MalwareDomainListIP.json diff --git a/listimport/modules_config/NothinkSNMP.json b/bgpranking/config/modules/NothinkSNMP.json similarity index 100% rename from listimport/modules_config/NothinkSNMP.json rename to bgpranking/config/modules/NothinkSNMP.json diff --git a/listimport/modules_config/NothinkSSH.json b/bgpranking/config/modules/NothinkSSH.json similarity index 100% rename from listimport/modules_config/NothinkSSH.json rename to bgpranking/config/modules/NothinkSSH.json diff --git a/listimport/modules_config/NothinkTelnet.json b/bgpranking/config/modules/NothinkTelnet.json similarity index 100% rename from listimport/modules_config/NothinkTelnet.json rename to bgpranking/config/modules/NothinkTelnet.json diff --git a/listimport/modules_config/PalevotrackerIPBlockList.json b/bgpranking/config/modules/PalevotrackerIPBlockList.json similarity index 100% rename from listimport/modules_config/PalevotrackerIPBlockList.json rename to bgpranking/config/modules/PalevotrackerIPBlockList.json diff --git a/listimport/modules_config/RansomwareIPBlockList.json b/bgpranking/config/modules/RansomwareIPBlockList.json similarity index 100% rename from listimport/modules_config/RansomwareIPBlockList.json rename to bgpranking/config/modules/RansomwareIPBlockList.json diff --git a/listimport/modules_config/SSHBlackListBase.json b/bgpranking/config/modules/SSHBlackListBase.json similarity index 100% rename from listimport/modules_config/SSHBlackListBase.json rename to bgpranking/config/modules/SSHBlackListBase.json diff --git a/listimport/modules_config/ZeustrackerIPBlockList.json b/bgpranking/config/modules/ZeustrackerIPBlockList.json similarity index 100% rename from listimport/modules_config/ZeustrackerIPBlockList.json rename to bgpranking/config/modules/ZeustrackerIPBlockList.json diff --git a/listimport/modules_config/jq_all_the_things.sh b/bgpranking/config/modules/jq_all_the_things.sh similarity index 100% rename from listimport/modules_config/jq_all_the_things.sh rename to bgpranking/config/modules/jq_all_the_things.sh diff --git a/listimport/modules_config/module.schema b/bgpranking/config/modules/module.schema similarity index 100% rename from listimport/modules_config/module.schema rename to bgpranking/config/modules/module.schema diff --git a/listimport/modules_config/validate_all.sh b/bgpranking/config/modules/validate_all.sh similarity index 100% rename from listimport/modules_config/validate_all.sh rename to bgpranking/config/modules/validate_all.sh diff --git a/listimport/dbinsert.py b/bgpranking/dbinsert.py similarity index 91% rename from listimport/dbinsert.py rename to bgpranking/dbinsert.py index b845b49..ada2fde 100644 --- a/listimport/dbinsert.py +++ b/bgpranking/dbinsert.py @@ -4,6 +4,7 @@ import logging from redis import Redis from redis import StrictRedis +from .libs.helpers import shutdown_requested, set_running, unset_running class DatabaseInsert(): @@ -19,8 +20,11 @@ class DatabaseInsert(): self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) self.logger.setLevel(loglevel) - async def insert(self): + def insert(self): + set_running(self.__class__.__name__) while True: + if shutdown_requested(): + break 
uuid = self.redis_sanitized.spop('to_insert') if not uuid: break @@ -32,7 +36,6 @@ class DatabaseInsert(): ris_entry = self.ris_cache.hgetall(data['ip']) if not ris_entry: # RIS data not available yet, retry later - # FIXME: an IP can sometimes not be announced, we need to discard it self.redis_sanitized.sadd('to_insert', uuid) # In case this IP is missing in the set to process self.ris_cache.sadd('for_ris_lookup', data['ip']) @@ -53,3 +56,4 @@ ris_entry['prefix']), '{}|{}'.format(data['ip'], data['datetime'])) self.redis_sanitized.delete(uuid) + unset_running(self.__class__.__name__) diff --git a/listimport/__init__.py b/bgpranking/fetcher/__init__.py similarity index 100% rename from listimport/__init__.py rename to bgpranking/fetcher/__init__.py diff --git a/bgpranking/fetcher/simple_feed_fetcher.py b/bgpranking/fetcher/simple_feed_fetcher.py new file mode 100644 index 0000000..9e6d56e --- /dev/null +++ b/bgpranking/fetcher/simple_feed_fetcher.py @@ -0,0 +1,427 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import requests +import os +from dateutil import parser +from datetime import datetime, date +from hashlib import sha512 # Faster than sha256 on 64-bit machines. +from pathlib import Path +from dateutil.relativedelta import relativedelta +from collections import defaultdict +import zipfile +import logging +import asyncio +from pid import PidFile, PidFileError +import json +import re +from redis import Redis +from redis import StrictRedis +from uuid import uuid4 +from io import BytesIO +import importlib + +from typing import List +import types +import ipaddress + + +class BGPRankingException(Exception): + pass + + +class FetcherException(BGPRankingException): + pass + + +class ArchiveException(BGPRankingException): + pass + + +class CreateDirectoryException(BGPRankingException): + pass + + +""" +Directory structure: +storage_directory / vendor / listname -> files to import +storage_directory / vendor / listname / meta -> last modified & pid +storage_directory / vendor / listname / archive -> imported files <= 2 months old +storage_directory / vendor / listname / archive / deep -> imported files > 2 months old (zipped) +""" + + +def safe_create_dir(to_create: Path): + if to_create.exists() and not to_create.is_dir(): + raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create)) + os.makedirs(to_create, exist_ok=True) + + +class Fetcher(): + + def __init__(self, config_file: Path, storage_directory: Path, + loglevel: int=logging.DEBUG): + '''Load `config_file`, and store the fetched data into `storage_directory` + Note: if the `config_file` does not provide a URL (the file is + gathered by some other means), the fetcher is automatically stopped.''' + with open(config_file, 'r') as f: + module_parameters = json.load(f) + self.vendor = module_parameters['vendor'] + self.listname = module_parameters['name'] + self.__init_logger(loglevel) + self.fetcher = True + if 'url' not in module_parameters: + self.logger.info('No URL to fetch, breaking.') + self.fetcher = False + return + self.url = module_parameters['url'] + self.logger.debug('Starting fetcher on {}'.format(self.url)) + self.directory = storage_directory / self.vendor / self.listname + safe_create_dir(self.directory) + self.meta = self.directory / 'meta' + safe_create_dir(self.meta) + self.archive_dir = self.directory / 'archive' + safe_create_dir(self.archive_dir) + self.first_fetch = True + + def __init_logger(self, loglevel): + self.logger =
logging.getLogger('{}-{}-{}'.format(self.__class__.__name__, + self.vendor, self.listname)) + self.logger.setLevel(loglevel) + + def __get_last_modified(self): + r = requests.head(self.url) + if 'Last-Modified' in r.headers: + return parser.parse(r.headers['Last-Modified']) + return None + + def __newer(self): + '''Check if the file available for download is newer than the one + already downloaded, by comparing the `Last-Modified` header. + Note: returns True (so the file is downloaded again) if the file storing the + last known header value does not exist, or if the URL doesn't provide this header. + ''' + last_modified_path = self.meta / 'lastmodified' + if not last_modified_path.exists(): + # The file doesn't exist + if not self.first_fetch: + # The URL has no Last-Modified header, we cannot use it. + self.logger.debug('No Last-Modified header available') + return True + self.first_fetch = False + last_modified = self.__get_last_modified() + if last_modified: + self.logger.debug('Last-Modified header available') + with last_modified_path.open('w') as f: + f.write(last_modified.isoformat()) + else: + self.logger.debug('No Last-Modified header available') + return True + with last_modified_path.open() as f: + last_modified_file = parser.parse(f.read()) + last_modified = self.__get_last_modified() + if not last_modified: + # No more Last-Modified header Oo + self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname)) + last_modified_path.unlink() + return True + if last_modified > last_modified_file: + self.logger.info('Got a new file.') + with last_modified_path.open('w') as f: + f.write(last_modified.isoformat()) + return True + return False + + def __same_as_last(self, downloaded): + '''Find the most recent downloaded file and check whether it is the same as the + newly downloaded one. Returns True only if both files are identical and were downloaded the + same day.
+ Note: we check both the new and the archive directory because we may have a backlog + and the newest file is always the first one we process + ''' + to_check = [] + to_check_new = sorted([f for f in self.directory.iterdir() if f.is_file()]) + if to_check_new: + # we have files waiting to be processed + self.logger.debug('{} file(s) are waiting to be processed'.format(len(to_check_new))) + to_check.append(to_check_new[-1]) + to_check_archive = sorted([f for f in self.archive_dir.iterdir() if f.is_file()]) + if to_check_archive: + # we have files already processed, in the archive + self.logger.debug('{} file(s) have been processed'.format(len(to_check_archive))) + to_check.append(to_check_archive[-1]) + if not to_check: + self.logger.debug('New list, no historical files') + # nothing has been downloaded ever, moving on + return False + for last_file in to_check: + with last_file.open('rb') as f: + last_hash = sha512(f.read()) + dl_hash = sha512(downloaded) + if (dl_hash.digest() == last_hash.digest() and + parser.parse(last_file.name.split('.')[0]).date() == date.today()): + self.logger.debug('Same file already downloaded today.') + return True + return False + + @asyncio.coroutine + async def fetch_list(self): + '''Fetch & store the list''' + if not self.fetcher: + return + try: + with PidFile('{}.pid'.format(self.listname), piddir=self.meta): + if not self.__newer(): + return + r = requests.get(self.url) + if self.__same_as_last(r.content): + return + self.logger.info('Got a new file \o/') + with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f: + f.write(r.content) + except PidFileError: + self.logger.info('Fetcher already running') + + +# get announcer: https://stat.ripe.net/data/network-info/data.json?resource=149.13.33.14 + +class RawFilesParser(): + + def __init__(self, config_file: Path, storage_directory: Path, + loglevel: int=logging.DEBUG): + with open(config_file, 'r') as f: + module_parameters = json.load(f) + self.vendor = module_parameters['vendor'] + self.listname = module_parameters['name'] + if 'parser' in module_parameters: + self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser']).parse_raw_file, self) + self.source = '{}-{}'.format(self.vendor, self.listname) + self.directory = storage_directory / self.vendor / self.listname + safe_create_dir(self.directory) + self.__init_logger(loglevel) + self.redis_intake = Redis(host='localhost', port=6379, db=0) + self.logger.debug('Starting intake on {}'.format(self.source)) + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__, + self.vendor, self.listname)) + self.logger.setLevel(loglevel) + + @property + def files_to_parse(self) -> List[Path]: + return sorted([f for f in self.directory.iterdir() if f.is_file()], reverse=True) + + def extract_ipv4(self, bytestream: bytes) -> List[bytes]: + return re.findall(rb'[0-9]+(?:\.[0-9]+){3}', bytestream) + + def parse_raw_file(self, f: BytesIO): + self.datetime = datetime.now() + return self.extract_ipv4(f.getvalue()) + + @asyncio.coroutine + async def parse_raw_files(self): + for filepath in self.files_to_parse: + self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1)) + with open(filepath, 'rb') as f: + to_parse = BytesIO(f.read()) + p = self.redis_intake.pipeline() + for ip in self.parse_raw_file(to_parse): + uuid = uuid4() + p.hmset(uuid, {'ip': ip, 'source': self.source, + 'datetime': self.datetime.isoformat()}) + p.sadd('intake',
uuid) + p.execute() + self._archive(filepath) + + def _archive(self, filepath: Path): + '''After processing, move file to the archive directory''' + filepath.rename(self.directory / 'archive' / filepath.name) + + +class Sanitizer(): + + def __init__(self, loglevel: int=logging.DEBUG): + self.__init_logger(loglevel) + self.redis_intake = Redis(host='localhost', port=6379, db=0, decode_responses=True) + self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True) + self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True) + self.logger.debug('Starting import') + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) + self.logger.setLevel(loglevel) + + async def sanitize(self): + while True: + uuid = self.redis_intake.spop('intake') + if not uuid: + break + data = self.redis_intake.hgetall(uuid) + try: + ip = ipaddress.ip_address(data['ip']) + except ValueError: + self.logger.info('Invalid IP address: {}'.format(data['ip'])) + continue + if not ip.is_global: + self.logger.info('The IP address {} is not global'.format(data['ip'])) + continue + + date = parser.parse(data['datetime']).date().isoformat() + # NOTE: to consider: discard data with an old timestamp (define old) + + # Add to temporary DB for further processing + self.ris_cache.sadd('for_ris_lookup', str(ip)) + pipeline = self.redis_sanitized.pipeline() + pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], + 'date': date, 'datetime': data['datetime']}) + pipeline.sadd('to_insert', uuid) + pipeline.execute() + self.redis_intake.delete(uuid) + + +class DatabaseInsert(): + + def __init__(self, loglevel: int=logging.DEBUG): + self.__init_logger(loglevel) + self.ardb_storage = StrictRedis(host='localhost', port=16379, decode_responses=True) + self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True) + self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True) + self.logger.debug('Starting import') + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) + self.logger.setLevel(loglevel) + + async def insert(self): + while True: + uuid = self.redis_sanitized.spop('to_insert') + if not uuid: + break + data = self.redis_sanitized.hgetall(uuid) + # Data gathered from the RIS queries: + # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo + # * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo + # * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview + ris_entry = self.ris_cache.hgetall(data['ip']) + if not ris_entry: + # RIS data not available yet, retry later + # FIXME: an IP can sometimes not be announced, we need to discard it + self.redis_sanitized.sadd('to_insert', uuid) + # In case this IP is missing in the set to process + self.ris_cache.sadd('for_ris_lookup', data['ip']) + continue + # Format: <dump date>|sources -> set([<source>, ...]) + self.ardb_storage.sadd('{}|sources'.format(data['date']), data['source']) + + # Format: <dump date>|<source> -> set([<asn>, ...]) + self.ardb_storage.sadd('{}|{}'.format(data['date'], data['source']), + ris_entry['asn']) + # Format: <dump date>|<source>|<asn> -> set([<prefix>, ...]) + self.ardb_storage.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']), + ris_entry['prefix']) + + # Format: <dump date>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...]) + self.ardb_storage.sadd('{}|{}|{}|{}'.format(data['date'], data['source'], + ris_entry['asn'], + ris_entry['prefix']), + '{}|{}'.format(data['ip'],
data['datetime'])) + self.redis_sanitized.delete(uuid) + + +class StatsRIPE(): + + def __init__(self, sourceapp='bgpranking-ng - CIRCL'): + self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}" + self.url_parameters = {'sourceapp': sourceapp} + + async def network_info(self, ip: str) -> dict: + method = 'network-info' + self.url_parameters['resource'] = ip + parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()]) + url = self.url.format(method=method, parameters=parameters) + response = requests.get(url) + return response.json() + + async def prefix_overview(self, prefix: str) -> dict: + method = 'prefix-overview' + self.url_parameters['resource'] = prefix + parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()]) + url = self.url.format(method=method, parameters=parameters) + response = requests.get(url) + return response.json() + + +class RoutingInformationServiceFetcher(): + + def __init__(self, loglevel: int=logging.DEBUG): + self.__init_logger(loglevel) + self.ris_cache = Redis(host='localhost', port=6381, db=0) + self.logger.debug('Starting RIS fetcher') + self.ripe = StatsRIPE() + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) + self.logger.setLevel(loglevel) + + async def fetch(self): + while True: + ip = self.ris_cache.spop('for_ris_lookup') + if not ip: + break + ip = ip.decode() + network_info = await self.ripe.network_info(ip) + prefix = network_info['data']['prefix'] + asns = network_info['data']['asns'] + if not asns or not prefix: + self.logger.warning('The IP {} does not seem to be announced'.format(ip)) + continue + prefix_overview = await self.ripe.prefix_overview(prefix) + description = prefix_overview['data']['block']['desc'] + if not description: + description = prefix_overview['data']['block']['name'] + for asn in asns: + self.ris_cache.hmset(ip, {'asn': asn, 'prefix': prefix, + 'description': description}) + + +class DeepArchive(): + + def __init__(self, config_file: Path, storage_directory: Path, + loglevel: int=logging.DEBUG): + '''Archive every file older than 2 months.''' + with open(config_file, 'r') as f: + module_parameters = json.load(f) + self.vendor = module_parameters['vendor'] + self.listname = module_parameters['name'] + self.directory = storage_directory / self.vendor / self.listname / 'archive' + safe_create_dir(self.directory) + self.deep_archive = self.directory / 'deep' + safe_create_dir(self.deep_archive) + self.__init_logger(loglevel) + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__, + self.vendor, self.listname)) + self.logger.setLevel(loglevel) + + def archive(self): + to_archive = defaultdict(list) + today = date.today() + last_day_to_keep = date(today.year, today.month, 1) - relativedelta(months=2) + for p in self.directory.iterdir(): + if not p.is_file(): + continue + filedate = parser.parse(p.name.split('.')[0]).date() + if filedate >= last_day_to_keep: + continue + to_archive['{}.zip'.format(filedate.strftime('%Y%m'))].append(p) + if to_archive: + self.logger.info('Found old files.
Archiving: {}'.format(', '.join(to_archive.keys()))) + else: + self.logger.debug('No old files.') + for archivename, path_list in to_archive.items(): + with zipfile.ZipFile(self.deep_archive / archivename, 'x', zipfile.ZIP_DEFLATED) as z: + for f in path_list: + z.write(f, f.name) + # Delete all the files if the archiving worked out properly + [f.unlink() for f in path_list] diff --git a/listimport/libs/StatsRipe.py b/bgpranking/libs/StatsRipe.py similarity index 100% rename from listimport/libs/StatsRipe.py rename to bgpranking/libs/StatsRipe.py diff --git a/listimport/libs/StatsRipeText.py b/bgpranking/libs/StatsRipeText.py similarity index 100% rename from listimport/libs/StatsRipeText.py rename to bgpranking/libs/StatsRipeText.py diff --git a/listimport/libs/__init__.py b/bgpranking/libs/__init__.py similarity index 100% rename from listimport/libs/__init__.py rename to bgpranking/libs/__init__.py diff --git a/listimport/libs/exceptions.py b/bgpranking/libs/exceptions.py similarity index 100% rename from listimport/libs/exceptions.py rename to bgpranking/libs/exceptions.py diff --git a/bgpranking/libs/helpers.py b/bgpranking/libs/helpers.py new file mode 100644 index 0000000..1d1cd6b --- /dev/null +++ b/bgpranking/libs/helpers.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sys +from pathlib import Path +from .exceptions import CreateDirectoryException +from redis import StrictRedis +from redis.exceptions import ConnectionError +from datetime import datetime, timedelta +import time + + +def get_config_path(): + return Path(sys.modules['bgpranking'].__file__).parent / 'config' + + +def get_list_storage_path(): + return Path(os.environ['VIRTUAL_ENV']) + + +def get_homedir(): + return Path(os.environ['BGPRANKING_HOME']) + + +def safe_create_dir(to_create: Path): + if to_create.exists() and not to_create.is_dir(): + raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create)) + os.makedirs(to_create, exist_ok=True) + + +def set_running(name: str): + r = StrictRedis(host='localhost', port=6582, db=1, decode_responses=True) + r.hset('running', name, 1) + + +def unset_running(name: str): + r = StrictRedis(host='localhost', port=6582, db=1, decode_responses=True) + r.hdel('running', name) + + +def is_running(): + r = StrictRedis(host='localhost', port=6582, db=1, decode_responses=True) + return r.hgetall('running') + + +def shutdown_requested(): + try: + r = StrictRedis(host='localhost', port=6582, db=1, decode_responses=True) + return r.exists('shutdown') + except ConnectionRefusedError: + return True + except ConnectionError: + return True + + +def long_sleep(sleep_in_sec: int, shutdown_check: int=10): + sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec) + while sleep_until > datetime.now(): + time.sleep(shutdown_check) + if shutdown_requested(): + return False + return True diff --git a/listimport/modulesfetcher.py b/bgpranking/modulesfetcher.py similarity index 92% rename from listimport/modulesfetcher.py rename to bgpranking/modulesfetcher.py index 56d2062..9b96472 100644 --- a/listimport/modulesfetcher.py +++ b/bgpranking/modulesfetcher.py @@ -10,7 +10,7 @@ import logging from pid import PidFile, PidFileError import json -from .libs.helpers import safe_create_dir +from .libs.helpers import safe_create_dir, set_running, unset_running class Fetcher(): @@ -127,11 +127,12 @@ class Fetcher(): '''Fetch & store the list''' if not self.fetcher: return + 
set_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname)) try: with PidFile('{}.pid'.format(self.listname), piddir=self.meta): if not await self.__newer(): + unset_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname)) return - async with aiohttp.ClientSession() as session: async with session.get(self.url) as r: content = await r.content.read() @@ -140,5 +141,8 @@ class Fetcher(): self.logger.info('Got a new file \o/') with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f: f.write(content) + unset_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname)) except PidFileError: self.logger.info('Fetcher already running') + finally: + unset_running('{}-{}-{}'.format(self.__class__.__name__, self.vendor, self.listname)) diff --git a/bgpranking/monitor.py b/bgpranking/monitor.py new file mode 100644 index 0000000..adaded1 --- /dev/null +++ b/bgpranking/monitor.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from redis import StrictRedis + + +class Monitor(): + + def __init__(self): + self.intake = StrictRedis(host='localhost', port=6579, db=0, decode_responses=True) + self.sanitize = StrictRedis(host='localhost', port=6580, db=0, decode_responses=True) + self.ris_cache = StrictRedis(host='localhost', port=6581, db=0, decode_responses=True) + self.prefix_cache = StrictRedis(host='localhost', port=6582, db=0, decode_responses=True) + self.running = StrictRedis(host='localhost', port=6582, db=1, decode_responses=True) + self.storage = StrictRedis(host='localhost', port=16579, decode_responses=True) + + def get_running(self): + return self.running.hgetall('running') + + def info_prefix_cache(self): + to_return = {'IPv6 Dump': '', 'IPv4 Dump': '', 'Number ASNs': 0} + if self.prefix_cache.exists('ready'): + v6_dump = self.prefix_cache.get('current|v6') + v4_dump = self.prefix_cache.get('current|v4') + number_as = self.prefix_cache.scard('asns') + to_return['IPv6 Dump'] = v6_dump + to_return['IPv4 Dump'] = v4_dump + to_return['Number ASNs'] = number_as + return to_return + + def get_values(self): + ips_in_intake = self.intake.scard('intake') + waiting_for_ris_lookup = self.ris_cache.scard('for_ris_lookup') + ready_to_insert = self.sanitize.scard('to_insert') + prefix_db_ready = self.prefix_cache.exists('ready') + return {'Non-parsed IPs': ips_in_intake, 'Parsed IPs': ready_to_insert, + 'Awaiting prefix lookup': waiting_for_ris_lookup, + 'Prefix database ready': prefix_db_ready} diff --git a/listimport/parser.py b/bgpranking/parser.py similarity index 92% rename from listimport/parser.py rename to bgpranking/parser.py index 449fb60..8e2f41b 100644 --- a/listimport/parser.py +++ b/bgpranking/parser.py @@ -14,7 +14,7 @@ import importlib from typing import List import types -from .libs.helpers import safe_create_dir +from .libs.helpers import safe_create_dir, set_running, unset_running class RawFilesParser(): @@ -26,7 +26,7 @@ class RawFilesParser(): self.vendor = module_parameters['vendor'] self.listname = module_parameters['name'] if 'parser' in module_parameters: - self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser'], 'listimport').parse_raw_file, self) + self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser'], 'bgpranking').parse_raw_file, self) self.source = '{}-{}'.format(self.vendor, self.listname) self.directory = storage_directory / self.vendor / self.listname safe_create_dir(self.directory) @@ -55,7
+55,8 @@ class RawFilesParser(): self.datetime = datetime.now() return self.extract_ipv4(f.getvalue()) - async def parse_raw_files(self): + def parse_raw_files(self): + set_running(self.source) for filepath in self.files_to_parse: self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1)) with open(filepath, 'rb') as f: @@ -68,6 +69,7 @@ class RawFilesParser(): p.sadd('intake', uuid) p.execute() self._archive(filepath) + unset_running(self.source) def _archive(self, filepath: Path): '''After processing, move file to the archive directory''' diff --git a/listimport/parsers/__init__.py b/bgpranking/parsers/__init__.py similarity index 100% rename from listimport/parsers/__init__.py rename to bgpranking/parsers/__init__.py diff --git a/listimport/parsers/abusech.py b/bgpranking/parsers/abusech.py similarity index 100% rename from listimport/parsers/abusech.py rename to bgpranking/parsers/abusech.py diff --git a/listimport/parsers/default.py b/bgpranking/parsers/default.py similarity index 100% rename from listimport/parsers/default.py rename to bgpranking/parsers/default.py diff --git a/listimport/parsers/dshield.py b/bgpranking/parsers/dshield.py similarity index 100% rename from listimport/parsers/dshield.py rename to bgpranking/parsers/dshield.py diff --git a/listimport/parsers/malc0de.py b/bgpranking/parsers/malc0de.py similarity index 100% rename from listimport/parsers/malc0de.py rename to bgpranking/parsers/malc0de.py diff --git a/listimport/parsers/nothink.py b/bgpranking/parsers/nothink.py similarity index 100% rename from listimport/parsers/nothink.py rename to bgpranking/parsers/nothink.py diff --git a/listimport/initranking.py b/bgpranking/prefixdb.py similarity index 64% rename from listimport/initranking.py rename to bgpranking/prefixdb.py index 0552d7c..8057899 100644 --- a/listimport/initranking.py +++ b/bgpranking/prefixdb.py @@ -9,6 +9,9 @@ import gzip from io import BytesIO from collections import defaultdict import re +import time +from .libs.helpers import set_running, unset_running + # Dataset source: Routeviews Prefix to AS mappings Dataset for IPv4 and IPv6 # http://www.caida.org/data/routing/routeviews-prefix2as.xml @@ -18,7 +21,7 @@ class PrefixDatabase(): def __init__(self, loglevel: int=logging.DEBUG): self.__init_logger(loglevel) - self.redis_cache = Redis(host='localhost', port=6582, db=0, decode_responses=True) + self.prefix_cache = Redis(host='localhost', port=6582, db=0, decode_responses=True) self.ipv6_url = 'http://data.caida.org/datasets/routing/routeviews6-prefix2as/{}' self.ipv4_url = 'http://data.caida.org/datasets/routing/routeviews-prefix2as/{}' @@ -26,11 +29,20 @@ class PrefixDatabase(): self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) self.logger.setLevel(loglevel) + def update_required(self): + v4_is_new, v4_path = self._has_new('v4', self.ipv4_url) + v6_is_new, v6_path = self._has_new('v6', self.ipv6_url) + if any([v4_is_new, v6_is_new]): + self.logger.info('Prefix update required.') + else: + self.logger.debug('No prefix update required.') + return any([v4_is_new, v6_is_new]) + def _has_new(self, address_family, root_url): r = requests.get(root_url.format('pfx2as-creation.log')) last_entry = r.text.split('\n')[-2] path = last_entry.split('\t')[-1] - if path == self.redis_cache.get('current|{}'.format(address_family)): + if path == self.prefix_cache.get('current|{}'.format(address_family)): self.logger.debug('Same file already loaded: {}'.format(path)) return False, path return True, path @@ 
-42,13 +54,13 @@ class PrefixDatabase(): with gzip.open(BytesIO(r.content), 'r') as f: for line in f: prefix, length, asns = line.decode().strip().split('\t') - # The meaning of AS set and multi-origin AS in unclear. Tacking the first ASN in the list only. + # The meaning of AS set and multi-origin AS is unclear. Taking the first ASN in the list only. asn = re.split('[,_]', asns)[0] network = ip_network('{}/{}'.format(prefix, length)) to_import[asn][address_family].add(str(network)) to_import[asn]['ipcount'] += network.num_addresses - p = self.redis_cache.pipeline() + p = self.prefix_cache.pipeline() p.sadd('asns', *to_import.keys()) for asn, data in to_import.items(): p.sadd('{}|{}'.format(asn, address_family), *data[address_family]) @@ -58,10 +70,17 @@ class PrefixDatabase(): return True def load_prefixes(self): + set_running(self.__class__.__name__) + self.prefix_cache.delete('ready') + self.logger.info('Prefix update starting in a few seconds.') + time.sleep(15) v4_is_new, v4_path = self._has_new('v4', self.ipv4_url) v6_is_new, v6_path = self._has_new('v6', self.ipv6_url) - if v4_is_new or v6_is_new: - self.redis_cache.flushdb() - self._init_routes('v6', self.ipv6_url, v6_path) - self._init_routes('v4', self.ipv4_url, v4_path) + self.prefix_cache.flushdb() + # TODO: Add a catchall for everything that isn't announced so we can track that down later on + self._init_routes('v6', self.ipv6_url, v6_path) + self._init_routes('v4', self.ipv4_url, v4_path) + self.prefix_cache.set('ready', 1) + self.logger.info('Prefix update complete.') + unset_running(self.__class__.__name__) diff --git a/bgpranking/risfetcher.py b/bgpranking/risfetcher.py new file mode 100644 index 0000000..8953c91 --- /dev/null +++ b/bgpranking/risfetcher.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +from redis import Redis + +import time +import pytricia +import ipaddress +from .libs.helpers import shutdown_requested, set_running, unset_running + + +class RISPrefixLookup(): + + def __init__(self, loglevel: int=logging.DEBUG): + self.__init_logger(loglevel) + self.logger.info('Starting RIS Prefix fetcher') + self.prefix_db = Redis(host='localhost', port=6582, db=0, decode_responses=True) + self.longest_prefix_matching = Redis(host='localhost', port=6581, db=0, decode_responses=True) + self.tree_v4 = pytricia.PyTricia() + self.tree_v6 = pytricia.PyTricia(128) + self.init_tree() + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) + self.logger.setLevel(loglevel) + + def cache_prefix(self, pipe, ip, prefix, asns): + pipe.hmset(ip, {'asn': asns, 'prefix': prefix}) + pipe.expire(ip, 43200) # 12H + + def init_tree(self): + for asn in self.prefix_db.smembers('asns'): + for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v4')): + self.tree_v4[prefix] = asn + for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v6')): + self.tree_v6[prefix] = asn + self.tree_v4['0.0.0.0/0'] = 0 # catch-all for v4 addresses not covered by any announced prefix + self.tree_v6['::/0'] = 0 # catch-all for v6 addresses, must go in the v6 tree + + def run(self): + set_running(self.__class__.__name__) + while True: + if shutdown_requested(): + break + if not self.prefix_db.get('ready'): + self.logger.debug('Prefix database not ready.') + time.sleep(5) + continue + ips = self.longest_prefix_matching.spop('for_ris_lookup', 100) + if not ips: # TODO: add a check against something to stop the loop + self.logger.debug('Nothing to lookup') + break + pipe = self.longest_prefix_matching.pipeline(transaction=False) + for ip in ips: + if
self.longest_prefix_matching.exists(ip): + self.logger.debug('Already cached: {}'.format(ip)) + continue + ip = ipaddress.ip_address(ip) + if ip.version == 4: + prefix = self.tree_v4.get_key(ip) + asns = self.tree_v4.get(ip) + else: + prefix = self.tree_v6.get_key(ip) + asns = self.tree_v6.get(ip) + if not prefix: + self.logger.warning('The IP {} does not seem to be announced'.format(ip)) + continue + self.cache_prefix(pipe, ip, prefix, asns) + pipe.execute() + unset_running(self.__class__.__name__) diff --git a/bgpranking/sanitizer.py b/bgpranking/sanitizer.py new file mode 100644 index 0000000..929513d --- /dev/null +++ b/bgpranking/sanitizer.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from dateutil import parser +import logging +from redis import StrictRedis +from .libs.helpers import shutdown_requested, set_running, unset_running + +import ipaddress + + +class Sanitizer(): + + def __init__(self, loglevel: int=logging.DEBUG): + self.__init_logger(loglevel) + self.redis_intake = StrictRedis(host='localhost', port=6579, db=0, decode_responses=True) + self.redis_sanitized = StrictRedis(host='localhost', port=6580, db=0, decode_responses=True) + self.ris_cache = StrictRedis(host='localhost', port=6581, db=0, decode_responses=True) + self.logger.debug('Starting import') + + def __init_logger(self, loglevel): + self.logger = logging.getLogger('{}'.format(self.__class__.__name__)) + self.logger.setLevel(loglevel) + + def sanitize(self): + set_running(self.__class__.__name__) + while True: + if shutdown_requested(): + break + uuids = self.redis_intake.spop('intake', 100) + if not uuids: + break + for_ris_lookup = [] + pipeline = self.redis_sanitized.pipeline(transaction=False) + for uuid in uuids: + data = self.redis_intake.hgetall(uuid) + try: + ip = ipaddress.ip_address(data['ip']) + except ValueError: + self.logger.info('Invalid IP address: {}'.format(data['ip'])) + continue + if not ip.is_global: + self.logger.info('The IP address {} is not global'.format(data['ip'])) + continue + + date = parser.parse(data['datetime']).date().isoformat() + # NOTE: to consider: discard data with an old timestamp (define old) + + # Add to temporary DB for further processing + for_ris_lookup.append(str(ip)) + pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], + 'date': date, 'datetime': data['datetime']}) + pipeline.sadd('to_insert', uuid) + pipeline.execute() + self.redis_intake.delete(*uuids) + self.ris_cache.sadd('for_ris_lookup', *for_ris_lookup) + unset_running(self.__class__.__name__) diff --git a/bin/.rislookup.py.swp b/bin/.rislookup.py.swp new file mode 100644 index 0000000000000000000000000000000000000000..bf51047d2d5b888043976ef09c476f065d1d1a06 Binary files /dev/null and b/bin/.rislookup.py.swp differ
diff --git a/bin/archiver.py b/bin/archiver.py new file mode 100755 index 0000000..eff4220 --- /dev/null +++ b/bin/archiver.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from bgpranking.archive import DeepArchive +import logging +from pathlib import Path +from bgpranking.libs.helpers import get_config_path, get_list_storage_path +from pid import PidFile, PidFileError + + +logger = logging.getLogger('Archiver') +logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', + level=logging.INFO, datefmt='%I:%M:%S') + +# NOTE: +# * Supposed to run once every ~2 months + + +class ModulesArchiver(): + + def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.INFO): + if not config_dir: + config_dir = get_config_path() + if not storage_directory: + storage_directory = get_list_storage_path() + self.storage_directory = storage_directory + modules_config = config_dir / 'modules' + modules_paths = [modulepath for modulepath in modules_config.glob('*.json')] + self.modules = [DeepArchive(path, self.storage_directory, loglevel) for path in modules_paths] + + def archive(self): + [module.archive() for module in self.modules] + + +if __name__ == '__main__': + archiver = ModulesArchiver() + try: + with PidFile(piddir=archiver.storage_directory): + logger.info('Archiving...') + archiver.archive() + logger.info('... done.') + except PidFileError: + logger.warning('Archiver already running, skip.') diff --git a/dbinsert.py b/bin/dbinsert.py similarity index 57% rename from dbinsert.py rename to bin/dbinsert.py index a2d42bd..9aca9fa 100755 --- a/dbinsert.py +++ b/bin/dbinsert.py @@ -2,8 +2,8 @@ # -*- coding: utf-8 -*- import logging -import asyncio -from listimport.dbinsert import DatabaseInsert +from bgpranking.dbinsert import DatabaseInsert +from bgpranking.libs.helpers import long_sleep, shutdown_requested logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', level=logging.INFO, datefmt='%I:%M:%S') @@ -15,11 +15,15 @@ class DBInsertManager(): self.loglevel = loglevel self.dbinsert = DatabaseInsert(loglevel) - async def run_insert(self): - await asyncio.gather(self.dbinsert.insert()) + def run_insert(self): + self.dbinsert.insert() if __name__ == '__main__': modules_manager = DBInsertManager() - loop = asyncio.get_event_loop() - loop.run_until_complete(modules_manager.run_insert()) + while True: + if shutdown_requested(): + break + modules_manager.run_insert() + if not long_sleep(120): + break diff --git a/bin/fetcher.py b/bin/fetcher.py new file mode 100755 index 0000000..beb3fcf --- /dev/null +++ b/bin/fetcher.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import asyncio +from pathlib import Path +from bgpranking.libs.helpers import long_sleep, shutdown_requested +import aiohttp + +from bgpranking.modulesfetcher import Fetcher +from bgpranking.libs.helpers import get_config_path, get_list_storage_path + +logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', + level=logging.INFO, datefmt='%I:%M:%S') + +logger = logging.getLogger('Fetcher') + + +class ModulesManager(): + + def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG): + if not config_dir: + config_dir = get_config_path() + if not storage_directory: + storage_directory = get_list_storage_path() + modules_config = config_dir / 'modules' + modules_paths = [modulepath for modulepath in modules_config.glob('*.json')] +
self.modules = [Fetcher(path, storage_directory, loglevel) for path in modules_paths] + + async def run_fetchers(self): + await asyncio.gather( + *[module.fetch_list() for module in self.modules if module.fetcher] + ) + + +if __name__ == '__main__': + modules_manager = ModulesManager() + loop = asyncio.get_event_loop() + while True: + if shutdown_requested(): + break + try: + loop.run_until_complete(modules_manager.run_fetchers()) + except aiohttp.client_exceptions.ClientConnectorError: + logger.critical('Exception while fetching lists.') + long_sleep(60) + continue + if not long_sleep(3600): + break diff --git a/bin/loadprefixes.py b/bin/loadprefixes.py new file mode 100755 index 0000000..f3980ab --- /dev/null +++ b/bin/loadprefixes.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +from bgpranking.prefixdb import PrefixDatabase +from bgpranking.libs.helpers import long_sleep, shutdown_requested +import requests + +logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', + level=logging.INFO, datefmt='%I:%M:%S') + +logger = logging.getLogger('PrefixDB Fetcher') + + +class PrefixDBManager(): + + def __init__(self, loglevel: int=logging.DEBUG): + self.prefix_db = PrefixDatabase(loglevel=loglevel) + + def load_prefixes(self): + self.prefix_db.load_prefixes() + + def needs_update(self): + return self.prefix_db.update_required() + + +if __name__ == '__main__': + p = PrefixDBManager() + while True: + if shutdown_requested(): + break + try: + if p.needs_update(): + p.load_prefixes() + except requests.exceptions.ConnectionError: + logger.critical('Unable to download the prefix database.') + long_sleep(60) + continue + if not long_sleep(3600): + break diff --git a/bin/monitor.py b/bin/monitor.py new file mode 100755 index 0000000..67b31ac --- /dev/null +++ b/bin/monitor.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from bgpranking.monitor import Monitor +import logging + +logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', + level=logging.INFO, datefmt='%I:%M:%S') + + +class MonitorManager(): + + def __init__(self, loglevel: int=logging.INFO): + self.monitor = Monitor() + + def get_values(self): + generic = self.monitor.get_values() + prefix_cache = self.monitor.info_prefix_cache() + running = self.monitor.get_running() + return generic, prefix_cache, running + + +if __name__ == '__main__': + m = MonitorManager() + print(m.get_values()) diff --git a/bin/parser.py b/bin/parser.py new file mode 100755 index 0000000..61048fa --- /dev/null +++ b/bin/parser.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +from pathlib import Path +from bgpranking.parser import RawFilesParser +from bgpranking.libs.helpers import get_config_path, get_list_storage_path +from bgpranking.libs.helpers import long_sleep, shutdown_requested + +logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', + level=logging.INFO, datefmt='%I:%M:%S') + + +class ParserManager(): + + def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG): + if not config_dir: + config_dir = get_config_path() + if not storage_directory: + storage_directory = get_list_storage_path() + modules_config = config_dir / 'modules' + modules_paths = [modulepath for modulepath in modules_config.glob('*.json')] + self.modules = [RawFilesParser(path, storage_directory, loglevel) for path in modules_paths] + + def run_intake(self): + [module.parse_raw_files() for
diff --git a/bin/parser.py b/bin/parser.py
new file mode 100755
index 0000000..61048fa
--- /dev/null
+++ b/bin/parser.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from pathlib import Path
+from bgpranking.parser import RawFilesParser
+from bgpranking.libs.helpers import get_config_path, get_list_storage_path
+from bgpranking.libs.helpers import long_sleep, shutdown_requested
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class ParserManager():
+
+    def __init__(self, config_dir: Path=None, storage_directory: Path=None, loglevel: int=logging.DEBUG):
+        if not config_dir:
+            config_dir = get_config_path()
+        if not storage_directory:
+            storage_directory = get_list_storage_path()
+        modules_config = config_dir / 'modules'
+        modules_paths = [modulepath for modulepath in modules_config.glob('*.json')]
+        self.modules = [RawFilesParser(path, storage_directory, loglevel) for path in modules_paths]
+
+    def run_intake(self):
+        [module.parse_raw_files() for module in self.modules]
+
+
+if __name__ == '__main__':
+    parser_manager = ParserManager()
+    while True:
+        if shutdown_requested():
+            break
+        parser_manager.run_intake()
+        if not long_sleep(120):
+            break
diff --git a/bin/rislookup.py b/bin/rislookup.py
new file mode 100755
index 0000000..8553fe9
--- /dev/null
+++ b/bin/rislookup.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from bgpranking.risfetcher import RISPrefixLookup
+from bgpranking.libs.helpers import long_sleep, shutdown_requested
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class RISLookupManager():
+
+    def __init__(self, loglevel: int=logging.INFO):
+        self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)
+
+    def run_fetcher(self):
+        self.ris_fetcher.run()
+
+
+if __name__ == '__main__':
+    modules_manager = RISLookupManager()
+    while True:
+        if shutdown_requested():
+            break
+        modules_manager.run_fetcher()
+        if not long_sleep(120):
+            break
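bin/run_backend.py below babysits five local backends. The role of each port, inferred from the connection parameters used across this patch (this grouping is a reading aid, not code from the repository):

    # Roles implied by the hardcoded Redis/ardb connections in this patch.
    BACKENDS = {
        6579: 'intake: raw entries pushed by the parsers',
        6580: 'sanitized: validated entries queued for DB insert (to_insert)',
        6581: 'RIS cache: prefix/ASN lookups, cached 12h (for_ris_lookup)',
        6582: 'prefix database (db=0) and the shutdown flag (db=1)',
        16579: 'ardb: long-term storage',
    }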
parser.add_argument("--stop", action='store_true', default=False, help="Stop all") + parser.add_argument("--status", action='store_true', default=True, help="Show status") + args = parser.parse_args() + + if args.start: + launch_all() + if args.stop: + stop_all() + if not args.stop and args.status: + check_all() diff --git a/sanitize.py b/bin/sanitizer.py similarity index 56% rename from sanitize.py rename to bin/sanitizer.py index b5f6c33..e1a8036 100755 --- a/sanitize.py +++ b/bin/sanitizer.py @@ -2,8 +2,8 @@ # -*- coding: utf-8 -*- import logging -import asyncio -from listimport.sanitizer import Sanitizer +from bgpranking.sanitizer import Sanitizer +from bgpranking.libs.helpers import long_sleep, shutdown_requested logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', level=logging.WARNING, datefmt='%I:%M:%S') @@ -15,11 +15,15 @@ class SanitizerManager(): self.loglevel = loglevel self.sanitizer = Sanitizer(loglevel) - async def run_sanitizer(self): - await asyncio.gather(self.sanitizer.sanitize()) + def run_sanitizer(self): + self.sanitizer.sanitize() if __name__ == '__main__': modules_manager = SanitizerManager() - loop = asyncio.get_event_loop() - loop.run_until_complete(modules_manager.run_sanitizer()) + while True: + if shutdown_requested(): + break + modules_manager.run_sanitizer() + if not long_sleep(120): + break diff --git a/bin/shutdown.py b/bin/shutdown.py new file mode 100644 index 0000000..6b216c6 --- /dev/null +++ b/bin/shutdown.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from bgpranking.libs.helpers import is_running +import time +from redis import StrictRedis + +if __name__ == '__main__': + r = StrictRedis(host='localhost', port=6582, db=1, decode_responses=True) + r.set('shutdown', 1) + while True: + running = is_running() + print(running) + if not running: + break + time.sleep(10) diff --git a/bin/start.py b/bin/start.py new file mode 100644 index 0000000..ab4fc40 --- /dev/null +++ b/bin/start.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from subprocess import Popen + + +if __name__ == '__main__': + p = Popen(['run_backend.py', '--start']) + p.wait() + Popen(['loadprefixes.py']) + Popen(['rislookup.py']) + Popen(['fetcher.py']) + Popen(['parser.py']) + Popen(['sanitizer.py']) + Popen(['dbinsert.py']) diff --git a/bin/stop.py b/bin/stop.py new file mode 100644 index 0000000..07babce --- /dev/null +++ b/bin/stop.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from subprocess import Popen + + +if __name__ == '__main__': + p = Popen(['shutdown.py']) + p.wait() + Popen(['run_backend.py', '--stop']) diff --git a/fetcher.py b/fetcher.py deleted file mode 100755 index 96bb64d..0000000 --- a/fetcher.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import logging -import asyncio -from pathlib import Path - -from listimport.modulesfetcher import Fetcher - -logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s', - level=logging.INFO, datefmt='%I:%M:%S') - - -class ModulesManager(): - - def __init__(self, config_dir: Path=Path('listimport', 'modules_config'), - storage_directory: Path=Path('rawdata'), - loglevel: int=logging.DEBUG): - self.config_dir = config_dir - print(config_dir) - self.storage_directory = storage_directory - self.loglevel = loglevel - self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')] - self.modules = [Fetcher(path, self.storage_directory, self.loglevel) - for path in 
diff --git a/fetcher.py b/fetcher.py
deleted file mode 100755
index 96bb64d..0000000
--- a/fetcher.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import asyncio
-from pathlib import Path
-
-from listimport.modulesfetcher import Fetcher
-
-logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
-                    level=logging.INFO, datefmt='%I:%M:%S')
-
-
-class ModulesManager():
-
-    def __init__(self, config_dir: Path=Path('listimport', 'modules_config'),
-                 storage_directory: Path=Path('rawdata'),
-                 loglevel: int=logging.DEBUG):
-        self.config_dir = config_dir
-        print(config_dir)
-        self.storage_directory = storage_directory
-        self.loglevel = loglevel
-        self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')]
-        self.modules = [Fetcher(path, self.storage_directory, self.loglevel)
-                        for path in self.modules_paths]
-
-    async def run_fetchers(self):
-        await asyncio.gather(
-            *[module.fetch_list() for module in self.modules if module.fetcher]
-        )
-
-
-if __name__ == '__main__':
-    modules_manager = ModulesManager()
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(modules_manager.run_fetchers())
diff --git a/intake.py b/intake.py
deleted file mode 100755
index 22f18d5..0000000
--- a/intake.py
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import asyncio
-from pathlib import Path
-from listimport.parser import RawFilesParser
-
-logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
-                    level=logging.INFO, datefmt='%I:%M:%S')
-
-
-class IntakeManager():
-
-    def __init__(self, config_dir: Path=Path('listimport', 'modules_config'),
-                 storage_directory: Path=Path('rawdata'),
-                 loglevel: int=logging.DEBUG):
-        self.config_dir = config_dir
-        self.storage_directory = storage_directory
-        self.loglevel = loglevel
-        self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')]
-        self.modules = [RawFilesParser(path, self.storage_directory, self.loglevel)
-                        for path in self.modules_paths]
-
-    async def run_intake(self):
-        await asyncio.gather(
-            *[module.parse_raw_files() for module in self.modules]
-        )
-
-
-if __name__ == '__main__':
-    modules_manager = IntakeManager()
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(modules_manager.run_intake())
diff --git a/listimport/libs/helpers.py b/listimport/libs/helpers.py
deleted file mode 100644
index f7c4ce3..0000000
--- a/listimport/libs/helpers.py
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import os
-from pathlib import Path
-from .exceptions import CreateDirectoryException
-
-
-def safe_create_dir(to_create: Path):
-    if to_create.exists() and not to_create.is_dir():
-        raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create))
-    os.makedirs(to_create, exist_ok=True)
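Per the diffstat, safe_create_dir moves into the new bgpranking/libs/helpers.py. For comparison, a pathlib-native sketch of the same helper; the relocated version may keep the original CreateDirectoryException instead:

    # Sketch only; behavior matches the deleted os.makedirs-based helper.
    from pathlib import Path


    def safe_create_dir(to_create: Path) -> None:
        if to_create.exists() and not to_create.is_dir():
            raise NotADirectoryError('The path {} already exists and is not a directory'.format(to_create))
        to_create.mkdir(parents=True, exist_ok=True)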
diff --git a/listimport/risfetcher.py b/listimport/risfetcher.py
deleted file mode 100644
index 6c8513a..0000000
--- a/listimport/risfetcher.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-from redis import Redis
-
-import time
-import pytricia
-import ipaddress
-
-
-class RISPrefixLookup():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.__init_logger(loglevel)
-        self.logger.debug('Starting RIS Prefix fetcher')
-        self.prefix_db = Redis(host='localhost', port=6582, db=0, decode_responses=True)
-        self.longest_prefix_matching = Redis(host='localhost', port=6581, db=0, decode_responses=True)
-        self.tree_v4 = pytricia.PyTricia()
-        self.tree_v6 = pytricia.PyTricia(128)
-        self.init_tree()
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
-        self.logger.setLevel(loglevel)
-
-    def cache_prefix(self, ip, prefix, asns):
-        p = self.longest_prefix_matching.pipeline()
-        p.hmset(ip, {'asn': asns, 'prefix': prefix})
-        p.expire(ip, 43200)  # 12H
-        p.execute()
-
-    def init_tree(self):
-        for asn in self.prefix_db.smembers('asns'):
-            for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v4')):
-                self.tree_v4[prefix] = asn
-            for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v6')):
-                self.tree_v6[prefix] = asn
-
-    def run(self):
-        while True:
-            ip = self.longest_prefix_matching.spop('for_ris_lookup')
-            if not ip:  # TODO: add a check against something to stop the loop
-                self.logger.debug('Nothing to lookup')
-                time.sleep(10)
-                continue
-            if self.longest_prefix_matching.exists(ip):
-                self.logger.debug('Already cached: {}'.format(ip))
-                continue
-            ip = ipaddress.ip_address(ip)
-            if ip.version == 4:
-                prefix = self.tree_v4.get_key(ip)
-                asns = self.tree_v4.get(ip)
-            else:
-                prefix = self.tree_v6.get_key(ip)
-                asns = self.tree_v6.get(ip)
-            if not prefix:
-                self.logger.warning('The IP {} does not seem to be announced'.format(ip))
-                continue
-            self.cache_prefix(ip, prefix, asns)
diff --git a/listimport/sanitizer.py b/listimport/sanitizer.py
deleted file mode 100644
index 176049a..0000000
--- a/listimport/sanitizer.py
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from dateutil import parser
-import logging
-from redis import Redis
-
-import ipaddress
-
-
-class Sanitizer():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.__init_logger(loglevel)
-        self.redis_intake = Redis(host='localhost', port=6579, db=0, decode_responses=True)
-        self.redis_sanitized = Redis(host='localhost', port=6580, db=0, decode_responses=True)
-        self.ris_cache = Redis(host='localhost', port=6581, db=0, decode_responses=True)
-        self.logger.debug('Starting import')
-
-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
-        self.logger.setLevel(loglevel)
-
-    async def sanitize(self):
-        while True:
-            uuid = self.redis_intake.spop('intake')
-            if not uuid:
-                break
-            data = self.redis_intake.hgetall(uuid)
-            try:
-                ip = ipaddress.ip_address(data['ip'])
-            except ValueError:
-                self.logger.info('Invalid IP address: {}'.format(data['ip']))
-                continue
-            if not ip.is_global:
-                self.logger.info('The IP address {} is not global'.format(data['ip']))
-                continue
-
-            date = parser.parse(data['datetime']).date().isoformat()
-            # NOTE: to consider: discard data with an old timestamp (define old)
-
-            # Add to temporay DB for further processing
-            self.ris_cache.sadd('for_ris_lookup', str(ip))
-            pipeline = self.redis_sanitized.pipeline()
-            pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'],
-                                  'date': date, 'datetime': data['datetime']})
-            pipeline.sadd('to_insert', uuid)
-            pipeline.execute()
-            self.redis_intake.delete(uuid)
diff --git a/ranking.py b/ranking.py
deleted file mode 100755
index b194f5e..0000000
--- a/ranking.py
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-from listimport.initranking import PrefixDatabase
-
-
-logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
-                    level=logging.INFO, datefmt='%I:%M:%S')
-
-
-class RankingManager():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.prefix_db = PrefixDatabase(loglevel=loglevel)
-
-    def load_prefixes(self):
-        self.prefix_db.load_prefixes()
-
-
-if __name__ == '__main__':
-    rm = RankingManager()
-    rm.load_prefixes()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..968d863
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+git+https://github.com/andymccurdy/redis-py.git
+python-dateutil
+git+https://github.com/jsommers/pytricia.git
+git+https://github.com/trbs/pid.git
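pytricia, pinned above, is what the RIS lookup relies on for longest-prefix matching (see the deleted listimport/risfetcher.py and its replacement bgpranking/risfetcher.py in the diffstat). The core pattern, runnable standalone with TEST-NET prefixes and documentation ASNs:

    # Longest-prefix matching as used by RISPrefixLookup; real data comes
    # from the RIPE prefix database, these values are placeholders.
    import pytricia

    tree = pytricia.PyTricia()
    tree['192.0.2.0/24'] = '64496'     # covering announcement
    tree['192.0.2.128/25'] = '64497'   # more-specific announcement

    print(tree.get_key('192.0.2.130'))  # '192.0.2.128/25': longest match wins
    print(tree.get('192.0.2.1'))        # '64496': falls back to the /24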
-
-class RISManager():
-
-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)
-
-    def run_fetcher(self):
-        self.ris_fetcher.run()
-
-
-if __name__ == '__main__':
-    modules_manager = RISManager()
-    modules_manager.run_fetcher()
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..d80b69c
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from setuptools import setup
+
+
+setup(
+    name='bgpranking',
+    version='0.1',
+    author='Raphaël Vinot',
+    author_email='raphael.vinot@circl.lu',
+    maintainer='Raphaël Vinot',
+    url='https://github.com/D4-project/BGP-Ranking',
+    description='BGP Ranking, the new one..',
+    packages=['bgpranking'],
+    scripts=['bin/archiver.py', 'bin/dbinsert.py', 'bin/fetcher.py', 'bin/parser.py',
+             'bin/loadprefixes.py', 'bin/rislookup.py', 'bin/sanitizer.py', 'bin/run_backend.py',
+             'bin/monitor.py', 'bin/start.py', 'bin/stop.py', 'bin/shutdown.py'],
+    classifiers=[
+        'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
+        'Development Status :: 3 - Alpha',
+        'Environment :: Console',
+        'Operating System :: POSIX :: Linux',
+        'Intended Audience :: Science/Research',
+        'Intended Audience :: Telecommunications Industry',
+        'Intended Audience :: Information Technology',
+        'Programming Language :: Python :: 3',
+        'Topic :: Security',
+        'Topic :: Internet',
+    ],
+    include_package_data=True,
+    package_data={'bgpranking': ['config/*/*.conf',
+                                 'config/modules/*.json']},
+)
diff --git a/storage/ardb.conf b/storage/ardb.conf
old mode 100755
new mode 100644
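With include_package_data and the package_data entry above, the module configs ship inside the installed package. One way a consumer might locate them at runtime (illustrative; bin/parser.py itself goes through bgpranking.libs.helpers.get_config_path):

    # Sketch: enumerate the packaged module configs after installation.
    from pathlib import Path

    import bgpranking

    config_dir = Path(bgpranking.__file__).parent / 'config' / 'modules'
    for module_config in sorted(config_dir.glob('*.json')):
        print(module_config.name)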