mirror of https://github.com/MISP/PyMISP
153 lines
5.0 KiB
Python
153 lines
5.0 KiB
Python
![]() |
#!/usr/bin/env python
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import sys
|
||
|
import json
|
||
|
import argparse
|
||
|
import datetime
|
||
|
import time
|
||
|
import redis
|
||
|
|
||
|
import settings
|
||
|
|
||
|
from generator import FeedGenerator
|
||
|
|
||
|
|
||
|
def beautyful_sleep(sleep, additional):
|
||
|
length = 20
|
||
|
sleeptime = float(sleep) / float(length)
|
||
|
for i in range(length):
|
||
|
temp_string = '|'*i + ' '*(length-i-1)
|
||
|
print('sleeping [{}]\t{}'.format(temp_string, additional), end='\r', sep='')
|
||
|
sys.stdout.flush()
|
||
|
time.sleep(sleeptime)
|
||
|
|
||
|
|
||
|
class RedisToMISPFeed:
|
||
|
SUFFIX_SIGH = '_sighting'
|
||
|
SUFFIX_ATTR = '_attribute'
|
||
|
SUFFIX_OBJ = '_object'
|
||
|
SUFFIX_LIST = [SUFFIX_SIGH, SUFFIX_ATTR, SUFFIX_OBJ]
|
||
|
|
||
|
def __init__(self):
|
||
|
self.host = settings.host
|
||
|
self.port = settings.port
|
||
|
self.db = settings.db
|
||
|
self.serv = redis.StrictRedis(self.host, self.port, self.db, decode_responses=True)
|
||
|
|
||
|
self.flushing_interval = settings.flushing_interval
|
||
|
self.flushing_next = time.time() + self.flushing_interval
|
||
|
|
||
|
self.generator = FeedGenerator()
|
||
|
|
||
|
self.keynames = []
|
||
|
for k in settings.keyname_pop:
|
||
|
for s in self.SUFFIX_LIST:
|
||
|
self.keynames.append(k+s)
|
||
|
|
||
|
self.keynameError = settings.keyname_error
|
||
|
|
||
|
self.last_flush = datetime.datetime.now()
|
||
|
self.update_last_action("Init system")
|
||
|
|
||
|
def consume(self):
|
||
|
self.update_last_action("Started consuming redis")
|
||
|
while True:
|
||
|
flag_empty = True
|
||
|
for key in self.keynames:
|
||
|
while True:
|
||
|
data = self.pop(key)
|
||
|
if data is None:
|
||
|
break
|
||
|
try:
|
||
|
self.perform_action(key, data)
|
||
|
except Exception as error:
|
||
|
self.save_error_to_redis(error, data)
|
||
|
flag_empty = False
|
||
|
|
||
|
# Define when to write event on disk
|
||
|
if flag_empty and self.flushing_next <= time.time() and self.last_action_time>self.last_flush:
|
||
|
self.update_last_action('Flushed on disk')
|
||
|
self.generator.flush_event()
|
||
|
self.flushing_next = time.time() + self.flushing_interval
|
||
|
self.last_flush = datetime.datetime.now()
|
||
|
|
||
|
beautyful_sleep(5, self.format_last_action())
|
||
|
|
||
|
def pop(self, key):
|
||
|
popped = self.serv.rpop(key)
|
||
|
if popped is None:
|
||
|
return None
|
||
|
try:
|
||
|
popped = json.loads(popped)
|
||
|
except ValueError as error:
|
||
|
self.save_error_to_redis(error, popped)
|
||
|
except ValueError as error:
|
||
|
self.save_error_to_redis(error, popped)
|
||
|
return popped
|
||
|
|
||
|
def perform_action(self, key, data):
|
||
|
# sighting
|
||
|
if key.endswith(self.SUFFIX_SIGH):
|
||
|
if self.generator.add_sighting_on_attribute():
|
||
|
self.update_last_action("Added sighting")
|
||
|
else:
|
||
|
self.update_last_action("Error while adding sighting")
|
||
|
|
||
|
# attribute
|
||
|
elif key.endswith(self.SUFFIX_ATTR):
|
||
|
attr_type = data.pop('type')
|
||
|
attr_value = data.pop('value')
|
||
|
if self.generator.add_attribute_to_event(attr_type, attr_value, **data):
|
||
|
self.update_last_action("Added attribute")
|
||
|
else:
|
||
|
self.update_last_action("Error while adding attribute")
|
||
|
|
||
|
# object
|
||
|
elif key.endswith(self.SUFFIX_OBJ):
|
||
|
# create the MISP object
|
||
|
obj_name = data.pop('name')
|
||
|
if self.generator.add_object_to_event(obj_name, **data):
|
||
|
self.update_last_action("Added object")
|
||
|
else:
|
||
|
self.update_last_action("Error while adding object")
|
||
|
|
||
|
else:
|
||
|
# Suffix not valid
|
||
|
self.update_last_action("Redis key suffix not supported")
|
||
|
|
||
|
# OTHERS
|
||
|
def update_last_action(self, action):
|
||
|
self.last_action = action
|
||
|
self.last_action_time = datetime.datetime.now()
|
||
|
|
||
|
def format_last_action(self):
|
||
|
temp = datetime.datetime.now() - self.last_flush
|
||
|
return "Last action: [{}] @ {}.\tLast flush: {} ago".format(
|
||
|
self.last_action,
|
||
|
self.last_action_time.isoformat().replace('T', ' '),
|
||
|
str(temp).split('.')[0]
|
||
|
)
|
||
|
|
||
|
def get_buffer_state(self):
|
||
|
buffer_state = {'attribute': 0, 'object': 0, 'sighting': 0}
|
||
|
for k in self.keynames:
|
||
|
_ , suffix = k.rsplit('_', 1)
|
||
|
buffer_state[suffix] += self.serv.llen(k)
|
||
|
return buffer_state
|
||
|
|
||
|
|
||
|
def save_error_to_redis(self, error, item):
|
||
|
to_push = {'error': str(error), 'item': str(item)}
|
||
|
print('Error:', str(error), '\nOn adding:', item)
|
||
|
self.serv.lpush(self.keynameError, to_push)
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
parser = argparse.ArgumentParser(description="Pop item fom redis and add "
|
||
|
+ "it to the MISP feed. By default, each action are pushed into a "
|
||
|
+ "daily named event. Configuration taken from the file settings.py.")
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
redisToMISP = RedisToMISPFeed()
|
||
|
redisToMISP.consume()
|