chg: Improve shadow server import, support network in sanitizer.

pull/17/head
Raphaël Vinot 2022-01-03 13:38:12 +01:00
parent 203e8f56ab
commit a8b62cff9c
2 changed files with 87 additions and 31 deletions

View File

@ -6,7 +6,7 @@ import logging
import time import time
from datetime import timezone from datetime import timezone
from typing import Optional, List from typing import Optional, List, Dict
from dateutil import parser from dateutil import parser
from redis import Redis from redis import Redis
@ -29,6 +29,71 @@ class Sanitizer(AbstractManager):
self.ipasn = get_ipasn() self.ipasn = get_ipasn()
self.logger.debug('Starting import') self.logger.debug('Starting import')
def _sanitize_ip(self, pipeline: Redis, uuid: str, data: Dict) -> Optional[Dict]:
    """Validate one single-address intake entry and queue it for insertion.

    Args:
        pipeline: Redis pipeline the sanitized record is queued on; this
            method does not execute it.
        uuid: Key under which the sanitized record is stored and which is
            added to the 'to_insert' set.
        data: Raw intake entry; the keys 'ip', 'datetime' and 'source'
            are read.

    Returns:
        The dict describing this address that is handed back to the caller,
        or None when the entry is rejected (missing 'ip' key, unparseable
        address, or an address that is not global).
    """
    try:
        address = ipaddress.ip_address(data['ip'])
    except ValueError:
        self.logger.info(f"Invalid IP address: {data['ip']}")
        return None
    except KeyError:
        self.logger.info(f"Invalid entry {data}")
        return None

    family = 'v6' if isinstance(address, ipaddress.IPv6Address) else 'v4'

    if not address.is_global:
        self.logger.info(f"The IP address {data['ip']} is not global")
        return None

    seen_at = parser.parse(data['datetime'])
    if seen_at.tzinfo:
        # Convert aware timestamps to UTC, then drop the tzinfo so that
        # only naive datetimes are stored.
        seen_at = seen_at.astimezone(timezone.utc).replace(tzinfo=None)

    ip_text = str(address)
    seen_at_iso = seen_at.isoformat()
    record = {
        'ip': ip_text,
        'source': data['source'],
        'address_family': family,
        'date': seen_at.date().isoformat(),
        'datetime': seen_at_iso,
    }

    # Add to the temporary DB for further processing.
    pipeline.hmset(uuid, record)
    pipeline.sadd('to_insert', uuid)

    return {
        'ip': ip_text,
        'address_family': family,
        'source': 'caida',
        'date': seen_at_iso,
        'precision_delta': {'days': 3},
    }
def _sanitize_network(self, pipeline: Redis, uuid: str, data: Dict) -> List[Dict]:
    """Expand a network intake entry into its hosts and queue each of them.

    Every global host address of the network is stored as its own record.
    Each host gets a distinct key derived from ``uuid``: reusing ``uuid``
    for all hosts would make each write overwrite the previous one, so
    only the last host would ever be queued.

    Args:
        pipeline: Redis pipeline the sanitized records are queued on; this
            method does not execute it.
        uuid: Identifier of the intake entry, used as the prefix of the
            per-host keys.
        data: Raw intake entry; the keys 'ip' (a network in CIDR
            notation), 'datetime' and 'source' are read.

    Returns:
        One dict per queued host, in iteration order. The list is empty
        when the entry is rejected (missing 'ip' key or unparseable
        network) or when no host of the network is global.
    """
    try:
        # ip_network() is strict by default: a value with host bits set
        # (e.g. 192.0.2.1/24) raises ValueError and is rejected below.
        network = ipaddress.ip_network(data['ip'])
    except ValueError:
        self.logger.info(f"Invalid IP network: {data['ip']}")
        return []
    except KeyError:
        self.logger.info(f"Invalid entry {data}")
        return []

    address_family = 'v6' if isinstance(network, ipaddress.IPv6Network) else 'v4'

    seen_at = parser.parse(data['datetime'])
    if seen_at.tzinfo:
        # Convert aware timestamps to UTC, then drop the tzinfo so that
        # only naive datetimes are stored.
        seen_at = seen_at.astimezone(timezone.utc).replace(tzinfo=None)
    # Loop-invariant: every host of the network shares the same timestamp.
    date_iso = seen_at.date().isoformat()
    datetime_iso = seen_at.isoformat()

    for_cache = []
    # NOTE(review): hosts() enumerates every host of the network, so a very
    # short prefix yields a correspondingly large number of records.
    for ip in network.hosts():
        if not ip.is_global:
            self.logger.info(f"The IP address {ip} is not global")
            continue
        ip_text = str(ip)
        # One key per host so that the records do not overwrite each other.
        # NOTE(review): assumes the consumer of 'to_insert' treats members
        # as opaque keys -- confirm against the insertion module.
        entry_key = f'{uuid}-{ip_text}'
        # Add to the temporary DB for further processing.
        pipeline.hmset(entry_key, {'ip': ip_text, 'source': data['source'],
                                   'address_family': address_family,
                                   'date': date_iso, 'datetime': datetime_iso})
        pipeline.sadd('to_insert', entry_key)
        for_cache.append({'ip': ip_text, 'address_family': address_family, 'source': 'caida',
                          'date': datetime_iso, 'precision_delta': {'days': 3}})
    return for_cache
def sanitize(self): def sanitize(self):
ready, message = sanity_check_ipasn(self.ipasn) ready, message = sanity_check_ipasn(self.ipasn)
if not ready: if not ready:
@ -55,35 +120,15 @@ class Sanitizer(AbstractManager):
data = self.redis_intake.hgetall(uuid) data = self.redis_intake.hgetall(uuid)
if not data: if not data:
continue continue
try: if '/' in data['ip']:
ip = ipaddress.ip_address(data['ip']) entries_for_cache = self._sanitize_network(pipeline, uuid, data)
if isinstance(ip, ipaddress.IPv6Address): if entries_for_cache:
address_family = 'v6' for_cache += entries_for_cache
else: else:
address_family = 'v4' entry_for_cache = self._sanitize_ip(pipeline, uuid, data)
except ValueError: if entry_for_cache:
self.logger.info(f"Invalid IP address: {data['ip']}") for_cache.append(entry_for_cache)
continue
except KeyError:
self.logger.info(f"Invalid entry {data}")
continue
if not ip.is_global:
self.logger.info(f"The IP address {data['ip']} is not global")
continue
datetime = parser.parse(data['datetime'])
if datetime.tzinfo:
# Make sure the datetime isn't TZ aware, and UTC.
datetime = datetime.astimezone(timezone.utc).replace(tzinfo=None)
for_cache.append({'ip': str(ip), 'address_family': address_family, 'source': 'caida',
'date': datetime.isoformat(), 'precision_delta': {'days': 3}})
# Add to temporay DB for further processing
pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], 'address_family': address_family,
'date': datetime.date().isoformat(), 'datetime': datetime.isoformat()})
pipeline.sadd('to_insert', uuid)
pipeline.execute() pipeline.execute()
self.redis_intake.delete(*uuids) self.redis_intake.delete(*uuids)

View File

@ -35,10 +35,11 @@ class ShadowServerFetcher():
self.password = password self.password = password
self.index_page = 'https://dl.shadowserver.org/reports/index.php' self.index_page = 'https://dl.shadowserver.org/reports/index.php'
self.vendor = 'shadowserver' self.vendor = 'shadowserver'
self.known_list_types = ('blacklist', 'botnet', 'cc', 'cisco', 'cwsandbox', 'drone', self.known_list_types = ('blacklist', 'blocklist', 'botnet', 'cc', 'cisco', 'cwsandbox',
'device', 'drone', 'event4', 'malware', 'scan6',
'microsoft', 'scan', 'sinkhole6', 'sinkhole', 'outdated', 'microsoft', 'scan', 'sinkhole6', 'sinkhole', 'outdated',
'compromised', 'hp', 'darknet', 'ddos') 'compromised', 'hp', 'darknet', 'ddos')
self.first_available_day: date self.first_available_day: Optional[date] = None
self.last_available_day: date self.last_available_day: date
self.available_entries: Dict[str, List[Tuple[str, str]]] = {} self.available_entries: Dict[str, List[Tuple[str, str]]] = {}
@ -109,8 +110,12 @@ class ShadowServerFetcher():
if main_type == 'blacklist': if main_type == 'blacklist':
config['impact'] = 5 config['impact'] = 5
elif main_type == 'blocklist':
config['impact'] = 5
elif main_type == 'botnet': elif main_type == 'botnet':
config['impact'] = 2 config['impact'] = 2
elif main_type == 'malware':
config['impact'] = 2
elif main_type == 'cc': elif main_type == 'cc':
config['impact'] = 5 config['impact'] = 5
elif main_type == 'cisco': elif main_type == 'cisco':
@ -123,10 +128,16 @@ class ShadowServerFetcher():
config['impact'] = 3 config['impact'] = 3
elif main_type == 'scan': elif main_type == 'scan':
config['impact'] = 1 config['impact'] = 1
elif main_type == 'scan6':
config['impact'] = 1
elif main_type == 'sinkhole6': elif main_type == 'sinkhole6':
config['impact'] = 2 config['impact'] = 2
elif main_type == 'sinkhole': elif main_type == 'sinkhole':
config['impact'] = 2 config['impact'] = 2
elif main_type == 'device':
config['impact'] = 1
elif main_type == 'event4':
config['impact'] = 2
else: else:
config['impact'] = 1 config['impact'] = 1