mirror of https://github.com/CIRCL/AIL-framework
90 lines
2.3 KiB
Python
90 lines
2.3 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# -*-coding:UTF-8 -*
|
||
|
|
||
|
"""
|
||
|
The SYNC Module
|
||
|
================================
|
||
|
|
||
|
This module polls the AIL sync importer stream and forwards each received item to the Mixer queue.
|
||
|
|
||
|
"""
|
||
|
|
||
|
##################################
|
||
|
# Import External packages
|
||
|
##################################
|
||
|
import json
|
||
|
import os
|
||
|
import sys
|
||
|
import time
|
||
|
|
||
|
sys.path.append(os.environ['AIL_BIN'])
|
||
|
##################################
|
||
|
# Import Project packages
|
||
|
##################################
|
||
|
from core import ail_2_ail
|
||
|
from modules.abstract_module import AbstractModule
|
||
|
from packages.Item import Item
|
||
|
from packages import Tag
|
||
|
|
||
|
|
||
|
class Sync_importer(AbstractModule):
    """
    Sync importer module for AIL framework

    Polls the stream returned by ``ail_2_ail.get_sync_importer_ail_stream()``
    and forwards every received item to the ``Mixer`` queue.
    """

    def __init__(self):
        """Initialise the parent module and configure the idle delay."""
        super().__init__()

        # Seconds to sleep when the stream had nothing to deliver
        self.pending_seconds = 10

        # TODO: sync-queue filters are not loaded yet (disabled code kept for reference)
        #self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
        #self.last_refresh = time.time()

        # Report the module start-up in the logs
        self.redis_logger.info(f'Module {self.module_name} Launched')

    def run(self):
        """
        Main loop: fetch one stream entry per iteration while ``self.proceed``
        is truthy.

        A non-empty entry is decoded from JSON and handed to ``compute``.
        An empty entry triggers ``computeNone`` (inherited; not defined in
        this class), a debug log line and a sleep of ``pending_seconds``.
        """
        while self.proceed:
            # TODO: refresh the filter dict once it is enabled (disabled code kept for reference)
            # if self.last_refresh < ail_2_ail.get_last_updated_ail_instance():
            #     self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
            #     self.last_refresh = time.time()

            raw_stream = ail_2_ail.get_sync_importer_ail_stream()

            if not raw_stream:
                # Nothing to import: run the idle hook, then wait before polling again
                self.computeNone()
                self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
                time.sleep(self.pending_seconds)
                continue

            # NOTE(review): json.loads and compute() are unguarded here, so a
            # malformed entry raises out of the loop - confirm this is intended.
            decoded_stream = json.loads(raw_stream)
            self.compute(decoded_stream)

    def compute(self, ail_stream):
        """
        Forward one decoded stream entry to the ``Mixer`` queue.

        :param ail_stream: decoded stream entry; the code reads
            ``ail_stream['payload']['raw']`` and ``ail_stream['meta']['ail:id']``.
            A missing key raises ``KeyError``.
        """
        # TODO: sanitize the AIL stream
        # TODO: check filter

        # Object content to import (presumably base64-encoded gzip data,
        # judging by the variable name - TODO confirm)
        payload = ail_stream['payload']
        b64_gzip_content = payload['raw']

        # TODO: create default id
        # NOTE(review): the 'test' suffix looks like a development leftover - confirm
        meta = ail_stream['meta']
        item_id = meta['ail:id'] + 'test'

        message = f'{item_id} {b64_gzip_content}'
        # NOTE(review): this prints the whole payload to stdout - confirm it is wanted
        print(message)
        self.send_message_to_queue(message, 'Mixer')

        # TODO: increase nb of paste by feeder name (disabled code kept for reference)
        # server_cache.hincrby("mixer_cache:list_feeder", Sync, 1)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # Script entry point: build the importer and start its polling loop.
    Sync_importer().run()
|