From e1a9fe1d85b5f243bbb9b8cddf9ef88b5ff22b3d Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Fri, 9 Mar 2018 17:06:00 +0100 Subject: [PATCH] Generator handles file flushing itself --- .../feed-generator-from-redis/fromredis.py | 20 ++--------- .../feed-generator-from-redis/generator.py | 33 +++++++++++++------ 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/examples/feed-generator-from-redis/fromredis.py b/examples/feed-generator-from-redis/fromredis.py index f4f68b7..38f33cc 100755 --- a/examples/feed-generator-from-redis/fromredis.py +++ b/examples/feed-generator-from-redis/fromredis.py @@ -35,9 +35,6 @@ class RedisToMISPFeed: self.db = settings.db self.serv = redis.StrictRedis(self.host, self.port, self.db, decode_responses=True) - self.flushing_interval = settings.flushing_interval - self.flushing_next = time.time() + self.flushing_interval - self.generator = FeedGenerator() self.keynames = [] @@ -47,13 +44,11 @@ class RedisToMISPFeed: self.keynameError = settings.keyname_error - self.last_flush = datetime.datetime.now() self.update_last_action("Init system") def consume(self): self.update_last_action("Started consuming redis") while True: - flag_empty = True for key in self.keynames: while True: data = self.pop(key) @@ -63,14 +58,6 @@ class RedisToMISPFeed: self.perform_action(key, data) except Exception as error: self.save_error_to_redis(error, data) - flag_empty = False - - # Define when to write event on disk - if flag_empty and self.flushing_next <= time.time() and self.last_action_time>self.last_flush: - self.update_last_action('Flushed on disk') - self.generator.flush_event() - self.flushing_next = time.time() + self.flushing_interval - self.last_flush = datetime.datetime.now() beautyful_sleep(5, self.format_last_action()) @@ -122,17 +109,15 @@ class RedisToMISPFeed: self.last_action_time = datetime.datetime.now() def format_last_action(self): - temp = datetime.datetime.now() - self.last_flush - return "Last action: [{}] @ {}.\tLast flush: {} ago".format( + return "Last action: [{}] @ {}".format( self.last_action, self.last_action_time.isoformat().replace('T', ' '), - str(temp).split('.')[0] ) def get_buffer_state(self): buffer_state = {'attribute': 0, 'object': 0, 'sighting': 0} for k in self.keynames: - _ , suffix = k.rsplit('_', 1) + _, suffix = k.rsplit('_', 1) buffer_state[suffix] += self.serv.llen(k) return buffer_state @@ -142,6 +127,7 @@ class RedisToMISPFeed: print('Error:', str(error), '\nOn adding:', item) self.serv.lpush(self.keynameError, to_push) + if __name__ == '__main__': parser = argparse.ArgumentParser(description="Pop item fom redis and add " + "it to the MISP feed. By default, each action are pushed into a " diff --git a/examples/feed-generator-from-redis/generator.py b/examples/feed-generator-from-redis/generator.py index 6683109..7e232fc 100755 --- a/examples/feed-generator-from-redis/generator.py +++ b/examples/feed-generator-from-redis/generator.py @@ -42,19 +42,18 @@ def gen_uuid(): class FeedGenerator: - def __init__(self, auto_flush=60*5): + def __init__(self): """This object can be use to easily create a daily MISP-feed. - It handles the event creation, manifest file and cache file (hashes.csv). - - Attributes: - auto_flush (int) : (In seconds) If a positive value for auto_flush - is provided the FeedGenerator will the current event when - possible if the last flushed time is greater than auto_flush. + It handles the event creation, manifest file and cache file + (hashes.csv). """ self.sys_templates = get_system_templates() + self.flushing_interval = settings.flushing_interval + self.flushing_next = time.time() + self.flushing_interval + self.manifest = {} self.attributeHashes = [] @@ -65,12 +64,14 @@ class FeedGenerator: def add_sighting_on_attribute(self, sight_type, attr_uuid, **data): self.update_daily_event_id() + self.after_addition() return False def add_attribute_to_event(self, attr_type, attr_value, **attr_data): self.update_daily_event_id() self.current_event.add_attribute(attr_type, attr_value, **attr_data) self._add_hash(attr_type, attr_value) + self.after_addition() return True def add_object_to_event(self, obj_name, **data): @@ -93,8 +94,17 @@ class FeedGenerator: for attr_type, attr_value in data.items(): self._add_hash(attr_type, attr_value) + self.after_addition() return True + def after_addition(self): + # Write event on disk + now = time.time() + if self.flushing_next <= now: + self.update_last_action('Flushed on disk') + self.flush_event() + self.flushing_next = now + self.flushing_interval + # Cache def _add_hash(self, attr_type, attr_value): if ('|' in attr_type or attr_type == 'malware-sample'): @@ -177,11 +187,14 @@ class FeedGenerator: } def get_last_event_from_manifest(self): - """Retreive last event from the manifest, if the manifest doesn't - exists or if it is empty, initialize it. + """Retreive last event from the manifest. + + If the manifest doesn't exists or if it is empty, initialize it. + """ try: - with open(os.path.join(settings.outputdir, 'manifest.json'), 'r') as f: + manifest_path = os.path.join(settings.outputdir, 'manifest.json') + with open(manifest_path, 'r') as f: man = json.load(f) dated_events = [] for event_uuid, event_json in man.items():