diff --git a/README.md b/README.md
index fa1b94d..53996e2 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,24 @@ Creates the following hashes:
 IP = {'asn': <asn>, 'prefix': <prefix>, 'description': <description>}
 ```

+## Ranking Information cache (redis, port 6382)
+
+*Usage*: Store the current list of ASNs known at RIPE, and the prefixes originating from them.
+
+Creates the following sets:
+
+```python
+asns = set([<asn>, ...])
+<asn>|v4 = set([<ipv4 prefix>, ...])
+<asn>|v6 = set([<ipv6 prefix>, ...])
+```
+
+And the following keys:
+
+```python
+<asn>|v4|ipcount = <total number of IPv4 addresses originated by the ASN>
+<asn>|v6|ipcount = <total number of IPv6 addresses originated by the ASN>
+```

 ## Long term storage (ardb, port 16379)

diff --git a/listimport/initranking.py b/listimport/initranking.py
new file mode 100644
index 0000000..9a3376e
--- /dev/null
+++ b/listimport/initranking.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import json
+from redis import Redis
+import asyncio
+
+from telnetlib import Telnet
+
+from .libs.StatsRipeText import RIPECaching
+from ipaddress import ip_network
+
+
+class ASNLookup(RIPECaching):
+
+    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
+        super().__init__(sourceapp, loglevel)
+        self.redis_cache = Redis(host='localhost', port=6382, db=0, decode_responses=True)
+        self.logger.debug('Starting ASN lookup cache')
+
+    def get_all_asns(self):
+        with Telnet(self.hostname, self.port) as tn:
+            tn.write(b'-k\n')
+            to_send = '-d ris-asns list_asns=true asn_types=o sourceapp={}\n'.format(self.sourceapp)
+            tn.write(to_send.encode())
+            ris_asns = json.loads(tn.read_until(b'\n}\n'))
+            all_asns = ris_asns['asns']['originating']
+            if not all_asns:
+                self.logger.warning('No ASNs in ris-asns, something went wrong.')
+            else:
+                self.redis_cache.sadd('asns', *all_asns)
+                self.redis_cache.sadd('asns_to_lookup', *all_asns)
+            tn.write(b'-k\n')
+
+    def fix_ipv4_networks(self, networks):
+        '''Because we can't have nice things.
+        Some networks come without the last byte(s) (i.e. 170.254.25/24)'''
+        to_return = []
+        for net in networks:
+            try:
+                to_return.append(ip_network(net))
+            except ValueError:
+                ip, mask = net.split('/')
+                iplist = ip.split('.')
+                iplist = iplist + ['0'] * (4 - len(iplist))
+                to_return.append(ip_network('{}/{}'.format('.'.join(iplist), mask)))
+        return to_return
+
+    async def get_originating_prefixes(self):
+        reader, writer = await asyncio.open_connection(self.hostname, self.port)
+        writer.write(b'-k\n')
+        while True:
+            asn = self.redis_cache.spop('asns_to_lookup')
+            if not asn:
+                break
+            self.logger.debug('ASN lookup: {}'.format(asn))
+            to_send = '-d ris-prefixes {} list_prefixes=true types=o af=v4,v6 noise=filter sourceapp={}\n'.format(asn, self.sourceapp)
+            writer.write(to_send.encode())
+            data = await reader.readuntil(b'\n}\n')
+            ris_prefixes = json.loads(data)
+            p = self.redis_cache.pipeline()
+            if ris_prefixes['prefixes']['v4']['originating']:
+                self.logger.debug('{} has ipv4'.format(asn))
+                fixed_networks = self.fix_ipv4_networks(ris_prefixes['prefixes']['v4']['originating'])
+                p.sadd('{}|v4'.format(asn), *[str(net) for net in fixed_networks])
+                total_ipv4 = sum([net.num_addresses for net in fixed_networks])
+                p.set('{}|v4|ipcount'.format(asn), total_ipv4)
+            if ris_prefixes['prefixes']['v6']['originating']:
+                self.logger.debug('{} has ipv6'.format(asn))
+                p.sadd('{}|v6'.format(asn), *ris_prefixes['prefixes']['v6']['originating'])
+                total_ipv6 = sum([ip_network(prefix).num_addresses for prefix in ris_prefixes['prefixes']['v6']['originating']])
+                p.set('{}|v6|ipcount'.format(asn), total_ipv6)
+            p.execute()
+        writer.write(b'-k\n')
+        writer.close()
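Reviewer note on `fix_ipv4_networks` above: as its docstring says, some IPv4 prefixes come back with trailing zero octets dropped, and `ip_network` rejects them. A minimal standalone sketch of the same padding logic (the function name and sample prefixes are illustrative, not part of the patch):

```python
from ipaddress import ip_network

def pad_ipv4_network(net: str):
    '''Pad a truncated IPv4 prefix so ipaddress accepts it.'''
    try:
        return ip_network(net)
    except ValueError:
        ip, mask = net.split('/')
        octets = ip.split('.')
        octets += ['0'] * (4 - len(octets))  # re-append the dropped zero octets
        return ip_network('{}/{}'.format('.'.join(octets), mask))

print(pad_ipv4_network('170.254.25/24'))  # 170.254.25.0/24
print(pad_ipv4_network('10/8'))           # 10.0.0.0/8
```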
diff --git a/listimport/libs/StatsRipeText.py b/listimport/libs/StatsRipeText.py
new file mode 100644
index 0000000..9c32f61
--- /dev/null
+++ b/listimport/libs/StatsRipeText.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+from abc import ABC
+
+
+class RIPECaching(ABC):
+
+    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
+        self.sourceapp = sourceapp
+        self.hostname = 'stat.ripe.net'
+        self.port = 43
+        self.__init_logger(loglevel)
+
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger.setLevel(loglevel)
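Reviewer note: `RIPECaching` only carries connection settings; the queries themselves go over RIPEstat's whois-like plain-text interface on port 43, where `-d <datacall> <parameters>` issues a query and `-k` toggles keepalive so several queries can share one connection. A one-shot sketch without keepalive, assuming the usual whois convention that the server closes the connection after a single reply (which is what lets `read_all()` return) and that the data object is returned directly, as this patch's parsing implies; the IP is illustrative:

```python
import json
from telnetlib import Telnet

# Single query against the RIPEstat text interface; no '-k', so the
# server is expected to close the connection once it has answered.
with Telnet('stat.ripe.net', 43) as tn:
    tn.write(b'-d network-info 8.8.8.8 sourceapp=bgpranking-ng\n')
    response = json.loads(tn.read_all())

print(response.get('prefix'), response.get('asns'))
```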
diff --git a/listimport/modules_config/tmp/DshieldDaily.json b/listimport/modules_config/tmp/DshieldDaily.json
deleted file mode 100644
index 91e5398..0000000
--- a/listimport/modules_config/tmp/DshieldDaily.json
+++ /dev/null
@@ -1,7 +0,0 @@
-{
-    "url": "http://www.dshield.org/feeds/daily_sources",
-    "vendor": "dshield",
-    "name": "daily",
-    "impact": 0.1,
-    "parser": "parsers.dshield"
-}
diff --git a/listimport/modulesfetcher.py b/listimport/modulesfetcher.py
index 22360e0..56d2062 100644
--- a/listimport/modulesfetcher.py
+++ b/listimport/modulesfetcher.py
@@ -1,13 +1,12 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

-import requests
+import aiohttp
 from dateutil import parser
 from datetime import datetime, date
 from hashlib import sha512  # Faster than sha256 on 64b machines.
 from pathlib import Path
 import logging
-import asyncio
 from pid import PidFile, PidFileError
 import json

@@ -46,13 +45,15 @@ class Fetcher():
                                                            self.vendor, self.listname))
         self.logger.setLevel(loglevel)

-    def __get_last_modified(self):
-        r = requests.head(self.url)
-        if 'Last-Modified' in r.headers:
-            return parser.parse(r.headers['Last-Modified'])
-        return None
+    async def __get_last_modified(self):
+        async with aiohttp.ClientSession() as session:
+            async with session.head(self.url) as r:
+                headers = r.headers
+                if 'Last-Modified' in headers:
+                    return parser.parse(headers['Last-Modified'])
+                return None

-    def __newer(self):
+    async def __newer(self):
         '''Check if the file available for download is newer than the one
         already downloaded by checking the `Last-Modified` header.
         Note: return False if the file containing the last header content
@@ -66,7 +67,7 @@ class Fetcher():
             self.logger.debug('No Last-Modified header available')
             return True
         self.first_fetch = False
-        last_modified = self.__get_last_modified()
+        last_modified = await self.__get_last_modified()
         if last_modified:
             self.logger.debug('Last-Modified header available')
             with last_modified_path.open('w') as f:
@@ -75,8 +76,9 @@
             self.logger.debug('No Last-Modified header available')
             return True
         with last_modified_path.open() as f:
-            last_modified_file = parser.parse(f.read())
-        last_modified = self.__get_last_modified()
+            file_content = f.read()
+            last_modified_file = parser.parse(file_content)
+        last_modified = await self.__get_last_modified()
         if not last_modified:
             # No more Last-Modified header Oo
             self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname))
@@ -121,20 +123,22 @@ class Fetcher():
             return True
         return False

-    @asyncio.coroutine
     async def fetch_list(self):
         '''Fetch & store the list'''
         if not self.fetcher:
             return
         try:
             with PidFile('{}.pid'.format(self.listname), piddir=self.meta):
-                if not self.__newer():
+                if not await self.__newer():
                     return
-                r = requests.get(self.url)
-                if self.__same_as_last(r.content):
-                    return
-                self.logger.info('Got a new file \o/')
-                with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
-                    f.write(r.content)
+
+                async with aiohttp.ClientSession() as session:
+                    async with session.get(self.url) as r:
+                        content = await r.content.read()
+                        if self.__same_as_last(content):
+                            return
+                        self.logger.info('Got a new file \o/')
+                        with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
+                            f.write(content)
         except PidFileError:
             self.logger.info('Fetcher already running')
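Reviewer note: the `__get_last_modified`/`__newer` change keeps the same logic as the `requests` version, just non-blocking. The freshness test reduces to comparing the remote `Last-Modified` header with a locally stored value. A condensed sketch of that comparison (standalone; the function name and feed URL are illustrative):

```python
import aiohttp
from dateutil import parser

async def is_newer(url: str, stored: str = None) -> bool:
    '''True when the remote file looks newer than the stored timestamp.'''
    async with aiohttp.ClientSession() as session:
        async with session.head(url) as r:
            last_modified = r.headers.get('Last-Modified')
    if not last_modified:
        return True  # no header: fetch and let content hashing deduplicate
    if stored and parser.parse(last_modified) <= parser.parse(stored):
        return False
    return True

# asyncio.get_event_loop().run_until_complete(is_newer('http://example.com/feed.txt'))
```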
diff --git a/listimport/parser.py b/listimport/parser.py
index c9945d2..f894c88 100644
--- a/listimport/parser.py
+++ b/listimport/parser.py
@@ -4,7 +4,6 @@
 from datetime import datetime
 from pathlib import Path
 import logging
-import asyncio
 import json
 import re
 from redis import Redis
@@ -51,7 +50,6 @@ class RawFilesParser():
         self.datetime = datetime.now()
         return self.extract_ipv4(f.getvalue())

-    @asyncio.coroutine
     async def parse_raw_files(self):
         for filepath in self.files_to_parse:
             self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1))
diff --git a/listimport/risfetcher.py b/listimport/risfetcher.py
index acecb71..62e3765 100644
--- a/listimport/risfetcher.py
+++ b/listimport/risfetcher.py
@@ -2,39 +2,59 @@
 # -*- coding: utf-8 -*-

 import logging
+import json
 from redis import Redis

-from .libs.StatsRipe import StatsRIPE
+from .libs.StatsRipeText import RIPECaching
+import asyncio


-class RoutingInformationServiceFetcher():
+class RISPrefixLookup(RIPECaching):

-    def __init__(self, loglevel: int=logging.DEBUG):
-        self.__init_logger(loglevel)
-        self.ris_cache = Redis(host='localhost', port=6381, db=0)
-        self.logger.debug('Starting RIS fetcher')
-        self.ripe = StatsRIPE()
+    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
+        super().__init__(sourceapp, loglevel)
+        self.logger.debug('Starting RIS Prefix fetcher')

-    def __init_logger(self, loglevel):
-        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
-        self.logger.setLevel(loglevel)
+    def cache_prefix(self, redis_cache, ip, network_info, prefix_overview):
+        prefix = network_info['prefix']
+        asns = network_info['asns']
+        description = prefix_overview['block']['desc']
+        if not description:
+            description = prefix_overview['block']['name']
+        p = redis_cache.pipeline()
+        for asn in asns:
+            p.hmset(ip, {'asn': asn, 'prefix': prefix, 'description': description})
+        p.expire(ip, 43200)  # 12H
+        p.execute()

-    async def fetch(self):
+    async def run(self):
+        redis_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
+        reader, writer = await asyncio.open_connection(self.hostname, self.port)
+
+        writer.write(b'-k\n')
         while True:
-            ip = self.ris_cache.spop('for_ris_lookup')
-            if not ip:
-                break
-            ip = ip.decode()
-            network_info = await self.ripe.network_info(ip)
-            prefix = network_info['data']['prefix']
-            asns = network_info['data']['asns']
-            if not asns or not prefix:
+            ip = redis_cache.spop('for_ris_lookup')
+            if not ip:  # TODO: add a check against something to stop the loop
+                self.logger.debug('Nothing to lookup')
+                await asyncio.sleep(10)
+                continue
+            if redis_cache.exists(ip):
+                self.logger.debug('Already cached: {}'.format(ip))
+                continue
+            self.logger.debug('RIS lookup: {}'.format(ip))
+            to_send = '-d network-info {} sourceapp={}\n'.format(ip, self.sourceapp)
+            writer.write(to_send.encode())
+            data = await reader.readuntil(b'\n}\n')
+            network_info = json.loads(data)
+            if not network_info.get('prefix'):
                 self.logger.warning('The IP {} does not seem to be announced'.format(ip))
                 continue
-            prefix_overview = await self.ripe.prefix_overview(prefix)
-            description = prefix_overview['data']['block']['desc']
-            if not description:
-                description = prefix_overview['data']['block']['name']
-            for asn in asns:
-                self.ris_cache.hmset(ip, {'asn': asn, 'prefix': prefix,
-                                          'description': description})
+            self.logger.debug('Prefix lookup: {}'.format(ip))
+            to_send = '-d prefix-overview {} sourceapp={}\n'.format(network_info['prefix'], self.sourceapp)
+            writer.write(to_send.encode())
+            data = await reader.readuntil(b'\n}\n')
+            prefix_overview = json.loads(data)
+            self.logger.debug('RIS cache prefix info: {}'.format(ip))
+            self.cache_prefix(redis_cache, ip, network_info, prefix_overview)
+        writer.write(b'-k\n')
+        writer.close()
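Reviewer note: `cache_prefix` writes one hash per IP into the RIS cache (redis, port 6381) with a 12 hour TTL; note that the `for asn in asns` loop rewrites the same key, so only the last ASN survives in the hash. A minimal consumer-side sketch of reading that cache back (key layout taken from this patch, lookup IP illustrative):

```python
from redis import Redis

redis_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)

def lookup(ip: str):
    '''Return the cached {'asn', 'prefix', 'description'} hash, or None.'''
    if not redis_cache.exists(ip):
        return None  # never looked up, or the 12h TTL expired
    return redis_cache.hgetall(ip)

# print(lookup('8.8.8.8'))
```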
diff --git a/ranking.py b/ranking.py
new file mode 100755
index 0000000..6719e8d
--- /dev/null
+++ b/ranking.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import asyncio
+from listimport.initranking import ASNLookup
+
+
+logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
+                    level=logging.INFO, datefmt='%I:%M:%S')
+
+
+class RankingManager():
+
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.asn_fetcher = ASNLookup(loglevel=loglevel)
+
+    async def run_fetcher(self):
+        # self.asn_fetcher.get_all_asns()
+        await asyncio.gather(
+            self.asn_fetcher.get_originating_prefixes(),
+            self.asn_fetcher.get_originating_prefixes(),
+            self.asn_fetcher.get_originating_prefixes()
+        )
+
+
+if __name__ == '__main__':
+    modules_manager = RankingManager()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(modules_manager.run_fetcher())
diff --git a/ris.py b/ris.py
index ac94864..0f9226c 100755
--- a/ris.py
+++ b/ris.py
@@ -3,7 +3,7 @@
 import logging
 import asyncio

-from listimport.risfetcher import RoutingInformationServiceFetcher
+from listimport.risfetcher import RISPrefixLookup

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')
@@ -12,10 +12,14 @@ logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
 class RISManager():

     def __init__(self, loglevel: int=logging.DEBUG):
-        self.ris_fetcher = RoutingInformationServiceFetcher(loglevel)
+        self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)

     async def run_fetcher(self):
-        await asyncio.gather(self.ris_fetcher.fetch())
+        await asyncio.gather(
+            self.ris_fetcher.run(),
+            self.ris_fetcher.run(),
+            # self.ris_fetcher.run(2)
+        )


 if __name__ == '__main__':
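Reviewer note: `get_originating_prefixes` pops from the `asns_to_lookup` set and stops when it is empty, so `get_all_asns` (currently commented out in `ranking.py`) has to fill that set first. A sketch of the intended sequencing, under that assumption:

```python
import asyncio
from listimport.initranking import ASNLookup

async def rank():
    lookup = ASNLookup()
    lookup.get_all_asns()  # synchronous: fills 'asns' and 'asns_to_lookup'
    # Workers drain 'asns_to_lookup' concurrently until the set is empty.
    await asyncio.gather(*(lookup.get_originating_prefixes() for _ in range(3)))

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(rank())
```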