diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/archive.py b/archive.py
new file mode 100755
index 0000000..8fa76c3
--- /dev/null
+++ b/archive.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from listimport.archive import DeepArchive
+import logging
+from pathlib import Path
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class ModulesArchiver():
+
+    def __init__(self, config_dir: Path=Path('listimport', 'modules_config'),
+                 storage_directory: Path=Path('rawdata'),
+                 loglevel: int=logging.INFO):
+        self.config_dir = config_dir
+        self.storage_directory = storage_directory
+        self.loglevel = loglevel
+        self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')]
+        self.modules = [DeepArchive(path, self.storage_directory, self.loglevel)
+                        for path in self.modules_paths]
+
+    def archive(self):
+        [module.archive() for module in self.modules]
+
+
+if __name__ == '__main__':
+    archiver = ModulesArchiver()
+    archiver.archive()
diff --git a/dbinsert.py b/dbinsert.py
new file mode 100755
index 0000000..a2d42bd
--- /dev/null
+++ b/dbinsert.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import asyncio
+from listimport.dbinsert import DatabaseInsert
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class DBInsertManager():
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.loglevel = loglevel
+        self.dbinsert = DatabaseInsert(loglevel)
+
+    async def run_insert(self):
+        await asyncio.gather(self.dbinsert.insert())
+
+
+if __name__ == '__main__':
+    modules_manager = DBInsertManager()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(modules_manager.run_insert())
diff --git a/fetcher.py b/fetcher.py
new file mode 100755
index 0000000..96bb64d
--- /dev/null
+++ b/fetcher.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import asyncio
+from pathlib import Path
+
+from listimport.modulesfetcher import Fetcher
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class ModulesManager():
+
+    def __init__(self, config_dir: Path=Path('listimport', 'modules_config'),
+                 storage_directory: Path=Path('rawdata'),
+                 loglevel: int=logging.DEBUG):
+        self.config_dir = config_dir
+        self.storage_directory = storage_directory
+        self.loglevel = loglevel
+        self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')]
+        self.modules = [Fetcher(path, self.storage_directory, self.loglevel)
+                        for path in self.modules_paths]
+
+    async def run_fetchers(self):
+        await asyncio.gather(
+            *[module.fetch_list() for module in self.modules if module.fetcher]
+        )
+
+
+if __name__ == '__main__':
+    modules_manager = ModulesManager()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(modules_manager.run_fetchers())
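All the entry points in this change (archive.py, dbinsert.py, fetcher.py, and below intake.py, ris.py, sanitize.py) share the same shape: build one worker per JSON file in the module config directory, then drive the workers' coroutines through the event loop. A minimal sketch of that pattern, with `Worker` as a hypothetical stand-in for Fetcher, RawFilesParser or DeepArchive:

```python
# Minimal sketch of the shared entry-point pattern; `Worker` is a
# hypothetical stand-in for Fetcher / RawFilesParser / DeepArchive.
import asyncio
from pathlib import Path


class Worker:
    def __init__(self, config_file: Path):
        self.config_file = config_file

    async def run(self):
        print('processing', self.config_file)


async def run_all(config_dir: Path):
    # One worker per module config, all scheduled concurrently.
    workers = [Worker(path) for path in config_dir.glob('*.json')]
    await asyncio.gather(*[w.run() for w in workers])


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_all(Path('listimport', 'modules_config')))
```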
diff --git a/intake.py b/intake.py
new file mode 100755
index 0000000..22f18d5
--- /dev/null
+++ b/intake.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import asyncio
+from pathlib import Path
+from listimport.parser import RawFilesParser
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class IntakeManager():
+
+    def __init__(self, config_dir: Path=Path('listimport', 'modules_config'),
+                 storage_directory: Path=Path('rawdata'),
+                 loglevel: int=logging.DEBUG):
+        self.config_dir = config_dir
+        self.storage_directory = storage_directory
+        self.loglevel = loglevel
+        self.modules_paths = [modulepath for modulepath in self.config_dir.glob('*.json')]
+        self.modules = [RawFilesParser(path, self.storage_directory, self.loglevel)
+                        for path in self.modules_paths]
+
+    async def run_intake(self):
+        await asyncio.gather(
+            *[module.parse_raw_files() for module in self.modules]
+        )
+
+
+if __name__ == '__main__':
+    modules_manager = IntakeManager()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(modules_manager.run_intake())
diff --git a/listimport/__init__.py b/listimport/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/listimport/archive.py b/listimport/archive.py
new file mode 100644
index 0000000..6a0f49b
--- /dev/null
+++ b/listimport/archive.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from dateutil import parser
+from datetime import date
+from pathlib import Path
+from dateutil.relativedelta import relativedelta
+from collections import defaultdict
+import zipfile
+import logging
+import json
+
+from .libs.helpers import safe_create_dir
+
+
+class DeepArchive():
+
+    def __init__(self, config_file: Path, storage_directory: Path,
+                 loglevel: int=logging.DEBUG):
+        '''Archive every file older than 2 months.'''
+        with open(config_file, 'r') as f:
+            module_parameters = json.load(f)
+        self.vendor = module_parameters['vendor']
+        self.listname = module_parameters['name']
+        self.directory = storage_directory / self.vendor / self.listname / 'archive'
+        safe_create_dir(self.directory)
+        self.deep_archive = self.directory / 'deep'
+        safe_create_dir(self.deep_archive)
+        self.__init_logger(loglevel)
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
+                                                          self.vendor, self.listname))
+        self.logger.setLevel(loglevel)
+
+    def archive(self):
+        to_archive = defaultdict(list)
+        today = date.today()
+        last_day_to_keep = date(today.year, today.month, 1) - relativedelta(months=2)
+        for p in self.directory.iterdir():
+            if not p.is_file():
+                continue
+            filedate = parser.parse(p.name.split('.')[0]).date()
+            if filedate >= last_day_to_keep:
+                continue
+            to_archive['{}.zip'.format(filedate.strftime('%Y%m'))].append(p)
+        if to_archive:
+            self.logger.info('Found old files. Archiving: {}'.format(', '.join(to_archive.keys())))
+        else:
+            self.logger.debug('No old files.')
+        for archivename, path_list in to_archive.items():
+            with zipfile.ZipFile(self.deep_archive / archivename, 'x', zipfile.ZIP_DEFLATED) as z:
+                for f in path_list:
+                    z.write(f, f.name)
+            # Delete all the files if the archiving worked out properly
+            [f.unlink() for f in path_list]
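The cutoff in DeepArchive.archive() is anchored to the first day of the current month before subtracting two months, so a month is only zipped once it is fully over. A worked example of the computation, with a hypothetical "today":

```python
# Worked example of the DeepArchive cutoff: with today being 2018-03-15
# (hypothetical), everything dated before 2018-01-01 is zipped per month.
from datetime import date
from dateutil.relativedelta import relativedelta

today = date(2018, 3, 15)
last_day_to_keep = date(today.year, today.month, 1) - relativedelta(months=2)
print(last_day_to_keep)                       # 2018-01-01
print(date(2017, 12, 31) < last_day_to_keep)  # True  -> goes into 201712.zip
print(date(2018, 1, 2) < last_day_to_keep)    # False -> kept as-is
```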
diff --git a/listimport/dbinsert.py b/listimport/dbinsert.py
new file mode 100644
index 0000000..a138aba
--- /dev/null
+++ b/listimport/dbinsert.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from redis import Redis
+from redis import StrictRedis
+
+
+class DatabaseInsert():
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.__init_logger(loglevel)
+        self.ardb_storage = StrictRedis(host='localhost', port=16379, decode_responses=True)
+        self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True)
+        self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
+        self.logger.debug('Starting import')
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger.setLevel(loglevel)
+
+    async def insert(self):
+        while True:
+            uuid = self.redis_sanitized.spop('to_insert')
+            if not uuid:
+                break
+            data = self.redis_sanitized.hgetall(uuid)
+            # Data gathered from the RIS queries:
+            # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
+            # * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
+            # * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview
+            ris_entry = self.ris_cache.hgetall(data['ip'])
+            if not ris_entry:
+                # RIS data not available yet, retry later
+                # FIXME: an IP can sometimes not be announced, we need to discard it
+                self.redis_sanitized.sadd('to_insert', uuid)
+                # In case this IP is missing in the set to process
+                self.ris_cache.sadd('for_ris_lookup', data['ip'])
+                continue
+            # Format: <date>|sources -> set([<source>, ...])
+            self.ardb_storage.sadd('{}|sources'.format(data['date']), data['source'])
+
+            # Format: <date>|<source> -> set([<asn>, ...])
+            self.ardb_storage.sadd('{}|{}'.format(data['date'], data['source']),
+                                   ris_entry['asn'])
+            # Format: <date>|<source>|<asn> -> set([<prefix>, ...])
+            self.ardb_storage.sadd('{}|{}|{}'.format(data['date'], data['source'], ris_entry['asn']),
+                                   ris_entry['prefix'])
+
+            # Format: <date>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
+            self.ardb_storage.sadd('{}|{}|{}|{}'.format(data['date'], data['source'],
+                                                        ris_entry['asn'],
+                                                        ris_entry['prefix']),
+                                   '{}|{}'.format(data['ip'], data['datetime']))
+            self.redis_sanitized.delete(uuid)
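The sadd calls above build a four-level key hierarchy in ARDB, so a whole day can be walked without scanning the keyspace. A read-back sketch, assuming the same ARDB instance on localhost:16379 and a hypothetical date:

```python
# Sketch: walk the hierarchical keys written by DatabaseInsert.insert().
from redis import StrictRedis

storage = StrictRedis(host='localhost', port=16379, decode_responses=True)
day = '2018-03-15'  # hypothetical date

for source in storage.smembers('{}|sources'.format(day)):
    for asn in storage.smembers('{}|{}'.format(day, source)):
        for prefix in storage.smembers('{}|{}|{}'.format(day, source, asn)):
            # Each member is '<ip>|<datetime>'.
            for entry in storage.smembers('{}|{}|{}|{}'.format(day, source, asn, prefix)):
                ip, seen_at = entry.split('|')
                print(source, asn, prefix, ip, seen_at)
```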
diff --git a/listimport/libs/StatsRipe.py b/listimport/libs/StatsRipe.py
new file mode 100644
index 0000000..b0354c1
--- /dev/null
+++ b/listimport/libs/StatsRipe.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import requests
+
+
+class StatsRIPE():
+
+    def __init__(self, sourceapp='bgpranking-ng - CIRCL'):
+        self.url = "https://stat.ripe.net/data/{method}/data.json?{parameters}"
+        self.url_parameters = {'sourceapp': sourceapp}
+
+    async def network_info(self, ip: str) -> dict:
+        method = 'network-info'
+        self.url_parameters['resource'] = ip
+        parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()])
+        url = self.url.format(method=method, parameters=parameters)
+        response = requests.get(url)
+        return response.json()
+
+    async def prefix_overview(self, prefix: str) -> dict:
+        method = 'prefix-overview'
+        self.url_parameters['resource'] = prefix
+        parameters = '&'.join(['='.join(item) for item in self.url_parameters.items()])
+        url = self.url.format(method=method, parameters=parameters)
+        response = requests.get(url)
+        return response.json()
diff --git a/listimport/libs/__init__.py b/listimport/libs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/listimport/libs/exceptions.py b/listimport/libs/exceptions.py
new file mode 100644
index 0000000..fa53aa6
--- /dev/null
+++ b/listimport/libs/exceptions.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+
+class BGPRankingException(Exception):
+    pass
+
+
+class FetcherException(BGPRankingException):
+    pass
+
+
+class ArchiveException(BGPRankingException):
+    pass
+
+
+class CreateDirectoryException(BGPRankingException):
+    pass
diff --git a/listimport/libs/helpers.py b/listimport/libs/helpers.py
new file mode 100644
index 0000000..f7c4ce3
--- /dev/null
+++ b/listimport/libs/helpers.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+from pathlib import Path
+from .exceptions import CreateDirectoryException
+
+
+def safe_create_dir(to_create: Path):
+    if to_create.exists() and not to_create.is_dir():
+        raise CreateDirectoryException('The path {} already exists and is not a directory'.format(to_create))
+    os.makedirs(to_create, exist_ok=True)
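StatsRIPE above declares its methods async but issues blocking requests.get calls internally, so concurrent callers still serialize on the HTTP request. A usage sketch (note this queries the live stat.ripe.net API; the IP is just an example):

```python
# Usage sketch for the StatsRIPE client above. The await does not make the
# HTTP call non-blocking: requests is synchronous, so the event loop stalls
# for the duration of the request.
import asyncio

from listimport.libs.StatsRipe import StatsRIPE


async def main():
    ripe = StatsRIPE()
    info = await ripe.network_info('8.8.8.8')  # example lookup
    print(info['data']['prefix'], info['data']['asns'])


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```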
diff --git a/listimport/modules_config/Alienvault.json b/listimport/modules_config/Alienvault.json
new file mode 100644
index 0000000..571e055
--- /dev/null
+++ b/listimport/modules_config/Alienvault.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://reputation.alienvault.com/reputation.generic",
+  "vendor": "alienvault",
+  "name": "reputation.generic",
+  "impact": 0.01
+}
diff --git a/listimport/modules_config/BlocklistDeApache.json b/listimport/modules_config/BlocklistDeApache.json
new file mode 100644
index 0000000..ee89b1c
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeApache.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/apache.txt",
+  "vendor": "blocklist_de",
+  "name": "apache",
+  "impact": 0.1
+}
diff --git a/listimport/modules_config/BlocklistDeBots.json b/listimport/modules_config/BlocklistDeBots.json
new file mode 100644
index 0000000..8f3bd12
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeBots.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/bots.txt",
+  "vendor": "blocklist_de",
+  "name": "bots",
+  "impact": 3
+}
diff --git a/listimport/modules_config/BlocklistDeFTP.json b/listimport/modules_config/BlocklistDeFTP.json
new file mode 100644
index 0000000..d1d7a59
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeFTP.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/ftp.txt",
+  "vendor": "blocklist_de",
+  "name": "ftp",
+  "impact": 3
+}
diff --git a/listimport/modules_config/BlocklistDeIMAP.json b/listimport/modules_config/BlocklistDeIMAP.json
new file mode 100644
index 0000000..5dd8be5
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeIMAP.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/imap.txt",
+  "vendor": "blocklist_de",
+  "name": "imap",
+  "impact": 3
+}
diff --git a/listimport/modules_config/BlocklistDeMail.json b/listimport/modules_config/BlocklistDeMail.json
new file mode 100644
index 0000000..9c26e63
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeMail.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/mail.txt",
+  "vendor": "blocklist_de",
+  "name": "mail",
+  "impact": 0.1
+}
diff --git a/listimport/modules_config/BlocklistDeSIP.json b/listimport/modules_config/BlocklistDeSIP.json
new file mode 100644
index 0000000..e3f7d50
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeSIP.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/sip.txt",
+  "vendor": "blocklist_de",
+  "name": "sip",
+  "impact": 3
+}
diff --git a/listimport/modules_config/BlocklistDeSSH.json b/listimport/modules_config/BlocklistDeSSH.json
new file mode 100644
index 0000000..d4e827c
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeSSH.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/ssh.txt",
+  "vendor": "blocklist_de",
+  "name": "ssh",
+  "impact": 3
+}
diff --git a/listimport/modules_config/BlocklistDeStrong.json b/listimport/modules_config/BlocklistDeStrong.json
new file mode 100644
index 0000000..ecb3569
--- /dev/null
+++ b/listimport/modules_config/BlocklistDeStrong.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.blocklist.de/lists/strongips.txt",
+  "vendor": "blocklist_de",
+  "name": "strong",
+  "impact": 6
+}
diff --git a/listimport/modules_config/CIArmy.json b/listimport/modules_config/CIArmy.json
new file mode 100644
index 0000000..fdd551f
--- /dev/null
+++ b/listimport/modules_config/CIArmy.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.ciarmy.com/list/ci-badguys.txt",
+  "vendor": "ciarmy",
+  "name": "ip",
+  "impact": 5
+}
diff --git a/listimport/modules_config/CleanMXMalwares.json b/listimport/modules_config/CleanMXMalwares.json
new file mode 100644
index 0000000..b31f01f
--- /dev/null
+++ b/listimport/modules_config/CleanMXMalwares.json
@@ -0,0 +1,5 @@
+{
+  "vendor": "cleanmx",
+  "name": "malwares",
+  "impact": 5
+}
diff --git a/listimport/modules_config/CleanMXPhishing.json b/listimport/modules_config/CleanMXPhishing.json
new file mode 100644
index 0000000..e0e02c1
--- /dev/null
+++ b/listimport/modules_config/CleanMXPhishing.json
@@ -0,0 +1,5 @@
+{
+  "vendor": "cleanmx",
+  "name": "phishing",
+  "impact": 5
+}
diff --git a/listimport/modules_config/CleanMXPortals.json b/listimport/modules_config/CleanMXPortals.json
new file mode 100644
index 0000000..3e4e4f0
--- /dev/null
+++ b/listimport/modules_config/CleanMXPortals.json
@@ -0,0 +1,5 @@
+{
+  "vendor": "cleanmx",
+  "name": "portals",
+  "impact": 5
+}
diff --git a/listimport/modules_config/DshieldDaily.json b/listimport/modules_config/DshieldDaily.json
new file mode 100644
index 0000000..91e5398
--- /dev/null
+++ b/listimport/modules_config/DshieldDaily.json
@@ -0,0 +1,7 @@
+{
+  "url": "http://www.dshield.org/feeds/daily_sources",
+  "vendor": "dshield",
+  "name": "daily",
+  "impact": 0.1,
+  "parser": ".parsers.dshield"
+}
diff --git a/listimport/modules_config/DshieldTopIPs.json b/listimport/modules_config/DshieldTopIPs.json
new file mode 100644
index 0000000..948ff53
--- /dev/null
+++ b/listimport/modules_config/DshieldTopIPs.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.dshield.org/feeds/topips.txt",
+  "vendor": "dshield",
+  "name": "topips",
+  "impact": 1
+}
diff --git a/listimport/modules_config/EmergingThreatsCompromized.json b/listimport/modules_config/EmergingThreatsCompromized.json
new file mode 100644
index 0000000..1424cbb
--- /dev/null
+++ b/listimport/modules_config/EmergingThreatsCompromized.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://rules.emergingthreats.net/blockrules/compromised-ips.txt",
+  "vendor": "emergingthreats",
+  "name": "compromized",
+  "impact": 5
+}
diff --git a/listimport/modules_config/FeodotrackerIPBlockList.json b/listimport/modules_config/FeodotrackerIPBlockList.json
new file mode 100644
index 0000000..dd7a4e5
--- /dev/null
+++ b/listimport/modules_config/FeodotrackerIPBlockList.json
@@ -0,0 +1,7 @@
+{
+  "url": "https://feodotracker.abuse.ch/blocklist/?download=ipblocklist",
+  "vendor": "feodotracker",
+  "name": "ipblocklist",
+  "impact": 5,
+  "parser": ".parsers.abusech"
+}
diff --git a/listimport/modules_config/Malc0de.json b/listimport/modules_config/Malc0de.json
new file mode 100644
index 0000000..fc5991b
--- /dev/null
+++ b/listimport/modules_config/Malc0de.json
@@ -0,0 +1,7 @@
+{
+  "url": "http://malc0de.com/bl/IP_Blacklist.txt",
+  "vendor": "malc0de",
+  "name": "blocklist",
+  "impact": 5,
+  "parser": ".parsers.malc0de"
+}
diff --git a/listimport/modules_config/MalwareDomainListIP.json b/listimport/modules_config/MalwareDomainListIP.json
new file mode 100644
index 0000000..53987dc
--- /dev/null
+++ b/listimport/modules_config/MalwareDomainListIP.json
@@ -0,0 +1,6 @@
+{
+  "url": "http://www.malwaredomainlist.com/hostslist/ip.txt",
+  "vendor": "malwaredomainlist",
+  "name": "ip",
+  "impact": 5
+}
diff --git a/listimport/modules_config/NothinkSNMP.json b/listimport/modules_config/NothinkSNMP.json
new file mode 100644
index 0000000..80cf8b6
--- /dev/null
+++ b/listimport/modules_config/NothinkSNMP.json
@@ -0,0 +1,7 @@
+{
+  "url": "http://www.nothink.org/blacklist/blacklist_snmp_day.txt",
+  "vendor": "nothink",
+  "name": "snmp",
+  "impact": 5,
+  "parser": ".parsers.nothink"
+}
diff --git a/listimport/modules_config/NothinkSSH.json b/listimport/modules_config/NothinkSSH.json
new file mode 100644
index 0000000..002ff94
--- /dev/null
+++ b/listimport/modules_config/NothinkSSH.json
@@ -0,0 +1,7 @@
+{
+  "url": "http://www.nothink.org/blacklist/blacklist_ssh_day.txt",
+  "vendor": "nothink",
+  "name": "ssh",
+  "impact": 5,
+  "parser": ".parsers.nothink"
+}
diff --git a/listimport/modules_config/NothinkTelnet.json b/listimport/modules_config/NothinkTelnet.json
new file mode 100644
index 0000000..d8a1411
--- /dev/null
+++ b/listimport/modules_config/NothinkTelnet.json
@@ -0,0 +1,7 @@
+{
+  "url": "http://www.nothink.org/blacklist/blacklist_telnet_day.txt",
+  "vendor": "nothink",
+  "name": "telnet",
+  "impact": 5,
+  "parser": ".parsers.nothink"
+}
diff --git a/listimport/modules_config/PalevotrackerIPBlockList.json b/listimport/modules_config/PalevotrackerIPBlockList.json
new file mode 100644
index 0000000..89c1574
--- /dev/null
+++ b/listimport/modules_config/PalevotrackerIPBlockList.json
@@ -0,0 +1,6 @@
+{
+  "url": "https://palevotracker.abuse.ch/blocklists.php?download=ipblocklist",
+  "vendor": "palevotracker",
+  "name": "ipblocklist",
+  "impact": 5
+}
diff --git a/listimport/modules_config/RansomwareIPBlockList.json b/listimport/modules_config/RansomwareIPBlockList.json
new file mode 100644
index 0000000..e2374b0
--- /dev/null
+++ b/listimport/modules_config/RansomwareIPBlockList.json
@@ -0,0 +1,7 @@
+{
+  "url": "https://ransomwaretracker.abuse.ch/downloads/RW_IPBL.txt",
+  "vendor": "ransomwaretracker",
+  "name": "ipblocklist",
+  "impact": 7,
+  "parser": ".parsers.abusech"
+}
diff --git a/listimport/modules_config/SSHBlackListBase.json b/listimport/modules_config/SSHBlackListBase.json
new file mode 100644
index 0000000..a46d110
--- /dev/null
+++ b/listimport/modules_config/SSHBlackListBase.json
@@ -0,0 +1,6 @@
+{
+  "url": "https://www.openbl.org/lists/base.txt",
+  "vendor": "sshbl",
+  "name": "base",
+  "impact": 5
+}
diff --git a/listimport/modules_config/ZeustrackerIPBlockList.json b/listimport/modules_config/ZeustrackerIPBlockList.json
new file mode 100644
index 0000000..3c4f65b
--- /dev/null
+++ b/listimport/modules_config/ZeustrackerIPBlockList.json
@@ -0,0 +1,6 @@
+{
+  "url": "https://zeustracker.abuse.ch/blocklist.php?download=ipblocklist",
+  "vendor": "zeustracker",
+  "name": "ipblocklist",
+  "impact": 5
+}
diff --git a/listimport/modules_config/jq_all_the_things.sh b/listimport/modules_config/jq_all_the_things.sh
new file mode 100755
index 0000000..b1d0d50
--- /dev/null
+++ b/listimport/modules_config/jq_all_the_things.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -e
+set -x
+
+# Needs sponge, from moreutils
+
+for dir in ./*.json
+do
+    cat ${dir} | jq . | sponge ${dir}
+done
diff --git a/listimport/modules_config/module.schema b/listimport/modules_config/module.schema
new file mode 100644
index 0000000..6f190ba
--- /dev/null
+++ b/listimport/modules_config/module.schema
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/schema#",
+  "title": "BGP Ranking NG module",
+  "id": "https://www.github.com/CIRCL/bgpranking-ng/modules.json",
+  "type": "object",
+  "additionalProperties": false,
+  "properties": {
+    "url": {
+      "type": "string"
+    },
+    "vendor": {
+      "type": "string"
+    },
+    "name": {
+      "type": "string"
+    },
+    "impact": {
+      "type": "number"
+    },
+    "parser": {
+      "type": "string"
+    }
+  },
+  "required": [
+    "name",
+    "vendor",
+    "impact"
+  ]
+}
diff --git a/listimport/modules_config/tmp/DshieldDaily.json b/listimport/modules_config/tmp/DshieldDaily.json
new file mode 100644
index 0000000..91e5398
--- /dev/null
+++ b/listimport/modules_config/tmp/DshieldDaily.json
@@ -0,0 +1,7 @@
+{
+  "url": "http://www.dshield.org/feeds/daily_sources",
+  "vendor": "dshield",
+  "name": "daily",
+  "impact": 0.1,
+  "parser": ".parsers.dshield"
+}
diff --git a/listimport/modules_config/validate_all.sh b/listimport/modules_config/validate_all.sh
new file mode 100755
index 0000000..663dc05
--- /dev/null
+++ b/listimport/modules_config/validate_all.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+
+set -e
+set -x
+
+# remove the exec flag on the json files
+find -name "*.json" -exec chmod -x "{}" \;
+
+diffs=`git status --porcelain | wc -l`
+
+if ! [ $diffs -eq 0 ]; then
+    echo "Please make sure you remove the executable flag on the json files before committing: find -name \"*.json\" -exec chmod -x \"{}\" \\;"
+    # exit 1
+fi
+
+./jq_all_the_things.sh
+
+diffs=`git status --porcelain | wc -l`
+
+if ! [ $diffs -eq 0 ]; then
+    echo "Please make sure you run ./jq_all_the_things.sh before committing."
+    # exit 1
+fi
+
+for dir in ./*.json
+do
+    echo -n "${dir}: "
+    jsonschema -i ${dir} module.schema
+    echo ''
+done
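validate_all.sh shells out to the jsonschema CLI; the same check can be done in-process. A sketch, assuming the `jsonschema` Python package is installed:

```python
# Sketch: validate every module config against module.schema from Python,
# mirroring what validate_all.sh does with the jsonschema CLI.
import json
from pathlib import Path

from jsonschema import ValidationError, validate

config_dir = Path('listimport', 'modules_config')
with (config_dir / 'module.schema').open() as f:
    schema = json.load(f)

for config_file in config_dir.glob('*.json'):
    with config_file.open() as f:
        try:
            validate(json.load(f), schema)
            print('{}: OK'.format(config_file.name))
        except ValidationError as e:
            print('{}: {}'.format(config_file.name, e.message))
```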
diff --git a/listimport/modulesfetcher.py b/listimport/modulesfetcher.py
new file mode 100644
index 0000000..22360e0
--- /dev/null
+++ b/listimport/modulesfetcher.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import requests
+from dateutil import parser
+from datetime import datetime, date
+from hashlib import sha512  # Faster than sha256 on 64b machines.
+from pathlib import Path
+import logging
+from pid import PidFile, PidFileError
+import json
+
+from .libs.helpers import safe_create_dir
+
+
+class Fetcher():
+
+    def __init__(self, config_file: Path, storage_directory: Path,
+                 loglevel: int=logging.DEBUG):
+        '''Load `config_file`, and store the fetched data into `storage_directory`
+        Note: if the `config_file` does not provide a URL (the file is
+        gathered by some other means), the fetcher is automatically stopped.'''
+        with open(config_file, 'r') as f:
+            module_parameters = json.load(f)
+        self.vendor = module_parameters['vendor']
+        self.listname = module_parameters['name']
+        self.__init_logger(loglevel)
+        self.fetcher = True
+        if 'url' not in module_parameters:
+            self.logger.info('No URL to fetch, breaking.')
+            self.fetcher = False
+            return
+        self.url = module_parameters['url']
+        self.logger.debug('Starting fetcher on {}'.format(self.url))
+        self.directory = storage_directory / self.vendor / self.listname
+        safe_create_dir(self.directory)
+        self.meta = self.directory / 'meta'
+        safe_create_dir(self.meta)
+        self.archive_dir = self.directory / 'archive'
+        safe_create_dir(self.archive_dir)
+        self.first_fetch = True
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
+                                                          self.vendor, self.listname))
+        self.logger.setLevel(loglevel)
+
+    def __get_last_modified(self):
+        r = requests.head(self.url)
+        if 'Last-Modified' in r.headers:
+            return parser.parse(r.headers['Last-Modified'])
+        return None
+
+    def __newer(self):
+        '''Check if the file available for download is newer than the one
+        already downloaded, by comparing the `Last-Modified` header.
+        Note: returns True (i.e. fetch) if no `Last-Modified` value is
+        stored locally, or if the server does not provide the header.
+        '''
+        last_modified_path = self.meta / 'lastmodified'
+        if not last_modified_path.exists():
+            # The file doesn't exist
+            if not self.first_fetch:
+                # The URL has no Last-Modified header, we cannot use it.
+                self.logger.debug('No Last-Modified header available')
+                return True
+            self.first_fetch = False
+            last_modified = self.__get_last_modified()
+            if last_modified:
+                self.logger.debug('Last-Modified header available')
+                with last_modified_path.open('w') as f:
+                    f.write(last_modified.isoformat())
+            else:
+                self.logger.debug('No Last-Modified header available')
+            return True
+        with last_modified_path.open() as f:
+            last_modified_file = parser.parse(f.read())
+        last_modified = self.__get_last_modified()
+        if not last_modified:
+            # No more Last-Modified header Oo
+            self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname))
+            last_modified_path.unlink()
+            return True
+        if last_modified > last_modified_file:
+            self.logger.info('Got a new file.')
+            with last_modified_path.open('w') as f:
+                f.write(last_modified.isoformat())
+            return True
+        return False
+
+    def __same_as_last(self, downloaded):
+        '''Figure out the last downloaded file, check if it is the same as the
+        newly downloaded one. Returns True if both files have been downloaded the
+        same day.
+        Note: we check the new and the archive directory because we may have backlog
+        and the newest file is always the first one we process
+        '''
+        to_check = []
+        to_check_new = sorted([f for f in self.directory.iterdir() if f.is_file()])
+        if to_check_new:
+            # we have files waiting to be processed
+            self.logger.debug('{} file(s) are waiting to be processed'.format(len(to_check_new)))
+            to_check.append(to_check_new[-1])
+        to_check_archive = sorted([f for f in self.archive_dir.iterdir() if f.is_file()])
+        if to_check_archive:
+            # we have files already processed, in the archive
+            self.logger.debug('{} file(s) have been processed'.format(len(to_check_archive)))
+            to_check.append(to_check_archive[-1])
+        if not to_check:
+            self.logger.debug('New list, no historical files')
+            # nothing has been downloaded ever, moving on
+            return False
+        for last_file in to_check:
+            with last_file.open('rb') as f:
+                last_hash = sha512(f.read())
+            dl_hash = sha512(downloaded)
+            if (dl_hash.digest() == last_hash.digest() and
+                    parser.parse(last_file.name.split('.')[0]).date() == date.today()):
+                self.logger.debug('Same file already downloaded today.')
+                return True
+        return False
+
+    async def fetch_list(self):
+        '''Fetch & store the list'''
+        if not self.fetcher:
+            return
+        try:
+            with PidFile('{}.pid'.format(self.listname), piddir=self.meta):
+                if not self.__newer():
+                    return
+                r = requests.get(self.url)
+                if self.__same_as_last(r.content):
+                    return
+                self.logger.info('Got a new file \\o/')
+                with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
+                    f.write(r.content)
+        except PidFileError:
+            self.logger.info('Fetcher already running')
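Fetcher.__newer() re-implements HTTP revalidation by persisting the Last-Modified value under meta/lastmodified. An alternative (not what the code above does) is to send the stored value back and let the server decide; a sketch with a hypothetical URL:

```python
# Alternative sketch, not used by Fetcher: conditional GET via
# If-Modified-Since, with the server answering 304 when nothing changed.
import requests

url = 'http://example.com/list.txt'  # hypothetical list URL
stored = 'Mon, 12 Mar 2018 08:00:00 GMT'  # value kept from a previous fetch

r = requests.get(url, headers={'If-Modified-Since': stored})
if r.status_code == 304:
    print('Nothing new, skipping the download.')
else:
    print('New content, {} bytes.'.format(len(r.content)))
```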
diff --git a/listimport/parser.py b/listimport/parser.py
new file mode 100644
index 0000000..c9945d2
--- /dev/null
+++ b/listimport/parser.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from datetime import datetime
+from pathlib import Path
+import logging
+import json
+import re
+from redis import Redis
+from uuid import uuid4
+from io import BytesIO
+import importlib
+
+from typing import List
+import types
+
+from .libs.helpers import safe_create_dir
+
+
+class RawFilesParser():
+
+    def __init__(self, config_file: Path, storage_directory: Path,
+                 loglevel: int=logging.DEBUG):
+        with open(config_file, 'r') as f:
+            module_parameters = json.load(f)
+        self.vendor = module_parameters['vendor']
+        self.listname = module_parameters['name']
+        if 'parser' in module_parameters:
+            self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser'], 'listimport').parse_raw_file, self)
+        self.source = '{}-{}'.format(self.vendor, self.listname)
+        self.directory = storage_directory / self.vendor / self.listname
+        safe_create_dir(self.directory)
+        self.__init_logger(loglevel)
+        self.redis_intake = Redis(host='localhost', port=6379, db=0)
+        self.logger.debug('Starting intake on {}'.format(self.source))
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}-{}-{}'.format(self.__class__.__name__,
+                                                          self.vendor, self.listname))
+        self.logger.setLevel(loglevel)
+
+    @property
+    def files_to_parse(self) -> List[Path]:
+        return sorted([f for f in self.directory.iterdir() if f.is_file()], reverse=True)
+
+    def extract_ipv4(self, bytestream: bytes) -> List[bytes]:
+        return re.findall(rb'[0-9]+(?:\.[0-9]+){3}', bytestream)
+
+    def parse_raw_file(self, f: BytesIO):
+        self.datetime = datetime.now()
+        return self.extract_ipv4(f.getvalue())
+
+    async def parse_raw_files(self):
+        for filepath in self.files_to_parse:
+            self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1))
+            with open(filepath, 'rb') as f:
+                to_parse = BytesIO(f.read())
+            p = self.redis_intake.pipeline()
+            for ip in self.parse_raw_file(to_parse):
+                uuid = str(uuid4())
+                p.hmset(uuid, {'ip': ip, 'source': self.source,
+                               'datetime': self.datetime.isoformat()})
+                p.sadd('intake', uuid)
+            p.execute()
+            self._archive(filepath)
+
+    def _archive(self, filepath: Path):
+        '''After processing, move file to the archive directory'''
+        filepath.rename(self.directory / 'archive' / filepath.name)
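RawFilesParser swaps in a vendor-specific parser at runtime: when the module config names one, the module-level parse_raw_file function is bound onto the instance with types.MethodType, shadowing the generic fallback defined on the class. A condensed sketch of the mechanism:

```python
# Condensed sketch of the plug-in mechanism used in RawFilesParser.__init__:
# a module-level function taking `self` is bound onto one instance with
# types.MethodType, overriding the class default for that instance only.
import types


def custom_parse(self, data: bytes):
    return 'custom: {!r}'.format(data[:10])


class Parser:
    def parse(self, data: bytes):
        return 'default: {!r}'.format(data[:10])


p = Parser()
p.parse = types.MethodType(custom_parse, p)
print(p.parse(b'1.2.3.4 some feed content'))  # custom: b'1.2.3.4 so'
```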
diff --git a/listimport/parsers/__init__.py b/listimport/parsers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/listimport/parsers/abusech.py b/listimport/parsers/abusech.py
new file mode 100644
index 0000000..c4bede5
--- /dev/null
+++ b/listimport/parsers/abusech.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from dateutil.parser import parse
+import re
+from io import BytesIO
+
+
+def parse_raw_file(self, f: BytesIO):
+    self.datetime = parse(re.findall(b'# Generated on (.*)#\n', f.getvalue())[0].decode())
+    return self.extract_ipv4(f.getvalue())
diff --git a/listimport/parsers/default.py b/listimport/parsers/default.py
new file mode 100644
index 0000000..11b22f7
--- /dev/null
+++ b/listimport/parsers/default.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from io import BytesIO
+from datetime import datetime
+
+from ..simple_feed_fetcher import RawFileImporter
+
+
+class DefaultImporter(RawFileImporter):
+
+    def parse_raw_file(self, f: BytesIO):
+        self.datetime = datetime.now()
+        return self.extract_ipv4(f.getvalue())
diff --git a/listimport/parsers/dshield.py b/listimport/parsers/dshield.py
new file mode 100644
index 0000000..5416595
--- /dev/null
+++ b/listimport/parsers/dshield.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from dateutil.parser import parse
+import re
+from io import BytesIO
+
+
+def parse_raw_file(self, f: BytesIO):
+    self.datetime = parse(re.findall(b'# updated (.*)\n', f.getvalue())[0].decode())
+    return self.extract_ipv4(f.getvalue())
diff --git a/listimport/parsers/malc0de.py b/listimport/parsers/malc0de.py
new file mode 100644
index 0000000..db2d217
--- /dev/null
+++ b/listimport/parsers/malc0de.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from dateutil.parser import parse
+import re
+from io import BytesIO
+
+
+def parse_raw_file(self, f: BytesIO):
+    self.datetime = parse(re.findall(b'// Last updated (.*)\n', f.getvalue())[0].decode())
+    return self.extract_ipv4(f.getvalue())
diff --git a/listimport/parsers/nothink.py b/listimport/parsers/nothink.py
new file mode 100644
index 0000000..3f3ef01
--- /dev/null
+++ b/listimport/parsers/nothink.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from dateutil.parser import parse
+import re
+from io import BytesIO
+
+
+def parse_raw_file(self, f: BytesIO):
+    self.datetime = parse(re.findall(b'# Generated (.*)\n', f.getvalue())[0].decode())
+    return self.extract_ipv4(f.getvalue())
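The vendor parsers above all follow the same recipe: pull the generation timestamp out of the feed's comment header with a regex, then reuse extract_ipv4 from RawFilesParser. A worked example in the style of the abuse.ch header (the sample feed content is made up for illustration):

```python
# Worked example of the date-extraction pattern shared by the vendor
# parsers; the sample feed content is invented for illustration.
import re

from dateutil.parser import parse

sample = b'# Generated on 2018-03-15 04:05:06 UTC #\n1.2.3.4\n5.6.7.8\n'
timestamp = parse(re.findall(b'# Generated on (.*)#\n', sample)[0].decode())
print(timestamp)  # 2018-03-15 04:05:06+00:00
print(re.findall(rb'[0-9]+(?:\.[0-9]+){3}', sample))  # [b'1.2.3.4', b'5.6.7.8']
```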
diff --git a/listimport/risfetcher.py b/listimport/risfetcher.py
new file mode 100644
index 0000000..acecb71
--- /dev/null
+++ b/listimport/risfetcher.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from redis import Redis
+
+from .libs.StatsRipe import StatsRIPE
+
+
+class RoutingInformationServiceFetcher():
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.__init_logger(loglevel)
+        self.ris_cache = Redis(host='localhost', port=6381, db=0)
+        self.logger.debug('Starting RIS fetcher')
+        self.ripe = StatsRIPE()
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger.setLevel(loglevel)
+
+    async def fetch(self):
+        while True:
+            ip = self.ris_cache.spop('for_ris_lookup')
+            if not ip:
+                break
+            ip = ip.decode()
+            network_info = await self.ripe.network_info(ip)
+            prefix = network_info['data']['prefix']
+            asns = network_info['data']['asns']
+            if not asns or not prefix:
+                self.logger.warning('The IP {} does not seem to be announced'.format(ip))
+                continue
+            prefix_overview = await self.ripe.prefix_overview(prefix)
+            description = prefix_overview['data']['block']['desc']
+            if not description:
+                description = prefix_overview['data']['block']['name']
+            for asn in asns:
+                self.ris_cache.hmset(ip, {'asn': asn, 'prefix': prefix,
+                                          'description': description})
diff --git a/listimport/sanitizer.py b/listimport/sanitizer.py
new file mode 100644
index 0000000..39dde8c
--- /dev/null
+++ b/listimport/sanitizer.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from dateutil import parser
+import logging
+from redis import Redis
+
+import ipaddress
+
+
+class Sanitizer():
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.__init_logger(loglevel)
+        self.redis_intake = Redis(host='localhost', port=6379, db=0, decode_responses=True)
+        self.redis_sanitized = Redis(host='localhost', port=6380, db=0, decode_responses=True)
+        self.ris_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
+        self.logger.debug('Starting import')
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger.setLevel(loglevel)
+
+    async def sanitize(self):
+        while True:
+            uuid = self.redis_intake.spop('intake')
+            if not uuid:
+                break
+            data = self.redis_intake.hgetall(uuid)
+            try:
+                ip = ipaddress.ip_address(data['ip'])
+            except ValueError:
+                self.logger.info('Invalid IP address: {}'.format(data['ip']))
+                continue
+            if not ip.is_global:
+                self.logger.info('The IP address {} is not global'.format(data['ip']))
+                continue
+
+            date = parser.parse(data['datetime']).date().isoformat()
+            # NOTE: to consider: discard data with an old timestamp (define old)
+
+            # Add to temporary DB for further processing
+            self.ris_cache.sadd('for_ris_lookup', str(ip))
+            pipeline = self.redis_sanitized.pipeline()
+            pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'],
+                                  'date': date, 'datetime': data['datetime']})
+            pipeline.sadd('to_insert', uuid)
+            pipeline.execute()
+            self.redis_intake.delete(uuid)
diff --git a/ris.py b/ris.py
new file mode 100755
index 0000000..ac94864
--- /dev/null
+++ b/ris.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import asyncio
+from listimport.risfetcher import RoutingInformationServiceFetcher
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class RISManager():
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.ris_fetcher = RoutingInformationServiceFetcher(loglevel)
+
+    async def run_fetcher(self):
+        await asyncio.gather(self.ris_fetcher.fetch())
+
+
+if __name__ == '__main__':
+    modules_manager = RISManager()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(modules_manager.run_fetcher())
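Taken together, the components move each record through three Redis instances before it lands in ARDB. The field layout at each stage, with hypothetical values:

```python
# Record shapes at each stage of the pipeline (values are hypothetical).
intake_record = {      # written by RawFilesParser into redis :6379 ('intake' set)
    'ip': '203.0.113.42',
    'source': 'blocklist_de-ssh',
    'datetime': '2018-03-15T14:23:05.123456',
}
sanitized_record = {   # written by Sanitizer into redis :6380 ('to_insert' set)
    'ip': '203.0.113.42',
    'source': 'blocklist_de-ssh',
    'date': '2018-03-15',
    'datetime': '2018-03-15T14:23:05.123456',
}
ris_record = {         # written by RoutingInformationServiceFetcher into redis :6381
    'asn': '64496',
    'prefix': '203.0.113.0/24',
    'description': 'Example network block',
}
# DatabaseInsert then joins sanitized_record with ris_record and writes the
# hierarchical '<date>|...' keys into ARDB on :16379.
```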
diff --git a/sanitize.py b/sanitize.py
new file mode 100755
index 0000000..b5ead6e
--- /dev/null
+++ b/sanitize.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import asyncio
+from listimport.sanitizer import Sanitizer
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class SanitizerManager():
+
+    def __init__(self, loglevel: int=logging.INFO):
+        self.loglevel = loglevel
+        self.sanitizer = Sanitizer(loglevel)
+
+    async def run_sanitizer(self):
+        await asyncio.gather(self.sanitizer.sanitize())
+
+
+if __name__ == '__main__':
+    modules_manager = SanitizerManager()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(modules_manager.run_sanitizer())