diff --git a/bin/Global.py b/bin/Global.py index ff7e8e52..41781a8e 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -119,6 +119,12 @@ if __name__ == '__main__': # decode compressed base64 decoded = base64.standard_b64decode(gzip64encoded) + try: + new_file_content = gunzip_bytes_obj(decoded) + except OSError as e: + print('{}, {}'.format(filename, e)) + publisher.warning('Global; Invalid Gzip file: {}, {}'.format(filename, e)) + continue # check if file exist if os.path.isfile(filename): @@ -143,7 +149,6 @@ if __name__ == '__main__': curr_file_md5 = hashlib.md5(curr_file_content).hexdigest() - new_file_content = gunzip_bytes_obj(decoded) new_file_md5 = hashlib.md5(new_file_content).hexdigest() if new_file_md5 != curr_file_md5: diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index b20153cf..ace54f59 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -148,6 +148,11 @@ function launching_scripts { sleep 0.1 echo -e $GREEN"\t* Launching scripts"$DEFAULT + # LAUNCH CORE MODULE + screen -S "Script_AIL" -X screen -t "JSON_importer" bash -c "cd ${AIL_BIN}/import; ${ENV_PY} ./JSON_importer.py; read x" + sleep 0.1 + + screen -S "Script_AIL" -X screen -t "ModuleInformation" bash -c "cd ${AIL_BIN}; ${ENV_PY} ./ModulesInformationV2.py -k 0 -c 1; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Mixer" bash -c "cd ${AIL_BIN}; ${ENV_PY} ./Mixer.py; read x" diff --git a/bin/import/JSON_importer.py b/bin/import/JSON_importer.py new file mode 100755 index 00000000..de0ca574 --- /dev/null +++ b/bin/import/JSON_importer.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The JSON Receiver Module +================ + +Recieve Json Items (example: Twitter feeder) + +""" +import os +import json +import redis +import sys +import time + +sys.path.append(os.environ['AIL_BIN']) +from Helper import Process +from pubsublogger import publisher + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader + +import importer + + +if __name__ == '__main__': + publisher.port = 6380 + publisher.channel = 'Script' + + config_section = 'Importer_Json' + + process = Process(config_section) + + config_loader = ConfigLoader.ConfigLoader() + + # REDIS # + server_cache = config_loader.get_redis_conn("Redis_Log_submit") + config_loader = None + + # LOGGING # + publisher.info("JSON Feed Script started to receive & publish.") + + # OTHER CONFIG # + DEFAULT_FEEDER_NAME = 'Unknow Feeder' + + while True: + + json_item = importer.get_json_item_to_import() + if json_item: + + json_item = json.loads(json_item) + feeder_name = importer.get_json_source(json_item) + print('importing: {} feeder'.format(feeder_name)) + + json_import_class = importer.get_json_receiver_class(feeder_name) + importer_obj = json_import_class(feeder_name, json_item) + importer.process_json(importer_obj, process) + + else: + time.sleep(5) diff --git a/bin/import/ail_json_importer/Ail_feeder_twitter.py b/bin/import/ail_json_importer/Ail_feeder_twitter.py new file mode 100755 index 00000000..831b56c3 --- /dev/null +++ b/bin/import/ail_json_importer/Ail_feeder_twitter.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The JSON Receiver Module +================ + +Recieve Json Items (example: Twitter feeder) + +""" +import os +import json +import sys +import datetime + +# sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib')) +# import item_basic + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')) +from Default_json import Default_json + +class Ail_feeder_twitter(Default_json): + """Twitter Feeder functions""" + + def __init__(self, name, json_item): + super().__init__(name, json_item) + + def get_feeder_name(self): + return 'twitter' + + # define item id + def get_item_id(self): + # use twitter timestamp ? + item_date = datetime.date.today().strftime("%Y/%m/%d") + item_id = str(self.json_item['meta']['twitter:tweet_id']) + return os.path.join('twitter', item_date, item_id) + '.gz' + + # # TODO: + def process_json_meta(self, process): + ''' + Process JSON meta filed. + ''' + return None diff --git a/bin/import/ail_json_importer/Default_json.py b/bin/import/ail_json_importer/Default_json.py new file mode 100755 index 00000000..a7ceb820 --- /dev/null +++ b/bin/import/ail_json_importer/Default_json.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The JSON Receiver Module +================ + +Recieve Json Items (example: Twitter feeder) + +""" +import os +import datetime +import json +import redis +import time +import sys +import uuid + +#sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +#import ConfigLoader +#import item_basic + +class Default_json(object): + """Default Feeder functions""" + + def __init__(self, feeder_name, json_item): + self.name = feeder_name + self.json_item = json_item + + def get_feeder_source(self): + ''' + Return the original feeder name (json source field). + ''' + return self.name + + def get_feeder_name(self): + ''' + Return feeder name. first part of the item_id and display in the UI + ''' + return self.name + + def get_json_file(self): + ''' + Return the JSON dict, + ''' + return self.json_item + + def get_feeder_uuid(self): + pass + + def get_item_gzip64encoded_content(self): + ''' + Return item base64 encoded gzip content, + ''' + return self.json_item['data'] + + ## OVERWRITE ME ## + def get_item_id(self): + ''' + Return item id. define item id + ''' + item_date = datetime.date.today().strftime("%Y/%m/%d") + return os.path.join(self.get_feeder_name(), item_date, str(uuid.uuid4())) + '.gz' + + ## OVERWRITE ME ## + def process_json_meta(self, process): + ''' + Process JSON meta filed. + ''' + return None diff --git a/bin/import/importer.py b/bin/import/importer.py new file mode 100755 index 00000000..585f4065 --- /dev/null +++ b/bin/import/importer.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The JSON Receiver Module +================ + +Recieve Json Items (example: Twitter feeder) + +""" +import os +import importlib +import json +import redis +import sys +import time + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader + +# Import all receiver +#from all_json_receiver import * + +#### CONFIG #### +config_loader = ConfigLoader.ConfigLoader() +server_cache = config_loader.get_redis_conn("Redis_Log_submit") +r_serv_db = config_loader.get_redis_conn("ARDB_DB") +config_loader = None + +DEFAULT_FEEDER_NAME = 'Default_json' + +#### ------ #### + +def reload_json_importer_list(): + global importer_list + importer_json_dir = os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer') + importer_list = [f[:-3] for f in os.listdir(importer_json_dir) if os.path.isfile(os.path.join(importer_json_dir, f))] + +# init importer list +importer_list = [] +reload_json_importer_list() + + +#### FUNCTIONS #### +def get_json_importer_list(): + return importer_list + +def add_json_to_json_queue(json_item): + json_item = json.dumps(json_item) + r_serv_db.sadd('importer:json:item', json_item) + +def get_json_item_to_import(): + return r_serv_db.spop('importer:json:item') + +def get_json_receiver_class(feeder_name_in): + global importer_list + + # sanitize class name + feeder_name = feeder_name_in[:1].upper() + feeder_name_in[1:] + feeder_name = feeder_name.replace('-', '_') + + if feeder_name is None or feeder_name not in get_json_importer_list(): + reload_json_importer_list() # add refresh timing ? + if feeder_name not in get_json_importer_list(): + print('Unknow feeder: {}'.format(feeder_name_in)) + feeder_name = 'Default_json' + # avoid subpackages + parts = feeder_name.split('.') + module = 'ail_json_importer.{}'.format(parts[-1]) + # import json importer class + try: + mod = importlib.import_module(module) + except: + raise + mod = importlib.import_module(module) + class_name = getattr(mod, feeder_name) + return class_name + +def get_json_source(json_item): + return json_item.get('source', DEFAULT_FEEDER_NAME) + +def process_json(importer_obj, process): + item_id = importer_obj.get_item_id() + if 'meta' in importer_obj.get_json_file(): + importer_obj.process_json_meta(process) + + # send data to queue + send_item_to_ail_queue(item_id, importer_obj.get_item_gzip64encoded_content(), importer_obj.get_feeder_name(), process) + +def send_item_to_ail_queue(item_id, gzip64encoded_content, feeder_name, process): + # Send item to queue + # send paste to Global + relay_message = "{0} {1}".format(item_id, gzip64encoded_content) + process.populate_set_out(relay_message, 'Mixer') + + # increase nb of paste by feeder name + server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1) + +#### ---- #### + + +#### API #### +def api_import_json_item(data_json): + if not data_json: + return ({'status': 'error', 'reason': 'Malformed JSON'}, 400) + + # # TODO: add JSON verification + res = add_json_to_json_queue(data_json) + if not res: + return ({'status': 'success'}, 200) diff --git a/bin/lib/item_basic.py b/bin/lib/item_basic.py new file mode 100755 index 00000000..756c55ef --- /dev/null +++ b/bin/lib/item_basic.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import sys + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader + +config_loader = ConfigLoader.ConfigLoader() +# get and sanityze PASTE DIRECTORY +PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/' +PASTES_FOLDER = os.path.join(os.path.realpath(PASTES_FOLDER), '') +config_loader = None + +def exist_item(item_id): + filename = get_item_filepath(item_id) + if os.path.isfile(filename): + return True + else: + return False + +def get_item_filepath(item_id): + filename = os.path.join(PASTES_FOLDER, item_id) + return os.path.realpath(filename) diff --git a/bin/packages/Item.py b/bin/packages/Item.py index 6015b1ae..a8cc5a51 100755 --- a/bin/packages/Item.py +++ b/bin/packages/Item.py @@ -15,6 +15,7 @@ import Cryptocurrency import Pgp sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import item_basic import ConfigLoader import Correlate_object import Decoded @@ -31,11 +32,7 @@ screenshot_directory = os.path.join(os.environ['AIL_HOME'], config_loader.get_co config_loader = None def exist_item(item_id): - filename = get_item_filepath(item_id) - if os.path.isfile(filename): - return True - else: - return False + return item_basic.exist_item(item_id) def get_basename(item_id): return os.path.basename(item_id) @@ -44,8 +41,7 @@ def get_item_id(full_path): return full_path.replace(PASTES_FOLDER, '', 1) def get_item_filepath(item_id): - filename = os.path.join(PASTES_FOLDER, item_id) - return os.path.realpath(filename) + return item_basic.get_item_filepath(item_id) def get_item_date(item_id, add_separator=False): l_directory = item_id.split('/') diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg index a519403e..cc446036 100644 --- a/bin/packages/modules.cfg +++ b/bin/packages/modules.cfg @@ -2,6 +2,10 @@ subscribe = ZMQ_Global publish = Redis_Mixer,Redis_preProcess1 +[Importer_Json] +subscribe = ZMQ_JSON +publish = Redis_Mixer,Redis_Tags + [Global] subscribe = Redis_Mixer publish = Redis_Global,Redis_ModuleStats diff --git a/var/www/modules/restApi/Flask_restApi.py b/var/www/modules/restApi/Flask_restApi.py index 308e8146..78c4372d 100644 --- a/var/www/modules/restApi/Flask_restApi.py +++ b/var/www/modules/restApi/Flask_restApi.py @@ -24,6 +24,10 @@ import Paste import Tag import Term +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import')) +import importer + + from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response from flask_login import login_required @@ -588,5 +592,17 @@ def import_item_uuid(): return Response(json.dumps({'status': 'error', 'reason': 'Invalid response'}), mimetype='application/json'), 400 +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +# +# +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +@restApi.route("api/v1/import/json/item", methods=['POST']) +@token_required('user') +def import_json_item(): + + data_json = request.get_json() + res = importer.api_import_json_item(data_json) + return Response(json.dumps(res[0]), mimetype='application/json'), res[1] + # ========= REGISTRATION ========= app.register_blueprint(restApi, url_prefix=baseUrl)