2018-03-29 22:37:28 +02:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
from dateutil import parser
|
|
|
|
import logging
|
|
|
|
from redis import StrictRedis
|
2018-04-05 14:36:01 +02:00
|
|
|
from .libs.helpers import shutdown_requested, set_running, unset_running, get_socket_path
|
2018-03-29 22:37:28 +02:00
|
|
|
|
|
|
|
import ipaddress
|
|
|
|
|
|
|
|
|
|
|
|
class Sanitizer():
    """Validate queued intake entries and forward the usable ones.

    Raw entries are read through the Redis connection opened on
    ``get_socket_path('intake')``; cleaned records are written through the
    connection on ``get_socket_path('prepare')`` and the accepted IPs are
    queued through the connection on ``get_socket_path('ris')``.
    """

    def __init__(self, loglevel: int=logging.DEBUG):
        """Set up the logger and the three Redis connections.

        :param loglevel: level applied to this instance's logger.
        """
        self.__init_logger(loglevel)
        self.redis_intake = StrictRedis(unix_socket_path=get_socket_path('intake'), db=0, decode_responses=True)
        self.redis_sanitized = StrictRedis(unix_socket_path=get_socket_path('prepare'), db=0, decode_responses=True)
        self.ris_cache = StrictRedis(unix_socket_path=get_socket_path('ris'), db=0, decode_responses=True)
        self.logger.debug('Starting import')

    def __init_logger(self, loglevel):
        """Create ``self.logger``, named after the class, at *loglevel*."""
        self.logger = logging.getLogger(f'{self.__class__.__name__}')
        self.logger.setLevel(loglevel)

    def _sanitize_entry(self, data):
        """Return the cleaned record for one intake hash, or None to discard it.

        :param data: mapping expected to hold the 'ip', 'source' and
            'datetime' fields of one intake entry.
        :return: dict with the keys 'ip', 'source', 'date' and 'datetime',
            or None when a field is missing, the IP is invalid or not
            global, or the datetime cannot be parsed. Every rejection is
            logged at INFO level.
        """
        try:
            raw_ip = data['ip']
            source = data['source']
            raw_datetime = data['datetime']
        except KeyError as missing:
            # An absent or incomplete hash must not abort the whole batch.
            self.logger.info(f"Invalid entry, missing field {missing}: {data}")
            return None

        try:
            ip = ipaddress.ip_address(raw_ip)
        except ValueError:
            self.logger.info(f"Invalid IP address: {raw_ip}")
            return None
        if not ip.is_global:
            self.logger.info(f"The IP address {raw_ip} is not global")
            return None

        try:
            date = parser.parse(raw_datetime).date().isoformat()
        except (ValueError, OverflowError):
            self.logger.info(f"Invalid datetime: {raw_datetime}")
            return None

        return {'ip': str(ip), 'source': source,
                'date': date, 'datetime': raw_datetime}

    def _sanitize_batch(self, uuids):
        """Clean one batch of intake entries and forward the valid ones.

        :param uuids: keys of the intake hashes to process. All of them are
            deleted from the intake connection afterwards, valid or not.
        """
        for_ris_lookup = []
        pipeline = self.redis_sanitized.pipeline(transaction=False)
        for uuid in uuids:
            entry = self._sanitize_entry(self.redis_intake.hgetall(uuid))
            if entry is None:
                continue
            # NOTE: to consider: discard data with an old timestamp (define old)

            # Add to temporary DB for further processing
            for_ris_lookup.append(entry['ip'])
            pipeline.hmset(uuid, entry)
            pipeline.sadd('to_insert', uuid)
        pipeline.execute()
        self.redis_intake.delete(*uuids)
        if for_ris_lookup:
            # SADD requires at least one member; skip it when the whole
            # batch was rejected.
            self.ris_cache.sadd('for_ris_lookup', *for_ris_lookup)

    def sanitize(self):
        """Drain the 'intake' set in batches of up to 100 entries.

        Stops when a shutdown is requested or when the set is empty. The
        running flag set on entry is always cleared on exit, including when
        an exception propagates.
        """
        set_running(self.__class__.__name__)
        try:
            while not shutdown_requested():
                uuids = self.redis_intake.spop('intake', 100)
                if not uuids:
                    break
                self._sanitize_batch(uuids)
        finally:
            unset_running(self.__class__.__name__)
|