AIL-framework/bin/core/Sync_importer.py

89 lines
2.3 KiB
Python
Executable File

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The SYNC Module
================================
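This module reads AIL streams from the ail_2_ail sync importer stream and pushes each one to the 'Importers' queue.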
"""
##################################
# Import External packages
##################################
import json
import os
import sys
import time
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from core import ail_2_ail
from modules.abstract_module import AbstractModule
from lib.objects.Items import Item
#### CONFIG ####
# config_loader = ConfigLoader()
#
# config_loader = None
#### ------ ####
class Sync_importer(AbstractModule):
    """
    Sync_importer module for AIL framework.

    Polls ``ail_2_ail.get_sync_importer_ail_stream()`` for JSON-encoded AIL
    streams and forwards each of them to the 'Importers' queue.
    """

    def __init__(self):
        super().__init__()

        # Waiting time in seconds between two polls when no message is pending
        self.pending_seconds = 10

        # self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
        # self.last_refresh = time.time()

        # Send module state to logs
        self.logger.info(f'Module {self.module_name} Launched')

    def run(self):
        """
        Main loop: fetch, decode and process AIL streams while ``self.proceed``
        is truthy.

        A message that is not valid JSON is logged and dropped so that a single
        bad message cannot stop the importer.
        """
        while self.proceed:
            ### REFRESH DICT
            # if self.last_refresh < ail_2_ail.get_last_updated_ail_instance():
            #     self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
            #     self.last_refresh = time.time()

            ail_stream = ail_2_ail.get_sync_importer_ail_stream()
            if ail_stream:
                try:
                    ail_stream = json.loads(ail_stream)
                except ValueError as e:
                    # json.JSONDecodeError and UnicodeDecodeError are both
                    # ValueError subclasses.
                    self.logger.error(f'{self.module_name}, invalid JSON in AIL stream, message dropped: {e}')
                    continue
                self.compute(ail_stream)
            else:
                self.computeNone()
                # Wait before next process
                # NOTE(review): idling only when the stream is empty, so a
                # backlog is drained without delay - confirm against the
                # original indentation.
                self.logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
                time.sleep(self.pending_seconds)

    def compute(self, ail_stream):
        """
        Import one decoded AIL stream.

        :param ail_stream: decoded stream; must provide
            ``ail_stream['payload']['raw']`` and ``ail_stream['meta']['ail:id']``.

        A stream missing one of these fields is logged and dropped. Otherwise
        the message ``'sync <raw payload>'`` is queued to 'Importers' for the
        corresponding Item.
        """
        # # TODO: SANITYZE AIL STREAM
        # # TODO: CHECK FILTER

        # import Object
        try:
            # presumably base64-encoded gzip content, per the original
            # variable name - confirm against the sender
            b64_gzip_content = ail_stream['payload']['raw']

            # # TODO: create default id
            item_id = ail_stream['meta']['ail:id']
        except (KeyError, TypeError) as e:
            # KeyError: field missing; TypeError: stream/section is not a mapping
            self.logger.error(f'{self.module_name}, malformed AIL stream, message dropped: {e!r}')
            return

        item = Item(item_id)

        message = f'sync {b64_gzip_content}'
        print(item.id)
        self.add_message_to_queue(obj=item, message=message, queue='Importers')
if __name__ == '__main__':
    # Script entry point: build the module and enter its polling loop.
    Sync_importer().run()