Generator handles file flushing itself

pull/204/head
Sami Mokaddem 2018-03-09 17:06:00 +01:00
parent f6828c4394
commit e1a9fe1d85
2 changed files with 26 additions and 27 deletions

View File

@ -35,9 +35,6 @@ class RedisToMISPFeed:
self.db = settings.db self.db = settings.db
self.serv = redis.StrictRedis(self.host, self.port, self.db, decode_responses=True) self.serv = redis.StrictRedis(self.host, self.port, self.db, decode_responses=True)
self.flushing_interval = settings.flushing_interval
self.flushing_next = time.time() + self.flushing_interval
self.generator = FeedGenerator() self.generator = FeedGenerator()
self.keynames = [] self.keynames = []
@ -47,13 +44,11 @@ class RedisToMISPFeed:
self.keynameError = settings.keyname_error self.keynameError = settings.keyname_error
self.last_flush = datetime.datetime.now()
self.update_last_action("Init system") self.update_last_action("Init system")
def consume(self): def consume(self):
self.update_last_action("Started consuming redis") self.update_last_action("Started consuming redis")
while True: while True:
flag_empty = True
for key in self.keynames: for key in self.keynames:
while True: while True:
data = self.pop(key) data = self.pop(key)
@ -63,14 +58,6 @@ class RedisToMISPFeed:
self.perform_action(key, data) self.perform_action(key, data)
except Exception as error: except Exception as error:
self.save_error_to_redis(error, data) self.save_error_to_redis(error, data)
flag_empty = False
# Define when to write event on disk
if flag_empty and self.flushing_next <= time.time() and self.last_action_time>self.last_flush:
self.update_last_action('Flushed on disk')
self.generator.flush_event()
self.flushing_next = time.time() + self.flushing_interval
self.last_flush = datetime.datetime.now()
beautyful_sleep(5, self.format_last_action()) beautyful_sleep(5, self.format_last_action())
@ -122,17 +109,15 @@ class RedisToMISPFeed:
self.last_action_time = datetime.datetime.now() self.last_action_time = datetime.datetime.now()
def format_last_action(self): def format_last_action(self):
temp = datetime.datetime.now() - self.last_flush return "Last action: [{}] @ {}".format(
return "Last action: [{}] @ {}.\tLast flush: {} ago".format(
self.last_action, self.last_action,
self.last_action_time.isoformat().replace('T', ' '), self.last_action_time.isoformat().replace('T', ' '),
str(temp).split('.')[0]
) )
def get_buffer_state(self): def get_buffer_state(self):
buffer_state = {'attribute': 0, 'object': 0, 'sighting': 0} buffer_state = {'attribute': 0, 'object': 0, 'sighting': 0}
for k in self.keynames: for k in self.keynames:
_ , suffix = k.rsplit('_', 1) _, suffix = k.rsplit('_', 1)
buffer_state[suffix] += self.serv.llen(k) buffer_state[suffix] += self.serv.llen(k)
return buffer_state return buffer_state
@ -142,6 +127,7 @@ class RedisToMISPFeed:
print('Error:', str(error), '\nOn adding:', item) print('Error:', str(error), '\nOn adding:', item)
self.serv.lpush(self.keynameError, to_push) self.serv.lpush(self.keynameError, to_push)
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Pop item fom redis and add " parser = argparse.ArgumentParser(description="Pop item fom redis and add "
+ "it to the MISP feed. By default, each action are pushed into a " + "it to the MISP feed. By default, each action are pushed into a "

View File

@ -42,19 +42,18 @@ def gen_uuid():
class FeedGenerator: class FeedGenerator:
def __init__(self, auto_flush=60*5): def __init__(self):
"""This object can be use to easily create a daily MISP-feed. """This object can be use to easily create a daily MISP-feed.
It handles the event creation, manifest file and cache file (hashes.csv). It handles the event creation, manifest file and cache file
(hashes.csv).
Attributes:
auto_flush (int) : (In seconds) If a positive value for auto_flush
is provided the FeedGenerator will the current event when
possible if the last flushed time is greater than auto_flush.
""" """
self.sys_templates = get_system_templates() self.sys_templates = get_system_templates()
self.flushing_interval = settings.flushing_interval
self.flushing_next = time.time() + self.flushing_interval
self.manifest = {} self.manifest = {}
self.attributeHashes = [] self.attributeHashes = []
@ -65,12 +64,14 @@ class FeedGenerator:
def add_sighting_on_attribute(self, sight_type, attr_uuid, **data): def add_sighting_on_attribute(self, sight_type, attr_uuid, **data):
self.update_daily_event_id() self.update_daily_event_id()
self.after_addition()
return False return False
def add_attribute_to_event(self, attr_type, attr_value, **attr_data): def add_attribute_to_event(self, attr_type, attr_value, **attr_data):
self.update_daily_event_id() self.update_daily_event_id()
self.current_event.add_attribute(attr_type, attr_value, **attr_data) self.current_event.add_attribute(attr_type, attr_value, **attr_data)
self._add_hash(attr_type, attr_value) self._add_hash(attr_type, attr_value)
self.after_addition()
return True return True
def add_object_to_event(self, obj_name, **data): def add_object_to_event(self, obj_name, **data):
@ -93,8 +94,17 @@ class FeedGenerator:
for attr_type, attr_value in data.items(): for attr_type, attr_value in data.items():
self._add_hash(attr_type, attr_value) self._add_hash(attr_type, attr_value)
self.after_addition()
return True return True
def after_addition(self):
# Write event on disk
now = time.time()
if self.flushing_next <= now:
self.update_last_action('Flushed on disk')
self.flush_event()
self.flushing_next = now + self.flushing_interval
# Cache # Cache
def _add_hash(self, attr_type, attr_value): def _add_hash(self, attr_type, attr_value):
if ('|' in attr_type or attr_type == 'malware-sample'): if ('|' in attr_type or attr_type == 'malware-sample'):
@ -177,11 +187,14 @@ class FeedGenerator:
} }
def get_last_event_from_manifest(self): def get_last_event_from_manifest(self):
"""Retreive last event from the manifest, if the manifest doesn't """Retreive last event from the manifest.
exists or if it is empty, initialize it.
If the manifest doesn't exists or if it is empty, initialize it.
""" """
try: try:
with open(os.path.join(settings.outputdir, 'manifest.json'), 'r') as f: manifest_path = os.path.join(settings.outputdir, 'manifest.json')
with open(manifest_path, 'r') as f:
man = json.load(f) man = json.load(f)
dated_events = [] dated_events = []
for event_uuid, event_json in man.items(): for event_uuid, event_json in man.items():