diff --git a/listimport/initranking.py b/listimport/initranking.py
index d783e5f..0552d7c 100644
--- a/listimport/initranking.py
+++ b/listimport/initranking.py
@@ -42,10 +42,11 @@ class PrefixDatabase():
         with gzip.open(BytesIO(r.content), 'r') as f:
             for line in f:
                 prefix, length, asns = line.decode().strip().split('\t')
-                for asn in re.split('[,_]', asns):
-                    network = ip_network('{}/{}'.format(prefix, length))
-                    to_import[asn][address_family].add(str(network))
-                    to_import[asn]['ipcount'] += network.num_addresses
+                # The meaning of AS sets and multi-origin AS is unclear. Taking only the first ASN in the list.
+                asn = re.split('[,_]', asns)[0]
+                network = ip_network('{}/{}'.format(prefix, length))
+                to_import[asn][address_family].add(str(network))
+                to_import[asn]['ipcount'] += network.num_addresses
 
         p = self.redis_cache.pipeline()
         p.sadd('asns', *to_import.keys())
diff --git a/listimport/risfetcher.py b/listimport/risfetcher.py
index 16cc894..6c8513a 100644
--- a/listimport/risfetcher.py
+++ b/listimport/risfetcher.py
@@ -2,59 +2,59 @@
 # -*- coding: utf-8 -*-
 import logging
-import json
 
 from redis import Redis
 
-from .libs.StatsRipeText import RIPECaching
-import asyncio
+import time
+import pytricia
+import ipaddress
 
 
-class RISPrefixLookup(RIPECaching):
+class RISPrefixLookup():
 
-    def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
-        super().__init__(sourceapp, loglevel)
+    def __init__(self, loglevel: int=logging.DEBUG):
+        self.__init_logger(loglevel)
         self.logger.debug('Starting RIS Prefix fetcher')
+        self.prefix_db = Redis(host='localhost', port=6582, db=0, decode_responses=True)
+        self.longest_prefix_matching = Redis(host='localhost', port=6581, db=0, decode_responses=True)
+        self.tree_v4 = pytricia.PyTricia()
+        self.tree_v6 = pytricia.PyTricia(128)
+        self.init_tree()
 
-    def cache_prefix(self, redis_cache, ip, network_info, prefix_overview):
-        prefix = network_info['prefix']
-        asns = network_info['asns']
-        description = prefix_overview['block']['desc']
-        if not description:
-            description = prefix_overview['block']['name']
-        p = redis_cache.pipeline()
-        for asn in asns:
-            p.hmset(ip, {'asn': asn, 'prefix': prefix, 'description': description})
-            p.expire(ip, 43200)  # 12H
+    def __init_logger(self, loglevel):
+        self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
+        self.logger.setLevel(loglevel)
+
+    def cache_prefix(self, ip, prefix, asns):
+        p = self.longest_prefix_matching.pipeline()
+        p.hmset(ip, {'asn': asns, 'prefix': prefix})
+        p.expire(ip, 43200)  # 12H
         p.execute()
 
-    async def run(self):
-        redis_cache = Redis(host='localhost', port=6581, db=0, decode_responses=True)
-        reader, writer = await asyncio.open_connection(self.hostname, self.port)
+    def init_tree(self):
+        for asn in self.prefix_db.smembers('asns'):
+            for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v4')):
+                self.tree_v4[prefix] = asn
+            for prefix in self.prefix_db.smembers('{}|{}'.format(asn, 'v6')):
+                self.tree_v6[prefix] = asn
 
-        writer.write(b'-k\n')
+    def run(self):
         while True:
-            ip = redis_cache.spop('for_ris_lookup')
+            ip = self.longest_prefix_matching.spop('for_ris_lookup')
             if not ip:  # TODO: add a check against something to stop the loop
                 self.logger.debug('Nothing to lookup')
-                await asyncio.sleep(10)
+                time.sleep(10)
                 continue
-            if redis_cache.exists(ip):
+            if self.longest_prefix_matching.exists(ip):
                 self.logger.debug('Already cached: {}'.format(ip))
                 continue
-            self.logger.debug('RIS lookup: {}'.format(ip))
-            to_send = '-d network-info {} sourceapp={}\n'.format(ip, self.sourceapp)
-            writer.write(to_send.encode())
-            data = await reader.readuntil(b'\n}\n')
-            network_info = json.loads(data)
-            if not network_info.get('prefix'):
+            ip = ipaddress.ip_address(ip)
+            if ip.version == 4:
+                prefix = self.tree_v4.get_key(ip)
+                asns = self.tree_v4.get(ip)
+            else:
+                prefix = self.tree_v6.get_key(ip)
+                asns = self.tree_v6.get(ip)
+            if not prefix:
                 self.logger.warning('The IP {} does not seem to be announced'.format(ip))
                 continue
-            self.logger.debug('Prefix lookup: {}'.format(ip))
-            to_send = '-d prefix-overview {} sourceapp={}\n'.format(network_info['prefix'], self.sourceapp)
-            writer.write(to_send.encode())
-            data = await reader.readuntil(b'\n}\n')
-            prefix_overview = json.loads(data)
-            self.logger.debug('RIS cache prefix info: {}'.format(ip))
-            self.cache_prefix(redis_cache, ip, network_info, prefix_overview)
-        writer.write(b'-k\n')
-        writer.close()
+            self.cache_prefix(ip, prefix, asns)
diff --git a/ris.py b/ris.py
index 746eb23..7c00da2 100755
--- a/ris.py
+++ b/ris.py
@@ -2,7 +2,6 @@
 # -*- coding: utf-8 -*-
 import logging
-import asyncio
 
 from listimport.risfetcher import RISPrefixLookup
 
 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
@@ -14,19 +13,10 @@ class RISManager():
     def __init__(self, loglevel: int=logging.DEBUG):
         self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)
 
-    async def run_fetcher(self):
-        await asyncio.gather(
-            self.ris_fetcher.run(),
-            self.ris_fetcher.run(),
-            self.ris_fetcher.run(),
-            self.ris_fetcher.run(),
-            self.ris_fetcher.run(),
-            self.ris_fetcher.run(),
-            # self.ris_fetcher.run(2)
-        )
+    def run_fetcher(self):
+        self.ris_fetcher.run()
 
 
 if __name__ == '__main__':
     modules_manager = RISManager()
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(modules_manager.run_fetcher())
+    modules_manager.run_fetcher()
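
Note: the reworked RISPrefixLookup replaces the remote RIPE stat lookups with a local longest-prefix match on pytricia trees built from the cached prefix database. A minimal sketch of that lookup, assuming pytricia is installed; the prefixes and ASNs below are made-up example values, not data from the ranking database:

import ipaddress
import pytricia

tree_v4 = pytricia.PyTricia()      # 32-bit tree for IPv4 prefixes
tree_v6 = pytricia.PyTricia(128)   # 128-bit tree for IPv6 prefixes

# Example announcements (made up): prefix -> originating ASN
tree_v4['198.51.100.0/24'] = '64496'
tree_v6['2001:db8::/32'] = '64497'

ip = ipaddress.ip_address('198.51.100.42')
tree = tree_v4 if ip.version == 4 else tree_v6
prefix = tree.get_key(ip)   # most specific covering prefix, or None if unannounced
asn = tree.get(ip)          # value stored for that prefix
print(prefix, asn)          # 198.51.100.0/24 64496

Building the trees once in __init__ and answering every lookup in memory is what lets run() drop asyncio: the per-IP work is a pure trie lookup, so a single synchronous loop is enough.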