new: Uses IPASN History for the routing information.

pull/12/head
Raphaël Vinot 2018-11-14 17:07:30 +01:00
parent a40d0f02b6
commit 5ce2f91430
26 changed files with 154 additions and 1790 deletions

View File

@@ -27,7 +27,7 @@ class AbstractManager(ABC):
            try:
                self._to_run_forever()
            except Exception:
-                self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
+                self.logger.exception(f'Something went wrong in {self.__class__.__name__}.')
            if not long_sleep(sleep_in_sec):
                break
        self.logger.info(f'Shutting down {self.__class__.__name__}')

View File

@@ -0,0 +1,3 @@
+{
+    "ipasnhistory_url": "http://127.0.0.1:5176/"
+}
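For orientation: this new three-line config file only stores the base URL of the IPASN History service; get_ipasn() in libs/helpers.py (added later in this commit) builds the pyipasnhistory client from it. A minimal sketch of that wiring, not part of the commit, assuming the file lands in the config directory as config/bgpranking.json and a service is actually listening on 127.0.0.1:5176:

    # Minimal sketch: load the configured URL and build the client.
    import json
    from pyipasnhistory import IPASNHistory

    with open('config/bgpranking.json') as f:  # assumed location of the file above
        config = json.load(f)

    ipasn = IPASNHistory(config['ipasnhistory_url'])
    print(ipasn.is_up)   # True once the service is reachable
    print(ipasn.meta())  # cache status, later consumed by sanity_check_ipasn()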

View File

@@ -3,7 +3,7 @@
 import logging
 from redis import StrictRedis
-from .libs.helpers import shutdown_requested, set_running, unset_running, get_socket_path
+from .libs.helpers import shutdown_requested, set_running, unset_running, get_socket_path, get_ipasn, sanity_check_ipasn

 class DatabaseInsert():
@@ -12,7 +12,7 @@ class DatabaseInsert():
        self.__init_logger(loglevel)
        self.ardb_storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)
        self.redis_sanitized = StrictRedis(unix_socket_path=get_socket_path('prepare'), db=0, decode_responses=True)
-        self.ris_cache = StrictRedis(unix_socket_path=get_socket_path('ris'), db=0, decode_responses=True)
+        self.ipasn = get_ipasn()
        self.logger.debug('Starting import')

    def __init_logger(self, loglevel):
@@ -20,52 +20,70 @@ class DatabaseInsert():
        self.logger.setLevel(loglevel)

    def insert(self):
+        ready, message = sanity_check_ipasn(self.ipasn)
+        if not ready:
+            # Try again later.
+            self.logger.warning(message)
+            return
+        self.logger.debug(message)
        set_running(self.__class__.__name__)
        while True:
            if shutdown_requested():
                break
-            uuids = self.redis_sanitized.spop('to_insert', 1000)
+            uuids = self.redis_sanitized.spop('to_insert', 100)
            if not uuids:
                break
            p = self.redis_sanitized.pipeline(transaction=False)
            [p.hgetall(uuid) for uuid in uuids]
            sanitized_data = p.execute()
+            for_query = []
+            for i, uuid in enumerate(uuids):
+                data = sanitized_data[i]
+                if not data:
+                    self.logger.warning(f'No data for UUID {uuid}. This should not happen, but lets move on.')
+                    continue
+                for_query.append({'ip': data['ip'], 'address_family': data['address_family'], 'source': 'caida',
+                                  'date': data['datetime'], 'precision_delta': {'days': 3}})
+            responses = self.ipasn.mass_query(for_query)
            retry = []
            done = []
-            prefix_missing = []
            ardb_pipeline = self.ardb_storage.pipeline(transaction=False)
            for i, uuid in enumerate(uuids):
                data = sanitized_data[i]
                if not data:
                    self.logger.warning(f'No data for UUID {uuid}. This should not happen, but lets move on.')
                    continue
-                # Data gathered from the RIS queries:
-                # * IP Block of the IP -> https://stat.ripe.net/docs/data_api#NetworkInfo
-                # * AS number -> https://stat.ripe.net/docs/data_api#NetworkInfo
-                # * Full text description of the AS (older name) -> https://stat.ripe.net/docs/data_api#AsOverview
-                ris_entry = self.ris_cache.hgetall(data['ip'])
-                if not ris_entry:
-                    # RIS data not available yet, retry later
-                    retry.append(uuid)
-                    # In case this IP is missing in the set to process
-                    prefix_missing.append(data['ip'])
+                routing_info = responses['responses'][i][0]  # our queries are on one single date, not a range
+                # Data gathered from IPASN History:
+                # * IP Block of the IP
+                # * AS number
+                if 'error' in routing_info:
+                    self.logger.warning(f"Unable to find routing information for {data['ip']} - {data['datetime']}: {routing_info['error']}")
                    continue
+                # Single date query, getting from the object
+                datetime_routing = list(routing_info.keys())[0]
+                entry = routing_info[datetime_routing]
+                if not entry:
+                    # routing info is missing, need to try again later.
+                    retry.append(uuid)
+                    continue
                # Format: <YYYY-MM-DD>|sources -> set([<source>, ...])
                ardb_pipeline.sadd(f"{data['date']}|sources", data['source'])
                # Format: <YYYY-MM-DD>|<source> -> set([<asn>, ...])
-                ardb_pipeline.sadd(f"{data['date']}|{data['source']}", ris_entry['asn'])
+                ardb_pipeline.sadd(f"{data['date']}|{data['source']}", entry['asn'])
                # Format: <YYYY-MM-DD>|<source>|<asn> -> set([<prefix>, ...])
-                ardb_pipeline.sadd(f"{data['date']}|{data['source']}|{ris_entry['asn']}", ris_entry['prefix'])
+                ardb_pipeline.sadd(f"{data['date']}|{data['source']}|{entry['asn']}", entry['prefix'])
                # Format: <YYYY-MM-DD>|<source>|<asn>|<prefix> -> set([<ip>|<datetime>, ...])
-                ardb_pipeline.sadd(f"{data['date']}|{data['source']}|{ris_entry['asn']}|{ris_entry['prefix']}",
+                ardb_pipeline.sadd(f"{data['date']}|{data['source']}|{entry['asn']}|{entry['prefix']}",
                                   f"{data['ip']}|{data['datetime']}")
                done.append(uuid)
            ardb_pipeline.execute()
-            if prefix_missing:
-                self.ris_cache.sadd('for_ris_lookup', *prefix_missing)
            p = self.redis_sanitized.pipeline(transaction=False)
            if done:
                p.delete(*done)
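For readers who have not used the IPASN History API: the request/response shapes implied by the code above are sketched below. Each query carries the IP, address family, source and date (with a precision window), and responses['responses'][i][0] is a mapping from the resolved date to an entry holding the ASN and the announcing prefix. The concrete values are illustrative assumptions; only the keys actually accessed by insert() are taken from the diff.

    # Illustrative only: documentation IP and ASN, values made up.
    query = {'ip': '198.51.100.7', 'address_family': 'v4', 'source': 'caida',
             'date': '2018-11-14T00:00:00', 'precision_delta': {'days': 3}}

    # responses = self.ipasn.mass_query([query, ...]) returns one result per query, in order;
    # the first element of each result is the single-date mapping used above:
    routing_info = {'2018-11-14T00:00:00': {'asn': '64496', 'prefix': '198.51.100.0/24'}}

    datetime_routing = list(routing_info.keys())[0]
    entry = routing_info[datetime_routing]  # -> {'asn': '64496', 'prefix': '198.51.100.0/24'}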

View File

@@ -24,3 +24,15 @@ class MissingEnv(BGPRankingException):
 class InvalidDateFormat(BGPRankingException):
     pass
+
+
+class MissingConfigFile(BGPRankingException):
+    pass
+
+
+class MissingConfigEntry(BGPRankingException):
+    pass
+
+
+class ThirdPartyUnreachable(BGPRankingException):
+    pass

View File

@@ -4,12 +4,13 @@
 import os
 import sys
 from pathlib import Path
-from .exceptions import CreateDirectoryException, MissingEnv
+from .exceptions import CreateDirectoryException, MissingEnv, MissingConfigFile, MissingConfigEntry, ThirdPartyUnreachable
 from redis import StrictRedis
 from redis.exceptions import ConnectionError
 from datetime import datetime, timedelta
 import time
 import json
+from pyipasnhistory import IPASNHistory

 def load_config_files(config_dir: Path=None) -> dict:
@@ -54,24 +55,23 @@ def safe_create_dir(to_create: Path):
 def set_running(name: str):
-    r = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=1, decode_responses=True)
+    r = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
     r.hset('running', name, 1)

 def unset_running(name: str):
-    r = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=1, decode_responses=True)
+    r = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
     r.hdel('running', name)

 def is_running():
-    r = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=1, decode_responses=True)
+    r = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
     return r.hgetall('running')

 def get_socket_path(name: str):
     mapping = {
-        'ris': Path('cache', 'ris.sock'),
-        'prefixes': Path('cache', 'prefixes.sock'),
+        'cache': Path('cache', 'cache.sock'),
         'storage': Path('storage', 'storage.sock'),
         'intake': Path('temp', 'intake.sock'),
         'prepare': Path('temp', 'prepare.sock'),
@@ -79,6 +79,33 @@ def get_socket_path(name: str):
     return str(get_homedir() / mapping[name])

+def get_ipasn():
+    general_config_file = get_config_path() / 'bgpranking.json'
+    if not general_config_file.exists():
+        raise MissingConfigFile(f'The general configuration file ({general_config_file}) does not exist.')
+    with open(general_config_file) as f:
+        config = json.load(f)
+    if 'ipasnhistory_url' not in config:
+        raise MissingConfigEntry(f'"ipasnhistory_url" is missing in {general_config_file}.')
+    ipasn = IPASNHistory(config['ipasnhistory_url'])
+    if not ipasn.is_up:
+        raise ThirdPartyUnreachable(f"Unable to reach IPASNHistory on {config['ipasnhistory_url']}")
+    return ipasn
+
+
+def sanity_check_ipasn(ipasn):
+    meta = ipasn.meta()
+    if 'error' in meta:
+        raise ThirdPartyUnreachable(f'IP ASN History has a problem: {meta["error"]}')
+    v4_percent = meta['cached_dates']['caida']['v4']['percent']
+    v6_percent = meta['cached_dates']['caida']['v6']['percent']
+    if v4_percent < 90 or v6_percent < 90:  # (this way it works if we only load 10 days)
+        # Try again later.
+        return False, f"IP ASN History is not ready: v4 {v4_percent}% / v6 {v6_percent}% loaded"
+    return True, f"IP ASN History is ready: v4 {v4_percent}% / v6 {v6_percent}% loaded"
+
+
 def check_running(name: str):
     socket_path = get_socket_path(name)
     try:
@@ -90,7 +117,7 @@ def check_running(name: str):
 def shutdown_requested():
     try:
-        r = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=1, decode_responses=True)
+        r = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
         return r.exists('shutdown')
     except ConnectionRefusedError:
         return True
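The two new helpers are meant to be used together at the start of every worker pass, which is exactly how dbinsert.py and sanitizer.py use them in this commit. A minimal sketch, assuming the package layout implied by the relative imports:

    from bgpranking.libs.helpers import get_ipasn, sanity_check_ipasn

    ipasn = get_ipasn()  # raises MissingConfigFile/MissingConfigEntry/ThirdPartyUnreachable on failure
    ready, message = sanity_check_ipasn(ipasn)
    if not ready:
        # Less than 90% of the CAIDA v4 or v6 data is cached yet: try again later.
        print(message)
    else:
        print(message)  # e.g. "IP ASN History is ready: v4 ...% / v6 ...% loaded"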

View File

@@ -1,8 +1,9 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

+import json
 from redis import StrictRedis
-from .libs.helpers import get_socket_path
+from .libs.helpers import get_socket_path, get_ipasn

 class Monitor():

@@ -10,30 +11,12 @@ class Monitor():
    def __init__(self):
        self.intake = StrictRedis(unix_socket_path=get_socket_path('intake'), db=0, decode_responses=True)
        self.sanitize = StrictRedis(unix_socket_path=get_socket_path('prepare'), db=0, decode_responses=True)
-        self.ris_cache = StrictRedis(unix_socket_path=get_socket_path('ris'), db=0, decode_responses=True)
-        self.prefix_cache = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=0, decode_responses=True)
-        self.running = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=1, decode_responses=True)
+        self.cache = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
+        self.ipasn = get_ipasn()
        self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)

-    def get_runinng(self):
-        return self.running.hgetall('running')
-
-    def info_prefix_cache(self):
-        to_return = {'IPv6 Dump': '', 'IPv4 Dump': '', 'Number ASNs': 0}
-        if self.prefix_cache.exists('ready'):
-            v6_dump = self.prefix_cache.get('current|v6')
-            v4_dump = self.prefix_cache.get('current|v4')
-            number_as = self.prefix_cache.scard('asns')
-            to_return['IPv6 Dump'] = v6_dump
-            to_return['IPv4 Dump'] = v4_dump
-            to_return['Number ASNs'] = number_as
-        return to_return
-
    def get_values(self):
        ips_in_intake = self.intake.scard('intake')
-        waiting_for_ris_lookup = self.ris_cache.scard('for_ris_lookup')
        ready_to_insert = self.sanitize.scard('to_insert')
-        prefix_db_ready = self.prefix_cache.exists('ready')
-        return {'Non-parsed IPs': ips_in_intake, 'Parsed IPs': ready_to_insert,
-                'Awaiting prefix lookup': waiting_for_ris_lookup,
-                'Prefix database ready': prefix_db_ready}
+        return json.dumps({'Non-parsed IPs': ips_in_intake, 'Parsed IPs': ready_to_insert,
+                           'IPASN History': self.ipasn.meta(), 'running': self.cache.hgetall('running')},
+                          indent=2)
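Note that get_values() now returns a ready-to-print JSON string instead of a tuple of dicts. An illustrative call, with made-up counts (only the keys come from the code above):

    # Values below are invented; keys match the dict built in get_values().
    print(Monitor().get_values())
    # {
    #   "Non-parsed IPs": 124,
    #   "Parsed IPs": 5630,
    #   "IPASN History": { ... output of ipasn.meta() ... },
    #   "running": {"Sanitizer": "1", "DatabaseInsert": "1"}
    # }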

View File

@@ -1,97 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from redis import StrictRedis
from ipaddress import ip_network
import requests
import gzip
from io import BytesIO
from collections import defaultdict
import re
import time
from .libs.helpers import set_running, unset_running, get_socket_path
from dateutil.parser import parse
# Dataset source: Routeviews Prefix to AS mappings Dataset for IPv4 and IPv6
# http://www.caida.org/data/routing/routeviews-prefix2as.xml
class PrefixDatabase():
def __init__(self, loglevel: int=logging.DEBUG):
self.__init_logger(loglevel)
self.prefix_cache = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=0, decode_responses=True)
self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
self.ipv6_url = 'http://data.caida.org/datasets/routing/routeviews6-prefix2as/{}'
self.ipv4_url = 'http://data.caida.org/datasets/routing/routeviews-prefix2as/{}'
def __init_logger(self, loglevel):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(loglevel)
def update_required(self):
v4_is_new, v4_path = self._has_new('v4', self.ipv4_url)
v6_is_new, v6_path = self._has_new('v6', self.ipv6_url)
if any([v4_is_new, v6_is_new]):
self.logger.info('Prefix update required.')
else:
self.logger.debug('No prefix update required.')
return any([v4_is_new, v6_is_new])
def _has_new(self, address_family, root_url):
r = requests.get(root_url.format('pfx2as-creation.log'))
last_entry = r.text.split('\n')[-2]
path = last_entry.split('\t')[-1]
if path == self.prefix_cache.get(f'current|{address_family}'):
self.logger.debug(f'Same file already loaded: {path}')
return False, path
return True, path
def _init_routes(self, address_family, root_url, path) -> bool:
self.logger.debug(f'Loading {path}')
date = parse(re.findall('(?:.*)/(?:.*)/routeviews-rv[2,6]-(.*).pfx2as.gz', path)[0]).isoformat()
r = requests.get(root_url.format(path))
to_import = defaultdict(lambda: {address_family: set(), 'ipcount': 0})
with gzip.open(BytesIO(r.content), 'r') as f:
for line in f:
prefix, length, asns = line.decode().strip().split('\t')
# The meaning of AS set and multi-origin AS in unclear. Taking the first ASN in the list only.
asn = re.split('[,_]', asns)[0]
network = ip_network(f'{prefix}/{length}')
to_import[asn][address_family].add(str(network))
to_import[asn]['ipcount'] += network.num_addresses
p = self.prefix_cache.pipeline()
p_asn_meta = self.asn_meta.pipeline()
p.sadd('asns', *to_import.keys())
p_asn_meta.set(f'{address_family}|last', date) # Not necessarely today
p_asn_meta.rpush(f'{address_family}|dates', date)
p_asn_meta.sadd(f'{date}|asns|{address_family}', *to_import.keys())
for asn, data in to_import.items():
p.sadd(f'{asn}|{address_family}', *data[address_family])
p.set(f'{asn}|{address_family}|ipcount', data['ipcount'])
p_asn_meta.sadd(f'{date}|{asn}|{address_family}', *data[address_family])
p_asn_meta.set(f'{date}|{asn}|{address_family}|ipcount', data['ipcount'])
p.set(f'current|{address_family}', path)
p.execute()
p_asn_meta.execute()
return True
def load_prefixes(self):
set_running(self.__class__.__name__)
self.prefix_cache.delete('ready')
self.asn_meta.delete('v4|last')
self.asn_meta.delete('v6|last')
self.logger.info('Prefix update starting in a few seconds.')
time.sleep(15)
v4_is_new, v4_path = self._has_new('v4', self.ipv4_url)
v6_is_new, v6_path = self._has_new('v6', self.ipv6_url)
self.prefix_cache.flushdb()
# TODO: Add a catchall for everything that isn't announced so we can track that down later on
self._init_routes('v6', self.ipv6_url, v6_path)
self._init_routes('v4', self.ipv4_url, v4_path)
self.prefix_cache.set('ready', 1)
self.logger.info('Prefix update complete.')
unset_running(self.__class__.__name__)

View File

@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-

 from typing import TypeVar, Union

@@ -8,9 +8,10 @@ from dateutil.parser import parse
 from collections import defaultdict
 import logging
+import json
 from redis import StrictRedis
-from .libs.helpers import get_socket_path
+from .libs.helpers import get_socket_path, get_config_path
 from .libs.exceptions import InvalidDateFormat
 from .libs.statsripe import StatsRIPE

@@ -24,7 +25,7 @@ class Querying():
        self.storage = StrictRedis(unix_socket_path=get_socket_path('storage'), decode_responses=True)
        self.ranking = StrictRedis(unix_socket_path=get_socket_path('storage'), db=1, decode_responses=True)
        self.asn_meta = StrictRedis(unix_socket_path=get_socket_path('storage'), db=2, decode_responses=True)
-        self.cache = StrictRedis(unix_socket_path=get_socket_path('ris'), db=1, decode_responses=True)
+        self.cache = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)

    def __init_logger(self, loglevel: int):
        self.logger = logging.getLogger(f'{self.__class__.__name__}')

@@ -200,3 +201,14 @@ class Querying():
                rank = 0
            to_return[c].insert(0, (d.isoformat(), rank, list(details)))
        return to_return
+
+    def get_source_config(self):
+        pass
+
+    def get_sources_configs(self):
+        config_dir = get_config_path() / 'modules'
+        loaded = []
+        for modulepath in config_dir.glob('*.json'):
+            with open(modulepath) as f:
+                loaded.append(json.load(f))
+        return {'{}-{}'.format(config['vendor'], config['name']): config for config in loaded}
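get_sources_configs() keys each module config by its vendor and name fields; everything else in those files is fetcher-specific and not shown here. A hypothetical example of one config/modules/*.json entry once loaded (only 'vendor' and 'name' are taken from the code, the values are invented):

    # Hypothetical module config, as returned by json.load():
    config = {'vendor': 'examplevendor', 'name': 'examplelist'}
    # get_sources_configs() would expose it under the key 'examplevendor-examplelist'.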

View File

@@ -23,7 +23,7 @@ class Ranking():
        self.logger.setLevel(loglevel)

    def rank_a_day(self, day: str):
-        # FIXME: If we want to rank an older date, we need to hav older datasets for the announces
+        # FIXME: If we want to rank an older date, we need to have older datasets for the announces
        v4_last, v6_last = self.asn_meta.mget('v4|last', 'v6|last')
        asns_aggregation_key_v4 = f'{day}|asns|v4'
        asns_aggregation_key_v6 = f'{day}|asns|v6'

@@ -45,6 +45,10 @@ class Ranking():
                asn_rank_v4 = 0.0
                asn_rank_v6 = 0.0
                for prefix in self.storage.smembers(f'{day}|{source}|{asn}'):
+                    if prefix == 'None':
+                        # FIXME, this should not happen
+                        self.logger.warning(f'Fucked up prefix in "{day}|{source}|{asn}"')
+                        continue
                    ips = set([ip_ts.split('|')[0]
                               for ip_ts in self.storage.smembers(f'{day}|{source}|{asn}|{prefix}')])
                    py_prefix = ip_network(prefix)

View File

@@ -1,82 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from redis import StrictRedis
import time
import pytricia
import ipaddress
from .libs.helpers import shutdown_requested, set_running, unset_running, get_socket_path
class RISPrefixLookup():
def __init__(self, loglevel: int=logging.DEBUG):
self.__init_logger(loglevel)
self.logger.info('Starting RIS Prefix fetcher')
self.prefix_db = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=0, decode_responses=True)
self.longest_prefix_matching = StrictRedis(unix_socket_path=get_socket_path('ris'), db=0, decode_responses=True)
self.tree_v4 = pytricia.PyTricia()
self.tree_v6 = pytricia.PyTricia(128)
self.force_init = True
self.current_v4 = None
self.current_v6 = None
def __init_logger(self, loglevel):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(loglevel)
def cache_prefix(self, pipe, ip, prefix, asns):
pipe.hmset(ip, {'asn': asns, 'prefix': prefix})
pipe.expire(ip, 43200) # 12H
def init_tree(self):
for asn in self.prefix_db.smembers('asns'):
for prefix in self.prefix_db.smembers(f'{asn}|v4'):
self.tree_v4[prefix] = asn
for prefix in self.prefix_db.smembers(f'{asn}|v6'):
self.tree_v6[prefix] = asn
self.tree_v4['0.0.0.0/0'] = 0
self.tree_v6['::/0'] = 0
self.current_v4 = self.prefix_db.get('current|v4')
self.current_v6 = self.prefix_db.get('current|v6')
def run(self):
set_running(self.__class__.__name__)
while True:
if shutdown_requested():
break
if not self.prefix_db.get('ready'):
self.logger.debug('Prefix database not ready.')
time.sleep(5)
self.force_init = True
continue
if (self.force_init or
(self.current_v4 != self.prefix_db.get('current|v4')) or
(self.current_v6 != self.prefix_db.get('current|v6'))):
self.init_tree()
self.force_init = False
ips = self.longest_prefix_matching.spop('for_ris_lookup', 100)
if not ips: # TODO: add a check against something to stop the loop
self.logger.debug('Nothing to lookup')
break
pipe = self.longest_prefix_matching.pipeline(transaction=False)
for ip in ips:
if self.longest_prefix_matching.exists(ip):
self.logger.debug(f'Already cached: {ip}')
continue
ip = ipaddress.ip_address(ip)
if ip.version == 4:
prefix = self.tree_v4.get_key(ip)
asns = self.tree_v4.get(ip)
else:
prefix = self.tree_v6.get_key(ip)
asns = self.tree_v6.get(ip)
if not prefix:
self.logger.warning(f'The IP {ip} does not seem to be announced')
continue
self.cache_prefix(pipe, ip, prefix, asns)
pipe.execute()
unset_running(self.__class__.__name__)

View File

@@ -1,13 +1,14 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

+from datetime import timezone
 from dateutil import parser
 import logging
 from redis import StrictRedis
-from .libs.helpers import shutdown_requested, set_running, unset_running, get_socket_path
 import ipaddress
+from .libs.helpers import shutdown_requested, set_running, unset_running, get_socket_path, get_ipasn, sanity_check_ipasn

 class Sanitizer():

@@ -15,7 +16,7 @@ class Sanitizer():
        self.__init_logger(loglevel)
        self.redis_intake = StrictRedis(unix_socket_path=get_socket_path('intake'), db=0, decode_responses=True)
        self.redis_sanitized = StrictRedis(unix_socket_path=get_socket_path('prepare'), db=0, decode_responses=True)
-        self.ris_cache = StrictRedis(unix_socket_path=get_socket_path('ris'), db=0, decode_responses=True)
+        self.ipasn = get_ipasn()
        self.logger.debug('Starting import')

    def __init_logger(self, loglevel):
@@ -23,6 +24,13 @@ class Sanitizer():
        self.logger.setLevel(loglevel)

    def sanitize(self):
+        ready, message = sanity_check_ipasn(self.ipasn)
+        if not ready:
+            # Try again later.
+            self.logger.warning(message)
+            return
+        self.logger.debug(message)
        set_running(self.__class__.__name__)
        while True:
            if shutdown_requested():
@@ -30,12 +38,16 @@ class Sanitizer():
            uuids = self.redis_intake.spop('intake', 100)
            if not uuids:
                break
-            for_ris_lookup = []
+            for_cache = []
            pipeline = self.redis_sanitized.pipeline(transaction=False)
            for uuid in uuids:
                data = self.redis_intake.hgetall(uuid)
                try:
                    ip = ipaddress.ip_address(data['ip'])
+                    if isinstance(ip, ipaddress.IPv6Address):
+                        address_family = 'v6'
+                    else:
+                        address_family = 'v4'
                except ValueError:
                    self.logger.info(f"Invalid IP address: {data['ip']}")
                    continue
@@ -43,15 +55,21 @@ class Sanitizer():
                    self.logger.info(f"The IP address {data['ip']} is not global")
                    continue

-                date = parser.parse(data['datetime']).date().isoformat()
-                # NOTE: to consider: discard data with an old timestamp (define old)
+                datetime = parser.parse(data['datetime'])
+                if datetime.tzinfo:
+                    # Normalize to UTC, then drop the timezone info so the datetime is naive.
+                    datetime = datetime.astimezone(timezone.utc).replace(tzinfo=None)
+
+                for_cache.append({'ip': str(ip), 'address_family': address_family, 'source': 'caida',
+                                  'date': datetime.isoformat(), 'precision_delta': {'days': 3}})

                # Add to temporary DB for further processing
-                for_ris_lookup.append(str(ip))
-                pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'],
-                                      'date': date, 'datetime': data['datetime']})
+                pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], 'address_family': address_family,
+                                      'date': datetime.date().isoformat(), 'datetime': datetime.isoformat()})
                pipeline.sadd('to_insert', uuid)
            pipeline.execute()
            self.redis_intake.delete(*uuids)
-            self.ris_cache.sadd('for_ris_lookup', *for_ris_lookup)
+            # Just cache everything so the lookup scripts can do their thing.
+            self.ipasn.mass_cache(for_cache)
        unset_running(self.__class__.__name__)
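The new timestamp handling converts any offset-aware datetime to UTC and then strips the tzinfo, so everything stored (and sent to IPASN History) is a naive UTC timestamp. A standalone illustration of that normalization:

    # Standalone illustration of the normalization done in sanitize() above.
    from datetime import timezone
    from dateutil import parser

    dt = parser.parse('2018-11-14 17:07:30+01:00')
    if dt.tzinfo:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    print(dt.isoformat())  # 2018-11-14T16:07:30 (naive, in UTC)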

View File

@@ -1,30 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import requests
from bgpranking.abstractmanager import AbstractManager
from bgpranking.prefixdb import PrefixDatabase
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
class PrefixDBManager(AbstractManager):
def __init__(self, loglevel: int=logging.DEBUG):
super().__init__(loglevel)
self.prefix_db = PrefixDatabase(loglevel=loglevel)
def _to_run_forever(self):
try:
if self.prefix_db.update_required():
self.prefix_db.load_prefixes()
except requests.exceptions.ConnectionError as e:
self.logger.critical(f'Unable to download the prefix database: {e}')
if __name__ == '__main__':
p = PrefixDBManager()
p.run(sleep_in_sec=3600)

View File

@@ -14,10 +14,7 @@ class MonitorManager():
        self.monitor = Monitor()

    def get_values(self):
-        generic = self.monitor.get_values()
-        prefix_cache = self.monitor.info_prefix_cache()
-        running = self.monitor.get_runinng()
-        return generic, prefix_cache, running
+        return self.monitor.get_values()

 if __name__ == '__main__':

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
from bgpranking.abstractmanager import AbstractManager
from bgpranking.risfetcher import RISPrefixLookup
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
level=logging.INFO, datefmt='%I:%M:%S')
class RISLookupManager(AbstractManager):
def __init__(self, loglevel: int=logging.INFO):
super().__init__(loglevel)
self.ris_fetcher = RISPrefixLookup(loglevel=loglevel)
def _to_run_forever(self):
self.ris_fetcher.run()
if __name__ == '__main__':
rislookup = RISLookupManager()
rislookup.run(120)

View File

@@ -12,7 +12,7 @@ import argparse
 def launch_cache(storage_directory: Path=None):
     if not storage_directory:
         storage_directory = get_homedir()
-    if not check_running('ris') and not check_running('prefixes'):
+    if not check_running('cache'):
         Popen(["./run_redis.sh"], cwd=(storage_directory / 'cache'))

@@ -55,7 +55,7 @@ def launch_all():
 def check_all(stop=False):
-    backends = [['ris', False], ['prefixes', False], ['storage', False],
+    backends = [['cache', False], ['storage', False],
                 ['intake', False], ['prepare', False]]
     while True:
         for b in backends:

View File

@@ -6,7 +6,7 @@ import time
 from redis import StrictRedis

 if __name__ == '__main__':
-    r = StrictRedis(unix_socket_path=get_socket_path('prefixes'), db=1, decode_responses=True)
+    r = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
     r.set('shutdown', 1)
     while True:
         running = is_running()

View File

@@ -10,8 +10,6 @@ if __name__ == '__main__':
     get_homedir()
     p = Popen(['run_backend.py', '--start'])
     p.wait()
-    Popen(['loadprefixes.py'])
-    Popen(['rislookup.py'])
     Popen(['fetcher.py'])
     Popen(['ssfetcher.py'])
     Popen(['parser.py'])

View File

@@ -106,7 +106,7 @@ tcp-backlog 511
 # incoming connections. There is no default, so Redis will not listen
 # on a unix socket when not specified.
 #
-unixsocket prefixes.sock
+unixsocket cache.sock
 unixsocketperm 700

 # Close the connection after a client is idle for N seconds (0 to disable)

@@ -168,7 +168,7 @@ loglevel notice
 # Specify the log file name. Also the empty string can be used to force
 # Redis to log on the standard output. Note that if you use standard
 # output for logging but daemonize, logs will be sent to /dev/null
-logfile "prefixes.log"
+logfile "cache.log"

 # To enable logging to the system logger, just set 'syslog-enabled' to yes,
 # and optionally update the other syslog parameters to suit your needs.

cache/ris.conf (vendored), 1317 lines changed: diff suppressed because it is too large.

cache/run_redis.sh (vendored), 3 lines changed
View File

@@ -3,5 +3,4 @@
 set -e
 set -x

-../../redis/src/redis-server ./ris.conf
-../../redis/src/redis-server ./prefixes.conf
+../../redis/src/redis-server ./cache.conf

View File

@@ -3,5 +3,4 @@
 # set -e
 set -x

-../../redis/src/redis-cli -s ./ris.sock shutdown
-../../redis/src/redis-cli -s ./prefixes.sock shutdown
+../../redis/src/redis-cli -s ./cache.sock shutdown

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from abc import ABC
class RIPECaching(ABC):
def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
self.sourceapp = sourceapp
self.hostname = 'stat.ripe.net'
self.port = 43
self.__init_logger(loglevel)
def __init_logger(self, loglevel):
self.logger = logging.getLogger('{}'.format(self.__class__.__name__))
self.logger.setLevel(loglevel)

View File

@@ -1,79 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import json
from redis import Redis
import asyncio
from .libs.StatsRipeText import RIPECaching
from ipaddress import ip_network
class ASNLookup(RIPECaching):
def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
super().__init__(sourceapp, loglevel)
self.redis_cache = Redis(host='localhost', port=6382, db=0, decode_responses=True)
self.logger.debug('Starting ASN lookup cache')
async def get_all_asns(self):
reader, writer = await asyncio.open_connection(self.hostname, self.port)
to_send = '-d ris-asns list_asns=true asn_types=o sourceapp={}\n'.format(self.sourceapp)
writer.write(to_send.encode())
ris_asns = json.loads(await reader.read())
all_asns = ris_asns['asns']['originating']
if not all_asns:
self.logger.warning('No ASNs in ris-asns, something went wrong.')
else:
self.redis_cache.sadd('asns', *all_asns)
self.redis_cache.sadd('asns_to_lookup', *all_asns)
def fix_ipv4_networks(self, networks):
'''Because we can't have nice things.
Some netorks come without the last(s) bytes (i.e. 170.254.25/24)'''
to_return = []
for net in networks:
try:
to_return.append(ip_network(net))
except ValueError:
ip, mask = net.split('/')
iplist = ip.split('.')
iplist = iplist + ['0'] * (4 - len(iplist))
to_return.append(ip_network('{}/{}'.format('.'.join(iplist), mask)))
return to_return
async def get_originating_prefixes(self):
reader, writer = await asyncio.open_connection(self.hostname, self.port)
writer.write(b'-k\n')
while True:
asn = self.redis_cache.spop('asns_to_lookup')
if not asn:
break
self.logger.debug('ASN lookup: {}'.format(asn))
to_send = '-d ris-prefixes {} list_prefixes=true types=o af=v4,v6 noise=filter sourceapp={}\n'.format(asn, self.sourceapp)
writer.write(to_send.encode())
try:
data = await reader.readuntil(b'\n}\n')
except asyncio.streams.LimitOverrunError:
self.logger.debug('ASN lookup failed: {}'.format(asn))
self.redis_cache.sadd('asns_to_lookup', asn)
writer.close()
reader, writer = await asyncio.open_connection(self.hostname, self.port)
ris_prefixes = json.loads(data)
p = self.redis_cache.pipeline()
if ris_prefixes['prefixes']['v4']['originating']:
self.logger.debug('{} has ipv4'.format(asn))
fixed_networks = self.fix_ipv4_networks(ris_prefixes['prefixes']['v4']['originating'])
p.sadd('{}|v4'.format(asn), *[str(net) for net in fixed_networks])
total_ipv4 = sum([net.num_addresses for net in fixed_networks])
p.set('{}|v4|ipcount'.format(asn), total_ipv4)
if ris_prefixes['prefixes']['v6']['originating']:
self.logger.debug('{} has ipv6'.format(asn))
p.sadd('{}|v6'.format(asn), *ris_prefixes['prefixes']['v6']['originating'])
total_ipv6 = sum([ip_network(prefix).num_addresses for prefix in ris_prefixes['prefixes']['v6']['originating']])
p.set('{}|v4|ipcount'.format(asn), total_ipv6)
p.execute()
writer.write(b'-k\n')
writer.close()

View File

@@ -1,61 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import json
from redis import Redis
from .libs.StatsRipeText import RIPECaching
import asyncio
class RISPrefixLookup(RIPECaching):
def __init__(self, sourceapp: str='bgpranking-ng', loglevel: int=logging.DEBUG):
super().__init__(sourceapp, loglevel)
self.logger.debug('Starting RIS Prefix fetcher')
def cache_prefix(self, redis_cache, ip, network_info, prefix_overview):
prefix = network_info['prefix']
asns = network_info['asns']
# description = prefix_overview['block']['desc']
# if not description:
# description = prefix_overview['block']['name']
p = redis_cache.pipeline()
for asn in asns:
p.hmset(ip, {'asn': asn, 'prefix': prefix}) # , 'description': description})
p.expire(ip, 43200) # 12H
p.execute()
async def run(self):
redis_cache = Redis(host='localhost', port=6581, db=0, decode_responses=True)
reader, writer = await asyncio.open_connection(self.hostname, self.port)
writer.write(b'-k\n')
while True:
ip = redis_cache.spop('for_ris_lookup')
if not ip: # TODO: add a check against something to stop the loop
self.logger.debug('Nothing to lookup')
await asyncio.sleep(10)
continue
if redis_cache.exists(ip):
self.logger.debug('Already cached: {}'.format(ip))
continue
self.logger.debug('RIS lookup: {}'.format(ip))
to_send = '-d network-info {} sourceapp={}\n'.format(ip, self.sourceapp)
writer.write(to_send.encode())
data = await reader.readuntil(b'\n}\n')
network_info = json.loads(data)
if not network_info.get('prefix'):
self.logger.warning('The IP {} does not seem to be announced'.format(ip))
continue
# self.logger.debug('Prefix lookup: {}'.format(ip))
# to_send = '-d prefix-overview {} sourceapp={}\n'.format(network_info['prefix'], self.sourceapp)
# writer.write(to_send.encode())
# data = await reader.readuntil(b'\n}\n')
# prefix_overview = json.loads(data)
# self.logger.debug('RIS cache prefix info: {}'.format(ip))
# self.cache_prefix(redis_cache, ip, network_info, prefix_overview)
self.cache_prefix(redis_cache, ip, network_info, {})
writer.write(b'-k\n')
writer.close()

View File

@@ -10,3 +10,6 @@ git+https://github.com/MISP/PyTaxonomies
 git+https://github.com/MISP/PyMISPGalaxies.git
 beautifulsoup4
+
+# IPASN web client
+git+https://github.com/D4-project/IPASN-History.git/#egg=pyipasnhistory&subdirectory=client

View File

@@ -13,7 +13,7 @@ setup(
    description='BGP Ranking, the new one..',
    packages=['bgpranking'],
    scripts=['bin/archiver.py', 'bin/dbinsert.py', 'bin/fetcher.py', 'bin/parser.py',
-             'bin/loadprefixes.py', 'bin/rislookup.py', 'bin/sanitizer.py', 'bin/run_backend.py', 'bin/ssfetcher.py',
+             'bin/sanitizer.py', 'bin/run_backend.py', 'bin/ssfetcher.py',
             'bin/monitor.py', 'bin/ranking.py', 'bin/asn_descriptions.py', 'bin/start.py', 'bin/stop.py', 'bin/shutdown.py'],
    classifiers=[
        'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',