AIL-framework/bin/import/importer.py

111 lines
3.1 KiB
Python
Executable File

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import importlib
import json
import redis
import sys
import time
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
# Import all receiver
#from all_json_receiver import *
#### CONFIG ####
config_loader = ConfigLoader.ConfigLoader()
server_cache = config_loader.get_redis_conn("Redis_Log_submit")
r_serv_db = config_loader.get_redis_conn("ARDB_DB")
config_loader = None
DEFAULT_FEEDER_NAME = 'Default_json'
#### ------ ####
def reload_json_importer_list():
global importer_list
importer_json_dir = os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')
importer_list = [f[:-3] for f in os.listdir(importer_json_dir) if os.path.isfile(os.path.join(importer_json_dir, f))]
# init importer list
importer_list = []
reload_json_importer_list()
#### FUNCTIONS ####
def get_json_importer_list():
return importer_list
def add_json_to_json_queue(json_item):
json_item = json.dumps(json_item)
r_serv_db.rpush('importer:json:item', json_item)
def get_json_item_to_import():
return r_serv_db.lpop('importer:json:item')
def get_json_receiver_class(feeder_name_in):
global importer_list
# sanitize class name
feeder_name = feeder_name_in[:1].upper() + feeder_name_in[1:]
feeder_name = feeder_name.replace('-', '_')
if feeder_name is None or feeder_name not in get_json_importer_list():
reload_json_importer_list() # add refresh timing ?
if feeder_name not in get_json_importer_list():
print('Unknow feeder: {}'.format(feeder_name_in))
feeder_name = 'Default_json'
# avoid subpackages
parts = feeder_name.split('.')
module = 'ail_json_importer.{}'.format(parts[-1])
# import json importer class
try:
mod = importlib.import_module(module)
except:
raise
mod = importlib.import_module(module)
class_name = getattr(mod, feeder_name)
return class_name
def get_json_source(json_item):
return json_item.get('source', DEFAULT_FEEDER_NAME)
def process_json(importer_obj, process):
item_id = importer_obj.get_item_id()
if 'meta' in importer_obj.get_json_file():
importer_obj.process_json_meta(process, item_id)
# send data to queue
send_item_to_ail_queue(item_id, importer_obj.get_item_gzip64encoded_content(), importer_obj.get_feeder_name(), process)
def send_item_to_ail_queue(item_id, gzip64encoded_content, feeder_name, process):
# Send item to queue
# send paste to Global
relay_message = "{0} {1}".format(item_id, gzip64encoded_content)
process.populate_set_out(relay_message, 'Mixer')
# increase nb of paste by feeder name
server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1)
#### ---- ####
#### API ####
def api_import_json_item(data_json):
if not data_json:
return ({'status': 'error', 'reason': 'Malformed JSON'}, 400)
# # TODO: add JSON verification
res = add_json_to_json_queue(data_json)
if res:
return ({'status': 'error'}, 400)
return ({'status': 'success'}, 200)