fix: Properly use asyncio all over the place \o/

pull/12/head
Raphaël Vinot 2018-03-20 21:44:46 +01:00
parent 7961b68852
commit b3c68db8ec
9 changed files with 218 additions and 57 deletions

View File

@ -71,6 +71,24 @@ Creates the following hashes:
IP = {'asn': <asn>, 'prefix': <prefix>, 'description': <description>}
```
## Ranking Information cache (redis, port 6382)
*Usage*: Store the current list of known ASNs at RIPE, and the prefixes originating from them.
Creates the following sets:
```python
asns = set([<asn>, ...])
<asn>|v4 = set([<ipv4_prefix>, ...])
<asn>|v6 = set([<ipv6_prefix>, ...])
```
And the following keys:
```python
<asn>|v4|ipcount = <Total amount of IP v4 addresses originating this AS>
<asn>|v6|ipcount = <Total amount of IP v6 addresses originating this AS>
```
## Long term storage (ardb, port 16379)

76
listimport/initranking.py Normal file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import json
from redis import Redis
import asyncio
from telnetlib import Telnet
from .libs.StatsRipeText import RIPECaching
from ipaddress import ip_network
class ASNLookup(RIPECaching):
def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
super().__init__(sourceapp, loglevel)
self.redis_cache = Redis(host='localhost', port=6382, db=0, decode_responses=True)
self.logger.debug('Starting ASN lookup cache')
def get_all_asns(self):
with Telnet(self.hostname, self.port) as tn:
tn.write(b'-k\n')
to_send = '-d ris-asns list_asns=true asn_types=o sourceapp={}\n'.format(self.sourceapp)
tn.write(to_send.encode())
ris_asns = json.loads(tn.read_until(b'\n}\n'))
all_asns = ris_asns['asns']['originating']
if not all_asns:
self.logger.warning('No ASNs in ris-asns, something went wrong.')
else:
self.redis_cache.sadd('asns', *all_asns)
self.redis_cache.sadd('asns_to_lookup', *all_asns)
tn.write(b'-k\n')
def fix_ipv4_networks(self, networks):
'''Because we can't have nice things.
Some netorks come without the last(s) bytes (i.e. 170.254.25/24)'''
to_return = []
for net in networks:
try:
to_return.append(ip_network(net))
except ValueError:
ip, mask = net.split('/')
iplist = ip.split('.')
iplist = iplist + ['0'] * (4 - len(iplist))
to_return.append(ip_network('{}/{}'.format('.'.join(iplist), mask)))
return to_return
async def get_originating_prefixes(self):
reader, writer = await asyncio.open_connection(self.hostname, self.port)
writer.write(b'-k\n')
while True:
asn = self.redis_cache.spop('asns_to_lookup')
if not asn:
break
self.logger.debug('ASN lookup: {}'.format(asn))
to_send = '-d ris-prefixes {} list_prefixes=true types=o af=v4,v6 noise=filter sourceapp={}\n'.format(asn, self.sourceapp)
writer.write(to_send.encode())
data = await reader.readuntil(b'\n}\n')
ris_prefixes = json.loads(data)
p = self.redis_cache.pipeline()
if ris_prefixes['prefixes']['v4']['originating']:
self.logger.debug('{} has ipv4'.format(asn))
fixed_networks = self.fix_ipv4_networks(ris_prefixes['prefixes']['v4']['originating'])
p.sadd('{}|v4'.format(asn), *[str(net) for net in fixed_networks])
total_ipv4 = sum([net.num_addresses for net in fixed_networks])
p.set('{}|v4|ipcount'.format(asn), total_ipv4)
if ris_prefixes['prefixes']['v6']['originating']:
self.logger.debug('{} has ipv6'.format(asn))
p.sadd('{}|v6'.format(asn), *ris_prefixes['prefixes']['v6']['originating'])
total_ipv6 = sum([ip_network(prefix).num_addresses for prefix in ris_prefixes['prefixes']['v6']['originating']])
p.set('{}|v4|ipcount'.format(asn), total_ipv6)
p.execute()
writer.write(b'-k\n')
writer.close()

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from abc import ABC
class RIPECaching(ABC):
def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
self.sourceapp = sourceapp
self.hostname = 'stat.ripe.net'
self.port = 43
self.__init_logger(loglevel)
def __init_logger(self, loglevel):
self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
self.logger.setLevel(loglevel)

View File

@ -1,7 +0,0 @@
{
"url": "http://www.dshield.org/feeds/daily_sources",
"vendor": "dshield",
"name": "daily",
"impact": 0.1,
"parser": "parsers.dshield"
}

View File

@ -1,13 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import aiohttp
from dateutil import parser
from datetime import datetime, date
from hashlib import sha512 # Faster than sha256 on 64b machines.
from pathlib import Path
import logging
import asyncio
from pid import PidFile, PidFileError
import json
@ -46,13 +45,15 @@ class Fetcher():
self.vendor, self.listname))
self.logger.setLevel(loglevel)
def __get_last_modified(self):
r = requests.head(self.url)
if 'Last-Modified' in r.headers:
return parser.parse(r.headers['Last-Modified'])
return None
async def __get_last_modified(self):
async with aiohttp.ClientSession() as session:
async with session.head(self.url) as r:
headers = r.headers
if 'Last-Modified' in headers:
return parser.parse(headers['Last-Modified'])
return None
def __newer(self):
async def __newer(self):
'''Check if the file available for download is newed than the one
already downloaded by checking the `Last-Modified` header.
Note: return False if the file containing the last header content
@ -66,7 +67,7 @@ class Fetcher():
self.logger.debug('No Last-Modified header available')
return True
self.first_fetch = False
last_modified = self.__get_last_modified()
last_modified = await self.__get_last_modified()
if last_modified:
self.logger.debug('Last-Modified header available')
with last_modified_path.open('w') as f:
@ -75,8 +76,9 @@ class Fetcher():
self.logger.debug('No Last-Modified header available')
return True
with last_modified_path.open() as f:
last_modified_file = parser.parse(f.read())
last_modified = self.__get_last_modified()
file_content = f.read()
last_modified_file = parser.parse(file_content)
last_modified = await self.__get_last_modified()
if not last_modified:
# No more Last-Modified header Oo
self.logger.warning('{}: Last-Modified header was present, isn\'t anymore!'.format(self.listname))
@ -121,20 +123,22 @@ class Fetcher():
return True
return False
@asyncio.coroutine
async def fetch_list(self):
'''Fetch & store the list'''
if not self.fetcher:
return
try:
with PidFile('{}.pid'.format(self.listname), piddir=self.meta):
if not self.__newer():
if not await self.__newer():
return
r = requests.get(self.url)
if self.__same_as_last(r.content):
return
self.logger.info('Got a new file \o/')
with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
f.write(r.content)
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as r:
content = await r.content.read()
if self.__same_as_last(content):
return
self.logger.info('Got a new file \o/')
with (self.directory / '{}.txt'.format(datetime.now().isoformat())).open('wb') as f:
f.write(content)
except PidFileError:
self.logger.info('Fetcher already running')

View File

@ -4,7 +4,6 @@
from datetime import datetime
from pathlib import Path
import logging
import asyncio
import json
import re
from redis import Redis
@ -51,7 +50,6 @@ class RawFilesParser():
self.datetime = datetime.now()
return self.extract_ipv4(f.getvalue())
@asyncio.coroutine
async def parse_raw_files(self):
for filepath in self.files_to_parse:
self.logger.debug('Parsing {}, {} to go.'.format(filepath, len(self.files_to_parse) - 1))

View File

@ -2,39 +2,59 @@
# -*- coding: utf-8 -*-
import logging
import json
from redis import Redis
from .libs.StatsRipe import StatsRIPE
from .libs.StatsRipeText import RIPECaching
import asyncio
class RoutingInformationServiceFetcher():
class RISPrefixLookup(RIPECaching):
def __init__(self, loglevel: int=logging.DEBUG):
self.__init_logger(loglevel)
self.ris_cache = Redis(host='localhost', port=6381, db=0)
self.logger.debug('Starting RIS fetcher')
self.ripe = StatsRIPE()
def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
super().__init__(sourceapp, loglevel)
self.logger.debug('Starting RIS Prefix fetcher')
def __init_logger(self, loglevel):
self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
self.logger.setLevel(loglevel)
def cache_prefix(self, redis_cache, ip, network_info, prefix_overview):
prefix = network_info['prefix']
asns = network_info['asns']
description = prefix_overview['block']['desc']
if not description:
description = prefix_overview['block']['name']
p = redis_cache.pipeline()
for asn in asns:
p.hmset(ip, {'asn': asn, 'prefix': prefix, 'description': description})
p.expire(ip, 43200) # 12H
p.execute()
async def fetch(self):
async def run(self):
redis_cache = Redis(host='localhost', port=6381, db=0, decode_responses=True)
reader, writer = await asyncio.open_connection(self.hostname, self.port)
writer.write(b'-k\n')
while True:
ip = self.ris_cache.spop('for_ris_lookup')
if not ip:
break
ip = ip.decode()
network_info = await self.ripe.network_info(ip)
prefix = network_info['data']['prefix']
asns = network_info['data']['asns']
if not asns or not prefix:
ip = redis_cache.spop('for_ris_lookup')
if not ip: # TODO: add a check against something to stop the loop
self.logger.debug('Nothing to lookup')
await asyncio.sleep(10)
continue
if redis_cache.exists(ip):
self.logger.debug('Already cached: {}'.format(ip))
continue
self.logger.debug('RIS lookup: {}'.format(ip))
to_send = '-d network-info {} sourceapp={}\n'.format(ip, self.sourceapp)
writer.write(to_send.encode())
data = await reader.readuntil(b'\n}\n')
network_info = json.loads(data)
if not network_info.get('prefix'):
self.logger.warning('The IP {} does not seem to be announced'.format(ip))
continue
prefix_overview = await self.ripe.prefix_overview(prefix)
description = prefix_overview['data']['block']['desc']
if not description:
description = prefix_overview['data']['block']['name']
for asn in asns:
self.ris_cache.hmset(ip, {'asn': asn, 'prefix': prefix,
'description': description})
self.logger.debug('Prefix lookup: {}'.format(ip))
to_send = '-d prefix-overview {} sourceapp={}\n'.format(network_info['prefix'], self.sourceapp)
writer.write(to_send.encode())
data = await reader.readuntil(b'\n}\n')
prefix_overview = json.loads(data)
self.logger.debug('RIS cache prefix info: {}'.format(ip))
self.cache_prefix(redis_cache, ip, network_info, prefix_overview)
writer.write(b'-k\n')
writer.close()

30
ranking.py Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import asyncio
from listimport.initranking import ASNLookup
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
class RankingManager():
def __init__(self, loglevel: int=logging.DEBUG):
self.asn_fetcher = ASNLookup(loglevel=loglevel)
async def run_fetcher(self):
# self.asn_fetcher.get_all_asns()
await asyncio.gather(
self.asn_fetcher.get_originating_prefixes(),
self.asn_fetcher.get_originating_prefixes(),
self.asn_fetcher.get_originating_prefixes()
)
if __name__ == '__main__':
modules_manager = RankingManager()
loop = asyncio.get_event_loop()
loop.run_until_complete(modules_manager.run_fetcher())

10
ris.py
View File

@ -3,7 +3,7 @@
import logging
import asyncio
from listimport.risfetcher import RoutingInformationServiceFetcher
from listimport.risfetcher import RISPrefixLookup
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
@ -12,10 +12,14 @@ logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
class RISManager():
def __init__(self, loglevel: int=logging.DEBUG):
self.ris_fetcher = RoutingInformationServiceFetcher(loglevel)
self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)
async def run_fetcher(self):
await asyncio.gather(self.ris_fetcher.fetch())
await asyncio.gather(
self.ris_fetcher.run(),
self.ris_fetcher.run(),
# self.ris_fetcher.run(2)
)
if __name__ == '__main__':