diff --git a/bin/sanitizer.py b/bin/sanitizer.py index cd8bbb5..9cec03b 100755 --- a/bin/sanitizer.py +++ b/bin/sanitizer.py @@ -6,7 +6,7 @@ import logging import time from datetime import timezone -from typing import Optional, List +from typing import Optional, List, Dict from dateutil import parser from redis import Redis @@ -29,6 +29,71 @@ class Sanitizer(AbstractManager): self.ipasn = get_ipasn() self.logger.debug('Starting import') + def _sanitize_ip(self, pipeline: Redis, uuid: str, data: Dict) -> Optional[Dict]: + try: + ip = ipaddress.ip_address(data['ip']) + if isinstance(ip, ipaddress.IPv6Address): + address_family = 'v6' + else: + address_family = 'v4' + except ValueError: + self.logger.info(f"Invalid IP address: {data['ip']}") + return None + except KeyError: + self.logger.info(f"Invalid entry {data}") + return None + + if not ip.is_global: + self.logger.info(f"The IP address {data['ip']} is not global") + return None + + datetime = parser.parse(data['datetime']) + if datetime.tzinfo: + # Make sure the datetime isn't TZ aware, and UTC. + datetime = datetime.astimezone(timezone.utc).replace(tzinfo=None) + + # Add to temporay DB for further processing + pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], 'address_family': address_family, + 'date': datetime.date().isoformat(), 'datetime': datetime.isoformat()}) + pipeline.sadd('to_insert', uuid) + + return {'ip': str(ip), 'address_family': address_family, 'source': 'caida', + 'date': datetime.isoformat(), 'precision_delta': {'days': 3}} + + def _sanitize_network(self, pipeline: Redis, uuid: str, data: Dict) -> List[Dict]: + try: + network = ipaddress.ip_network(data['ip']) + if isinstance(network, ipaddress.IPv6Network): + address_family = 'v6' + else: + address_family = 'v4' + except ValueError: + self.logger.info(f"Invalid IP network: {data['ip']}") + return [] + except KeyError: + self.logger.info(f"Invalid entry {data}") + return [] + + datetime = parser.parse(data['datetime']) + if datetime.tzinfo: + # Make sure the datetime isn't TZ aware, and UTC. + datetime = datetime.astimezone(timezone.utc).replace(tzinfo=None) + + for_cache = [] + for ip in network.hosts(): + if not ip.is_global: + self.logger.info(f"The IP address {ip} is not global") + continue + + # Add to temporay DB for further processing + pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], 'address_family': address_family, + 'date': datetime.date().isoformat(), 'datetime': datetime.isoformat()}) + pipeline.sadd('to_insert', uuid) + + for_cache.append({'ip': str(ip), 'address_family': address_family, 'source': 'caida', + 'date': datetime.isoformat(), 'precision_delta': {'days': 3}}) + return for_cache + def sanitize(self): ready, message = sanity_check_ipasn(self.ipasn) if not ready: @@ -55,35 +120,15 @@ class Sanitizer(AbstractManager): data = self.redis_intake.hgetall(uuid) if not data: continue - try: - ip = ipaddress.ip_address(data['ip']) - if isinstance(ip, ipaddress.IPv6Address): - address_family = 'v6' - else: - address_family = 'v4' - except ValueError: - self.logger.info(f"Invalid IP address: {data['ip']}") - continue - except KeyError: - self.logger.info(f"Invalid entry {data}") - continue + if '/' in data['ip']: + entries_for_cache = self._sanitize_network(pipeline, uuid, data) + if entries_for_cache: + for_cache += entries_for_cache + else: + entry_for_cache = self._sanitize_ip(pipeline, uuid, data) + if entry_for_cache: + for_cache.append(entry_for_cache) - if not ip.is_global: - self.logger.info(f"The IP address {data['ip']} is not global") - continue - - datetime = parser.parse(data['datetime']) - if datetime.tzinfo: - # Make sure the datetime isn't TZ aware, and UTC. - datetime = datetime.astimezone(timezone.utc).replace(tzinfo=None) - - for_cache.append({'ip': str(ip), 'address_family': address_family, 'source': 'caida', - 'date': datetime.isoformat(), 'precision_delta': {'days': 3}}) - - # Add to temporay DB for further processing - pipeline.hmset(uuid, {'ip': str(ip), 'source': data['source'], 'address_family': address_family, - 'date': datetime.date().isoformat(), 'datetime': datetime.isoformat()}) - pipeline.sadd('to_insert', uuid) pipeline.execute() self.redis_intake.delete(*uuids) diff --git a/bin/ssfetcher.py b/bin/ssfetcher.py index f077e01..36a3e86 100755 --- a/bin/ssfetcher.py +++ b/bin/ssfetcher.py @@ -35,10 +35,11 @@ class ShadowServerFetcher(): self.password = password self.index_page = 'https://dl.shadowserver.org/reports/index.php' self.vendor = 'shadowserver' - self.known_list_types = ('blacklist', 'botnet', 'cc', 'cisco', 'cwsandbox', 'drone', + self.known_list_types = ('blacklist', 'blocklist', 'botnet', 'cc', 'cisco', 'cwsandbox', + 'device', 'drone', 'event4', 'malware', 'scan6', 'microsoft', 'scan', 'sinkhole6', 'sinkhole', 'outdated', 'compromised', 'hp', 'darknet', 'ddos') - self.first_available_day: date + self.first_available_day: Optional[date] = None self.last_available_day: date self.available_entries: Dict[str, List[Tuple[str, str]]] = {} @@ -109,8 +110,12 @@ class ShadowServerFetcher(): if main_type == 'blacklist': config['impact'] = 5 + elif main_type == 'blocklist': + config['impact'] = 5 elif main_type == 'botnet': config['impact'] = 2 + elif main_type == 'malware': + config['impact'] = 2 elif main_type == 'cc': config['impact'] = 5 elif main_type == 'cisco': @@ -123,10 +128,16 @@ class ShadowServerFetcher(): config['impact'] = 3 elif main_type == 'scan': config['impact'] = 1 + elif main_type == 'scan6': + config['impact'] = 1 elif main_type == 'sinkhole6': config['impact'] = 2 elif main_type == 'sinkhole': config['impact'] = 2 + elif main_type == 'device': + config['impact'] = 1 + elif main_type == 'event4': + config['impact'] = 2 else: config['impact'] = 1