From de0a60ba8b021384ff1514b7c87c369687388c2b Mon Sep 17 00:00:00 2001 From: Terrtia Date: Fri, 3 Feb 2023 16:13:57 +0100 Subject: [PATCH] chg: [importer] refactor ZMQ + Feeder importer --- bin/DB_KVROCKS_MIGRATION.py | 23 ++-- bin/LAUNCH.sh | 8 +- bin/import/JSON_importer.py | 60 --------- .../ail_json_importer/Ail_feeder_jabber.py | 54 -------- .../ail_json_importer/Ail_feeder_telegram.py | 61 --------- .../ail_json_importer/Ail_feeder_twitter.py | 52 -------- .../Ail_feeder_urlextract.py | 63 --------- bin/import/ail_json_importer/Default_json.py | 72 ---------- bin/import/importer.py | 109 --------------- bin/importer/FeederImporter.py | 124 ++++++++++++++++++ bin/importer/ZMQImporter.py | 80 +++++++++++ bin/importer/abstract_importer.py | 42 ++++++ bin/importer/feeders/Default.py | 70 ++++++++++ bin/importer/feeders/Jabber.py | 53 ++++++++ bin/importer/feeders/Telegram.py | 56 ++++++++ bin/importer/feeders/Twitter.py | 48 +++++++ bin/importer/feeders/Urlextract.py | 58 ++++++++ bin/lib/Users.py | 14 +- bin/lib/ail_updates.py | 2 +- bin/modules/Indexer.py | 5 +- bin/packages/modules.cfg | 14 +- var/www/modules/dashboard/Flask_dashboard.py | 3 +- var/www/modules/restApi/Flask_restApi.py | 34 ++--- 23 files changed, 588 insertions(+), 517 deletions(-) delete mode 100755 bin/import/JSON_importer.py delete mode 100755 bin/import/ail_json_importer/Ail_feeder_jabber.py delete mode 100755 bin/import/ail_json_importer/Ail_feeder_telegram.py delete mode 100755 bin/import/ail_json_importer/Ail_feeder_twitter.py delete mode 100755 bin/import/ail_json_importer/Ail_feeder_urlextract.py delete mode 100755 bin/import/ail_json_importer/Default_json.py delete mode 100755 bin/import/importer.py create mode 100755 bin/importer/FeederImporter.py create mode 100755 bin/importer/ZMQImporter.py create mode 100755 bin/importer/abstract_importer.py create mode 100755 bin/importer/feeders/Default.py create mode 100755 bin/importer/feeders/Jabber.py create mode 100755 bin/importer/feeders/Telegram.py create mode 100755 bin/importer/feeders/Twitter.py create mode 100755 bin/importer/feeders/Urlextract.py diff --git a/bin/DB_KVROCKS_MIGRATION.py b/bin/DB_KVROCKS_MIGRATION.py index 0d87b624..1adedc98 100755 --- a/bin/DB_KVROCKS_MIGRATION.py +++ b/bin/DB_KVROCKS_MIGRATION.py @@ -100,15 +100,19 @@ def core_migration(): versions_to_update = r_serv_db.smembers('ail:to_update') for version in versions_to_update: r_kvrocks.sadd('ail:update:to_update', version) + update_error = r_serv_db.get('ail:update_error') + if update_error: + r_kvrocks.set('ail:update:error', update_error) + update_in_progress = r_serv_db.get('ail:update_in_progress') - r_kvrocks.set('ail:update:error', update_error) - r_kvrocks.set('ail:update:update_in_progress', update_in_progress) + if update_in_progress: + r_kvrocks.set('ail:update:update_in_progress', update_in_progress) # d4 passivedns - d4_enabled = r_serv_db.hget('d4:passivedns', 'enabled') + d4_enabled = bool(r_serv_db.hget('d4:passivedns', 'enabled')) d4_update_time = r_serv_db.hget('d4:passivedns', 'update_time') - r_kvrocks.hset('d4:passivedns', 'enabled', bool(d4_enabled)) + r_kvrocks.hset('d4:passivedns', 'enabled', str(d4_enabled)) r_kvrocks.hset('d4:passivedns', 'update_time', d4_update_time) # Crawler Manager @@ -172,6 +176,7 @@ def user_migration(): Users.create_user(user_id, password=None, chg_passwd=chg_passwd, role=role) Users.edit_user_password(user_id, password_hash, chg_passwd=chg_passwd) Users._delete_user_token(user_id) + print(user_id, token) 
Users._set_user_token(user_id, token) for invite_row in r_crawler.smembers('telegram:invite_code'): @@ -871,15 +876,15 @@ def cves_migration(): if __name__ == '__main__': - #core_migration() - #user_migration() + core_migration() + user_migration() #tags_migration() # items_migration() # crawler_migration() - domain_migration() # TO TEST ########################### + # domain_migration() # TO TEST ########################### # decodeds_migration() # screenshots_migration() - subtypes_obj_migration() + # subtypes_obj_migration() # ail_2_ail_migration() # trackers_migration() # investigations_migration() @@ -891,6 +896,6 @@ if __name__ == '__main__': # crawler queues + auto_crawlers # stats - Cred - Mail - Provider - +# TODO FEEDER IMPORT -> return r_serv_db.lpop('importer:json:item') ########################################################## diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 480d203a..a16837ec 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -162,7 +162,7 @@ function launching_scripts { # sleep 0.1 echo -e $GREEN"\t* Launching core scripts ..."$DEFAULT - # TODO: MOOVE IMPORTER ???? => multiple scripts + # TODO: IMPORTER SCREEN ???? #### SYNC #### screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x" @@ -173,7 +173,9 @@ function launching_scripts { sleep 0.1 ##-- SYNC --## - screen -S "Script_AIL" -X screen -t "JSON_importer" bash -c "cd ${AIL_BIN}/import; ${ENV_PY} ./JSON_importer.py; read x" + screen -S "Script_AIL" -X screen -t "ZMQImporter" bash -c "cd ${AIL_BIN}/importer; ${ENV_PY} ./ZMQImporter.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "FeederImporter" bash -c "cd ${AIL_BIN}/importer; ${ENV_PY} ./FeederImporter.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "D4_client" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./D4_client.py; read x" sleep 0.1 @@ -279,7 +281,7 @@ function launching_scripts { # sleep 0.1 ################################## - # # + # TO MIGRATE # ################################## screen -S "Script_AIL" -X screen -t "ModuleInformation" bash -c "cd ${AIL_BIN}; ${ENV_PY} ./ModulesInformationV2.py -k 0 -c 1; read x" sleep 0.1 diff --git a/bin/import/JSON_importer.py b/bin/import/JSON_importer.py deleted file mode 100755 index de0ca574..00000000 --- a/bin/import/JSON_importer.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Recieve Json Items (example: Twitter feeder) - -""" -import os -import json -import redis -import sys -import time - -sys.path.append(os.environ['AIL_BIN']) -from Helper import Process -from pubsublogger import publisher - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) -import ConfigLoader - -import importer - - -if __name__ == '__main__': - publisher.port = 6380 - publisher.channel = 'Script' - - config_section = 'Importer_Json' - - process = Process(config_section) - - config_loader = ConfigLoader.ConfigLoader() - - # REDIS # - server_cache = config_loader.get_redis_conn("Redis_Log_submit") - config_loader = None - - # LOGGING # - publisher.info("JSON Feed Script started to receive & publish.") - - # OTHER CONFIG # - DEFAULT_FEEDER_NAME = 'Unknow Feeder' - - while True: - - json_item = importer.get_json_item_to_import() - if json_item: - - json_item = json.loads(json_item) - feeder_name = importer.get_json_source(json_item) - print('importing: {} feeder'.format(feeder_name)) - - json_import_class = importer.get_json_receiver_class(feeder_name) - 
importer_obj = json_import_class(feeder_name, json_item) - importer.process_json(importer_obj, process) - - else: - time.sleep(5) diff --git a/bin/import/ail_json_importer/Ail_feeder_jabber.py b/bin/import/ail_json_importer/Ail_feeder_jabber.py deleted file mode 100755 index 96641abe..00000000 --- a/bin/import/ail_json_importer/Ail_feeder_jabber.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Receiver Jabber Json Items - -""" -import os -import sys -import time - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib import item_basic -from lib.objects.Usernames import Username - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')) -from Default_json import Default_json - -class Ail_feeder_jabber(Default_json): - """Jabber Feeder functions""" - - def __init__(self, name, json_item): - super().__init__(name, json_item) - - def get_feeder_name(self): - return 'jabber' - - # define item id - def get_item_id(self): - item_date = time.strptime(self.json_item['meta']['jabber:ts'], "%Y-%m-%dT%H:%M:%S.%f") - item_date_str = time.strftime("%Y/%m/%d", item_date) - item_id = str(self.json_item['meta']['jabber:id']) - return os.path.join('jabber', item_date_str, item_id) + '.gz' - - def process_json_meta(self, process, item_id): - ''' - Process JSON meta filed. - ''' - jabber_id = str(self.json_item['meta']['jabber:id']) - item_basic.add_map_obj_id_item_id(jabber_id, item_id, 'jabber_id') - to = str(self.json_item['meta']['jabber:to']) - fr = str(self.json_item['meta']['jabber:from']) - item_date = item_basic.get_item_date(item_id) - - user_to = Username(to, 'jabber') - user_fr = Username(fr, 'jabber') - user_to.add(date, item_id) - user_fr.add(date, item_id) - return None diff --git a/bin/import/ail_json_importer/Ail_feeder_telegram.py b/bin/import/ail_json_importer/Ail_feeder_telegram.py deleted file mode 100755 index 8fb8bbff..00000000 --- a/bin/import/ail_json_importer/Ail_feeder_telegram.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Recieve Json Items (example: Twitter feeder) - -""" -import os -import sys -import datetime - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib import item_basic -from lib.objects.Usernames import Username - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')) -from Default_json import Default_json - -class Ail_feeder_telegram(Default_json): - """Twitter Feeder functions""" - - def __init__(self, name, json_item): - super().__init__(name, json_item) - - def get_feeder_name(self): - return 'telegram' - - # define item id - def get_item_id(self): - # use twitter timestamp ? - item_date = datetime.date.today().strftime("%Y/%m/%d") - channel_id = str(self.json_item['meta']['channel_id']) - message_id = str(self.json_item['meta']['message_id']) - item_id = f'{channel_id}_{message_id}' - return os.path.join('telegram', item_date, item_id) + '.gz' - - def process_json_meta(self, process, item_id): - ''' - Process JSON meta filed. 
- ''' - channel_id = str(self.json_item['meta']['channel_id']) - message_id = str(self.json_item['meta']['message_id']) - telegram_id = f'{channel_id}_{message_id}' - item_basic.add_map_obj_id_item_id(telegram_id, item_id, 'telegram_id') - #print(self.json_item['meta']) - user = None - if self.json_item['meta'].get('user'): - user = str(self.json_item['meta']['user']) - else: - if self.json_item['meta'].get('channel'): - user = str(self.json_item['meta']['channel']['username']) - if user: - item_date = item_basic.get_item_date(item_id) - username = Username(user, 'telegram') - username.add(date, item_id) - return None diff --git a/bin/import/ail_json_importer/Ail_feeder_twitter.py b/bin/import/ail_json_importer/Ail_feeder_twitter.py deleted file mode 100755 index bc0f1b2e..00000000 --- a/bin/import/ail_json_importer/Ail_feeder_twitter.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Recieve Json Items (example: Twitter feeder) - -""" -import os -import sys -import datetime - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib import item_basic -from lib.objects.Usernames import Username - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')) -from Default_json import Default_json - -class Ail_feeder_twitter(Default_json): - """Twitter Feeder functions""" - - def __init__(self, name, json_item): - super().__init__(name, json_item) - - def get_feeder_name(self): - return 'twitter' - - # define item id - def get_item_id(self): - # use twitter timestamp ? - item_date = datetime.date.today().strftime("%Y/%m/%d") - item_id = str(self.json_item['meta']['twitter:tweet_id']) - return os.path.join('twitter', item_date, item_id) + '.gz' - - def process_json_meta(self, process, item_id): - ''' - Process JSON meta filed. - ''' - tweet_id = str(self.json_item['meta']['twitter:tweet_id']) - item_basic.add_map_obj_id_item_id(tweet_id, item_id, 'twitter_id') - - date = item_basic.get_item_date(item_id) - user = str(self.json_item['meta']['twitter:id']) - username = Username(user, 'twitter') - username.add(date, item_id) - - return None diff --git a/bin/import/ail_json_importer/Ail_feeder_urlextract.py b/bin/import/ail_json_importer/Ail_feeder_urlextract.py deleted file mode 100755 index 9bd4cbbd..00000000 --- a/bin/import/ail_json_importer/Ail_feeder_urlextract.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Recieve Json Items (example: Twitter feeder) - -""" -import os -import sys -import datetime -import uuid - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')) -from Default_json import Default_json - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib.objects.Items import Item - - -class Ail_feeder_urlextract(Default_json): - """urlextract Feeder functions""" - - def __init__(self, name, json_item): - super().__init__(name, json_item) - - def get_feeder_name(self): - return 'urlextract' - - # define item id - def get_item_id(self): - # use twitter timestamp ? 
- item_date = datetime.date.today().strftime("%Y/%m/%d") - item_id = str(self.json_item['meta']['twitter:url-extracted']) - item_id = item_id.split('//') - if len(item_id) > 1: - item_id = ''.join(item_id[1:]) - else: - item_id = item_id[0] - item_id = item_id.replace('/', '_') - if len(item_id) > 215: - item_id = '{}{}.gz'.format(item_id[:215], str(uuid.uuid4())) - else: - item_id = '{}{}.gz'.format(item_id, str(uuid.uuid4())) - return os.path.join('urlextract', item_date, item_id) - - # # TODO: - def process_json_meta(self, process, item_id): - """ - Process JSON meta filed. - """ - json_meta = self.get_json_meta() - parent_id = str(json_meta['parent:twitter:tweet_id']) # TODO SEARCH IN CACHE !!! - item = Item(item_id) - item.set_parent(parent_id) - - # # TODO: change me - # parent_type = 'twitter_id' - # item_basic.add_item_parent_by_parent_id(parent_type, parent_id, item_id) diff --git a/bin/import/ail_json_importer/Default_json.py b/bin/import/ail_json_importer/Default_json.py deleted file mode 100755 index 00e384ef..00000000 --- a/bin/import/ail_json_importer/Default_json.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Recieve Json Items (example: Twitter feeder) - -""" -import os -import datetime -import json -import redis -import time -import sys -import uuid - -#sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) -#import ConfigLoader -#import item_basic - -class Default_json(object): - """Default Feeder functions""" - - def __init__(self, feeder_name, json_item): - self.name = feeder_name - self.json_item = json_item - - def get_feeder_source(self): - ''' - Return the original feeder name (json source field). - ''' - return self.name - - def get_feeder_name(self): - ''' - Return feeder name. first part of the item_id and display in the UI - ''' - return self.name - - def get_json_file(self): - ''' - Return the JSON dict, - ''' - return self.json_item - - def get_json_meta(self): - return self.json_item['meta'] - - def get_feeder_uuid(self): - pass - - def get_item_gzip64encoded_content(self): - ''' - Return item base64 encoded gzip content, - ''' - return self.json_item['data'] - - ## OVERWRITE ME ## - def get_item_id(self): - ''' - Return item id. define item id - ''' - item_date = datetime.date.today().strftime("%Y/%m/%d") - return os.path.join(self.get_feeder_name(), item_date, str(uuid.uuid4())) + '.gz' - - ## OVERWRITE ME ## - def process_json_meta(self, process, item_id): - ''' - Process JSON meta filed. 
- ''' - return None diff --git a/bin/import/importer.py b/bin/import/importer.py deleted file mode 100755 index 841860a5..00000000 --- a/bin/import/importer.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* -""" -The JSON Receiver Module -================ - -Recieve Json Items (example: Twitter feeder) - -""" -import os -import importlib -import json -import sys -import time - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) -import ConfigLoader - -# Import all receiver -#from all_json_receiver import * - -#### CONFIG #### -config_loader = ConfigLoader.ConfigLoader() -server_cache = config_loader.get_redis_conn("Redis_Log_submit") -r_serv_db = config_loader.get_redis_conn("ARDB_DB") -config_loader = None - -DEFAULT_FEEDER_NAME = 'Default_json' - -#### ------ #### - -def reload_json_importer_list(): - global importer_list - importer_json_dir = os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer') - importer_list = [f[:-3] for f in os.listdir(importer_json_dir) if os.path.isfile(os.path.join(importer_json_dir, f))] - -# init importer list -importer_list = [] -reload_json_importer_list() - - -#### FUNCTIONS #### -def get_json_importer_list(): - return importer_list - -def add_json_to_json_queue(json_item): - json_item = json.dumps(json_item) - r_serv_db.rpush('importer:json:item', json_item) - -def get_json_item_to_import(): - return r_serv_db.lpop('importer:json:item') - -def get_json_receiver_class(feeder_name_in): - global importer_list - - # sanitize class name - feeder_name = feeder_name_in[:1].upper() + feeder_name_in[1:] - feeder_name = feeder_name.replace('-', '_') - - if feeder_name is None or feeder_name not in get_json_importer_list(): - reload_json_importer_list() # add refresh timing ? 
- if feeder_name not in get_json_importer_list(): - print('Unknow feeder: {}'.format(feeder_name_in)) - feeder_name = 'Default_json' - # avoid subpackages - parts = feeder_name.split('.') - module = 'ail_json_importer.{}'.format(parts[-1]) - # import json importer class - try: - mod = importlib.import_module(module) - except: - raise - mod = importlib.import_module(module) - class_name = getattr(mod, feeder_name) - return class_name - -def get_json_source(json_item): - return json_item.get('source', DEFAULT_FEEDER_NAME) - -def process_json(importer_obj, process): - item_id = importer_obj.get_item_id() - if 'meta' in importer_obj.get_json_file(): - importer_obj.process_json_meta(process, item_id) - - # send data to queue - send_item_to_ail_queue(item_id, importer_obj.get_item_gzip64encoded_content(), importer_obj.get_feeder_name(), process) - -def send_item_to_ail_queue(item_id, gzip64encoded_content, feeder_name, process): - # Send item to queue - # send paste to Global - relay_message = "{0} {1}".format(item_id, gzip64encoded_content) - process.populate_set_out(relay_message, 'Mixer') - - # increase nb of paste by feeder name - server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1) - -#### ---- #### - - -#### API #### -def api_import_json_item(data_json): - if not data_json: - return ({'status': 'error', 'reason': 'Malformed JSON'}, 400) - - # # TODO: add JSON verification - res = add_json_to_json_queue(data_json) - if res: - return ({'status': 'error'}, 400) - return ({'status': 'success'}, 200) diff --git a/bin/importer/FeederImporter.py b/bin/importer/FeederImporter.py new file mode 100755 index 00000000..ecbefade --- /dev/null +++ b/bin/importer/FeederImporter.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +Importer Class +================ + +Import Content + +""" +import os +import sys + +import importlib +import json + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.abstract_importer import AbstractImporter +from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader + +#### CONFIG #### +config_loader = ConfigLoader() +r_db = config_loader.get_db_conn('Kvrocks_DB') +config_loader = None +# --- CONFIG --- # + +class FeederImporter(AbstractImporter): + def __init__(self): + super().__init__() + self.feeders = {} + self.reload_feeders() + + # TODO ADD TIMEOUT RELOAD + def reload_feeders(self): + feeder_dir = os.path.join(os.environ['AIL_BIN'], 'importer', 'feeders') + feeders = [f[:-3] for f in os.listdir(feeder_dir) if os.path.isfile(os.path.join(feeder_dir, f))] + self.feeders = {} + for feeder in feeders: + print(feeder) + part = feeder.split('.')[-1] + # import json importer class + mod = importlib.import_module(f'importer.feeders.{part}') + cls = getattr(mod, f'{feeder}Feeder') + print(cls) + self.feeders[feeder] = cls + print() + print(self.feeders) + print() + + def get_feeder(self, json_data): + class_name = None + feeder_name = json_data.get('source') + if feeder_name: + if feeder_name.startswith('ail_feeder_'): + feeder_name = feeder_name.replace('ail_feeder_', '', 1) + class_name = feeder_name.replace('-', '_').title() + + if not class_name or class_name not in self.feeders: + class_name = 'Default' + cls = self.feeders[class_name] + return cls(json_data) + + def importer(self, json_data): + + feeder = self.get_feeder(json_data) + + feeder_name = feeder.get_name() + print(f'importing: {feeder_name} 
feeder') + + item_id = feeder.get_item_id() + # process meta + if feeder.get_json_meta(): + feeder.process_meta() + gzip64_content = feeder.get_gzip64_content() + + return f'{item_id} {gzip64_content}' + + +class FeederModuleImporter(AbstractModule): + def __init__(self): + super(FeederModuleImporter, self).__init__() + self.pending_seconds = 5 + + config = ConfigLoader() + self.r_db = config.get_db_conn('Kvrocks_DB') + self.importer = FeederImporter() + + def get_message(self): + return self.r_db.lpop('importer:feeder') # TODO CHOOSE DB + # TODO RELOAD LIST + # after delta + + def compute(self, message): + # TODO HANDLE Invalid JSON + json_data = json.loads(message) + relay_message = self.importer.importer(json_data) + self.send_message_to_queue(relay_message) + + # TODO IN MIXER + # increase nb of paste by feeder name + # server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1) + + +def add_json_feeder_to_queue(json_data): + json_data = json.dumps(json_data) + return r_db.rpush('importer:feeder', json_data) + +def api_add_json_feeder_to_queue(json_data): + if not json_data: + return {'status': 'error', 'reason': 'Malformed JSON'}, 400 + # # TODO: add JSON verification + res = add_json_feeder_to_queue(json_data) + if not res: + return {'status': 'error'}, 400 + return {'status': 'success'}, 200 + + +# Launch Importer +if __name__ == '__main__': + module = FeederModuleImporter() + module.run() diff --git a/bin/importer/ZMQImporter.py b/bin/importer/ZMQImporter.py new file mode 100755 index 00000000..67c17d0c --- /dev/null +++ b/bin/importer/ZMQImporter.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +Importer Class +================ + +Import Content + +""" +import os +import sys +import time + +import zmq + + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.abstract_importer import AbstractImporter +from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader + +class ZMQImporters(AbstractImporter): + def __init__(self): + super().__init__() + self.subscribers = [] + # Initialize poll set + self.poller = zmq.Poller() + + def add(self, address, channel): + context = zmq.Context() + subscriber = context.socket(zmq.SUB) + r = subscriber.connect(address) + print(r) + subscriber.setsockopt_string(zmq.SUBSCRIBE, channel) + self.subscribers.append(subscriber) + + self.poller.register(subscriber, zmq.POLLIN) + + def importer(self, timeout=None): # -> FOR loop required + """ + :param timeout: The timeout (in milliseconds) to wait for an event. + If unspecified (or specified None), will wait forever for an event. 
+ :returns: messages generator + """ + for event in self.poller.poll(timeout=timeout): + socket, event_mask = event + # DEBUG + print(socket, event_mask) + yield socket.recv() + + +class ZMQModuleImporter(AbstractModule): + def __init__(self): + super().__init__() + + config_loader = ConfigLoader() + address = config_loader.get_config_str('ZMQ_Global', 'address') + channel = config_loader.get_config_str('ZMQ_Global', 'channel') + self.zmq_importer = ZMQImporters() + # TODO register all Importers + self.zmq_importer.add(address, channel) + + def get_message(self): + for message in self.zmq_importer.importer(): + # remove channel from message + yield message.split(b' ', 1)[1] + + def compute(self, messages): + for message in messages: + message = message.decode() + print(message.split(' ', 1)[0]) + self.send_message_to_queue(message) + + +if __name__ == '__main__': + module = ZMQModuleImporter() + module.run() diff --git a/bin/importer/abstract_importer.py b/bin/importer/abstract_importer.py new file mode 100755 index 00000000..2e344bc4 --- /dev/null +++ b/bin/importer/abstract_importer.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +Importer Class +================ + +Import Content + +""" +import os +import sys + +from abc import ABC, abstractmethod + + +# sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +# from ConfigLoader import ConfigLoader + +class AbstractImporter(ABC): + def __init__(self): + """ + Init Module + importer_name: str; set the importer name if different from the instance ClassName + """ + # Module name if provided else instance className + self.name = self._name() + + @abstractmethod + def importer(self, *args, **kwargs): + """Importer function""" + pass + + def _name(self): + """ + Returns the instance class name (ie. the Exporter Name) + """ + return self.__class__.__name__ + + diff --git a/bin/importer/feeders/Default.py b/bin/importer/feeders/Default.py new file mode 100755 index 00000000..482d06b4 --- /dev/null +++ b/bin/importer/feeders/Default.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The Default JSON Feeder Importer Module +================ + +Process Feeder Json (example: Twitter feeder) + +""" +import os +import datetime +import uuid + +class DefaultFeeder: + """Default Feeder""" + + def __init__(self, json_data): + self.json_data = json_data + self.item_id = None + self.name = None + + def get_name(self): + """ + Return feeder name. first part of the item_id and display in the UI + """ + if not self.name: + return self.get_source() + return self.name + + def get_source(self): + return self.json_data.get('source') + + def get_json_data(self): + """ + Return the JSON data, + """ + return self.json_data + + def get_json_meta(self): + return self.json_data.get('meta') + + def get_uuid(self): + return self.json_data.get('source_uuid') + + def get_default_encoding(self): + return self.json_data.get('default_encoding') + + def get_gzip64_content(self): + """ + Return the base64 encoded gzip content, + """ + return self.json_data.get('data') + + ## OVERWRITE ME ## + def get_item_id(self): + """ + Return item id. define item id + """ + date = datetime.date.today().strftime("%Y/%m/%d") + item_id = os.path.join(self.get_name(), date, str(uuid.uuid4())) + self.item_id = f'{item_id}.gz' + return self.item_id + + ## OVERWRITE ME ## + def process_meta(self): + """ + Process JSON meta filed. 
+ """ + # meta = self.get_json_meta() + pass diff --git a/bin/importer/feeders/Jabber.py new file mode 100755 index 00000000..79d0950f --- /dev/null +++ b/bin/importer/feeders/Jabber.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The Jabber Feeder Importer Module +================ + +Process Jabber JSON + +""" +import os +import sys +import time + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.feeders.Default import DefaultFeeder +from lib.objects.Usernames import Username +from lib import item_basic + + +class JabberFeeder(DefaultFeeder): + """Jabber Feeder functions""" + + def __init__(self, json_data): + super().__init__(json_data) + self.name = 'jabber' + + # define item id + def get_item_id(self): + date = time.strptime(self.json_data['meta']['jabber:ts'], "%Y-%m-%dT%H:%M:%S.%f") + date_str = time.strftime("%Y/%m/%d", date) + item_id = str(self.json_data['meta']['jabber:id']) + item_id = os.path.join('jabber', date_str, item_id) + self.item_id = f'{item_id}.gz' + return self.item_id + + def process_meta(self): + """ + Process JSON meta field. + """ + # jabber_id = str(self.json_data['meta']['jabber:id']) + # item_basic.add_map_obj_id_item_id(jabber_id, item_id, 'jabber_id') ############################################## + to = str(self.json_data['meta']['jabber:to']) + fr = str(self.json_data['meta']['jabber:from']) + date = item_basic.get_item_date(self.item_id) + + user_to = Username(to, 'jabber') + user_fr = Username(fr, 'jabber') + user_to.add(date, self.item_id) + user_fr.add(date, self.item_id) + return None diff --git a/bin/importer/feeders/Telegram.py new file mode 100755 index 00000000..3856c88e --- /dev/null +++ b/bin/importer/feeders/Telegram.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The Telegram Feeder Importer Module +================ + +Process Telegram JSON + +""" +import os +import sys +import datetime + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.feeders.Default import DefaultFeeder +from lib.objects.Usernames import Username +from lib import item_basic + +class TelegramFeeder(DefaultFeeder): + + def __init__(self, json_data): + super().__init__(json_data) + self.name = 'telegram' + + # define item id + def get_item_id(self): + # TODO use telegram message date + date = datetime.date.today().strftime("%Y/%m/%d") + channel_id = str(self.json_data['meta']['channel_id']) + message_id = str(self.json_data['meta']['message_id']) + item_id = f'{channel_id}_{message_id}' + item_id = os.path.join('telegram', date, item_id) + self.item_id = f'{item_id}.gz' + return self.item_id + + def process_meta(self): + """ + Process JSON meta field.
+ """ + # channel_id = str(self.json_data['meta']['channel_id']) + # message_id = str(self.json_data['meta']['message_id']) + # telegram_id = f'{channel_id}_{message_id}' + # item_basic.add_map_obj_id_item_id(telegram_id, item_id, 'telegram_id') ######################################### + user = None + if self.json_data['meta'].get('user'): + user = str(self.json_data['meta']['user']) + elif self.json_data['meta'].get('channel'): + user = str(self.json_data['meta']['channel'].get('username')) + if user: + date = item_basic.get_item_date(self.item_id) + username = Username(user, 'telegram') + username.add(date, self.item_id) + return None diff --git a/bin/importer/feeders/Twitter.py new file mode 100755 index 00000000..d5040c65 --- /dev/null +++ b/bin/importer/feeders/Twitter.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The Twitter Feeder Importer Module +================ + +Process Twitter JSON + +""" +import os +import sys +import datetime + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.feeders.Default import DefaultFeeder +from lib.objects.Usernames import Username +from lib import item_basic + +class TwitterFeeder(DefaultFeeder): + + def __init__(self, json_data): + super().__init__(json_data) + self.name = 'twitter' + + # define item id + def get_item_id(self): + # TODO twitter timestamp message date + date = datetime.date.today().strftime("%Y/%m/%d") + item_id = str(self.json_data['meta']['twitter:tweet_id']) + item_id = os.path.join('twitter', date, item_id) + self.item_id = f'{item_id}.gz' + return self.item_id + + def process_meta(self): + ''' + Process JSON meta field. + ''' + # tweet_id = str(self.json_data['meta']['twitter:tweet_id']) + # item_basic.add_map_obj_id_item_id(tweet_id, item_id, 'twitter_id') ############################################ + + date = item_basic.get_item_date(self.item_id) + user = str(self.json_data['meta']['twitter:id']) + username = Username(user, 'twitter') + username.add(date, self.item_id) + return None diff --git a/bin/importer/feeders/Urlextract.py new file mode 100755 index 00000000..1ef19920 --- /dev/null +++ b/bin/importer/feeders/Urlextract.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +The Urlextract Feeder Importer Module +================ + +Process Urlextract JSON + +""" +import os +import sys +import datetime +import uuid + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.feeders.Default import DefaultFeeder +from lib.objects.Items import Item + + +class UrlextractFeeder(DefaultFeeder): + + def __init__(self, json_data): + super().__init__(json_data) + self.name = 'urlextract' + + # define item id + def get_item_id(self): + date = datetime.date.today().strftime("%Y/%m/%d") + item_id = str(self.json_data['meta']['twitter:url-extracted']) + item_id = item_id.split('//') + if len(item_id) > 1: + item_id = ''.join(item_id[1:]) + else: + item_id = item_id[0] + item_id = item_id.replace('/', '_') + # limit ID length + if len(item_id) > 215: + item_id = item_id[:215] + item_id = f'{item_id}{str(uuid.uuid4())}.gz' + self.item_id = os.path.join('urlextract', date, item_id) + return self.item_id + + def process_meta(self): + """ + Process JSON meta field.
+ """ + # ADD Other parents here + parent_id = None + if self.json_data['meta'].get('parent:twitter:tweet_id'): + parent_id = str(self.json_data['meta']['parent:twitter:tweet_id']) + + if parent_id: + item = Item(self.item_id) + item.set_parent(parent_id) + diff --git a/bin/lib/Users.py b/bin/lib/Users.py index 668e343e..e29b1a45 100755 --- a/bin/lib/Users.py +++ b/bin/lib/Users.py @@ -48,7 +48,8 @@ def gen_token(): def _delete_user_token(user_id): current_token = get_user_token(user_id) - r_serv_db.hdel('ail:users:tokens', current_token) + if current_token: + r_serv_db.hdel('ail:users:tokens', current_token) def _set_user_token(user_id, token): r_serv_db.hset('ail:users:tokens', token, user_id) @@ -82,6 +83,12 @@ def get_user_passwd_hash(user_id): def get_user_token(user_id): return r_serv_db.hget(f'ail:users:metadata:{user_id}', 'token') +def get_token_user(token): + return r_serv_db.hget('ail:users:tokens', token) + +def exists_token(token): + return r_serv_db.hexists('ail:users:tokens', token) + def exists_user(user_id): return r_serv_db.exists(f'ail:user:metadata:{user_id}') @@ -131,7 +138,7 @@ def create_user(user_id, password=None, chg_passwd=True, role=None): def edit_user_password(user_id, password_hash, chg_passwd=False): if chg_passwd: - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', True) + r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True') else: r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'change_passwd') # remove default user password file @@ -194,6 +201,9 @@ def get_all_user_upper_role(user_role): def get_default_role(): return 'read_only' +def is_in_role(user_id, role): + return r_serv_db.sismember(f'ail:users:role:{role}', user_id) + def edit_user_role(user_id, role): current_role = get_user_role(user_id) if role != current_role: diff --git a/bin/lib/ail_updates.py b/bin/lib/ail_updates.py index bf1b72a1..c3729831 100755 --- a/bin/lib/ail_updates.py +++ b/bin/lib/ail_updates.py @@ -12,7 +12,7 @@ sys.path.append(os.environ['AIL_BIN']) from lib.ConfigLoader import ConfigLoader config_loader = ConfigLoader() -r_db = config_loader.get_redis_conn("Kvrocks_DB") +r_db = config_loader.get_db_conn("Kvrocks_DB") config_loader = None BACKGROUND_UPDATES = { diff --git a/bin/modules/Indexer.py b/bin/modules/Indexer.py index 2b80eeb3..f097c333 100755 --- a/bin/modules/Indexer.py +++ b/bin/modules/Indexer.py @@ -2,11 +2,10 @@ # -*-coding:UTF-8 -* """ -The ZMQ_Sub_Indexer Module +The Indexer Module ============================ -The ZMQ_Sub_Indexer modules is fetching the list of files to be processed -and index each file with a full-text indexer (Whoosh until now). +each file with a full-text indexer (Whoosh until now). 
""" ################################## diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg index 0f1d7547..1c718dab 100644 --- a/bin/packages/modules.cfg +++ b/bin/packages/modules.cfg @@ -1,6 +1,16 @@ +[ZMQModuleImporter] +publish = Redis_Import + +[FeederModuleImporter] +publish = Redis_Import + +#################################################### + [Mixer] -subscribe = ZMQ_Global -publish = Redis_Mixer,Redis_preProcess1 +# subscribe = ZMQ_Global +subscribe = Redis_Import +publish = Redis_Mixer +#publish = Redis_Mixer,Redis_preProcess1 [Sync_importer] publish = Redis_Mixer,Redis_Tags diff --git a/var/www/modules/dashboard/Flask_dashboard.py b/var/www/modules/dashboard/Flask_dashboard.py index cb33b268..45be4f46 100644 --- a/var/www/modules/dashboard/Flask_dashboard.py +++ b/var/www/modules/dashboard/Flask_dashboard.py @@ -169,7 +169,8 @@ def index(): update_message = '' if ail_updates.get_current_background_update(): background_update = True - update_message = ail_updates.get_update_background_message() + # update_message = ail_updates.get_update_background_message() + update_message = None return render_template("index.html", default_minute = default_minute, threshold_stucked_module=threshold_stucked_module, diff --git a/var/www/modules/restApi/Flask_restApi.py b/var/www/modules/restApi/Flask_restApi.py index 81fa47eb..55c52cf0 100644 --- a/var/www/modules/restApi/Flask_restApi.py +++ b/var/www/modules/restApi/Flask_restApi.py @@ -16,6 +16,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## +from lib import Users from lib.objects.Items import Item from lib import Tag from lib import Tracker @@ -24,8 +25,7 @@ from packages import Term from packages import Import_helper -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import')) -import importer +from importer.FeederImporter import api_add_json_feeder_to_queue from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response, escape @@ -40,7 +40,6 @@ import Flask_config app = Flask_config.app baseUrl = Flask_config.baseUrl r_cache = Flask_config.r_cache -r_serv_db = Flask_config.r_serv_db restApi = Blueprint('restApi', __name__, template_folder='templates') @@ -57,31 +56,16 @@ def verify_token(token): if not check_token_format(token): return False - if r_serv_db.hexists('user:tokens', token): - return True - else: - return False - -def get_user_from_token(token): - return r_serv_db.hget('user:tokens', token) + return Users.exists_token(token) def verify_user_role(role, token): # User without API if role == 'user_no_api': return False - user_id = get_user_from_token(token) + user_id = Users.get_token_user(token) if user_id: - if is_in_role(user_id, role): - return True - else: - return False - else: - return False - -def is_in_role(user_id, role): - if r_serv_db.sismember('user_role:{}'.format(role), user_id): - return True + return Users.is_in_role(user_id, role) else: return False @@ -366,7 +350,7 @@ def get_all_tags(): def add_tracker_term(): data = request.get_json() user_token = get_auth_from_header() - user_id = get_user_from_token(user_token) + user_id = Users.get_token_user(user_token) res = Tracker.api_add_tracker(data, user_id) return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1] @@ -375,7 +359,7 @@ def add_tracker_term(): def delete_tracker_term(): data = request.get_json() user_token = get_auth_from_header() - user_id = 
get_user_from_token(user_token) + user_id = Users.get_token_user(user_token) res = Term.parse_tracked_term_to_delete(data, user_id) return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1] @@ -384,7 +368,7 @@ def delete_tracker_term(): def get_tracker_term_item(): data = request.get_json() user_token = get_auth_from_header() - user_id = get_user_from_token(user_token) + user_id = Users.get_token_user(user_token) res = Term.parse_get_tracker_term_item(data, user_id) return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1] @@ -687,7 +671,7 @@ def import_item_uuid(): def import_json_item(): data_json = request.get_json() - res = importer.api_import_json_item(data_json) + res = api_add_json_feeder_to_queue(data_json) return Response(json.dumps(res[0]), mimetype='application/json'), res[1]
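
Note: a minimal sketch (not part of the patch) of the JSON payload the new FeederImporter consumes, assuming the usual AIL JSON feeder format ('source', 'source_uuid', 'default_encoding', 'meta' and a gzip+base64 'data' field); the source_uuid and content below are placeholders.

import base64
import gzip
import json

def build_feeder_payload(text, channel_id, message_id):
    # DefaultFeeder.get_gzip64_content() returns json_data['data'] as-is,
    # so the content is expected to be gzip-compressed then base64-encoded
    data = base64.b64encode(gzip.compress(text.encode())).decode()
    return {
        'source': 'ail_feeder_telegram',  # FeederImporter.get_feeder() strips 'ail_feeder_' and title-cases -> TelegramFeeder
        'source_uuid': '00000000-0000-0000-0000-000000000000',  # hypothetical feeder UUID
        'default_encoding': 'UTF-8',
        'meta': {'channel_id': channel_id, 'message_id': message_id},  # used by TelegramFeeder.get_item_id()/process_meta()
        'data': data,
    }

payload = build_feeder_payload('test content', 1234, 42)
# api_add_json_feeder_to_queue(payload) RPUSHes json.dumps(payload) onto 'importer:feeder',
# which FeederModuleImporter.get_message() LPOPs every 5 seconds.
print(json.dumps(payload)[:80])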
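
A second sketch: a throwaway ZMQ publisher to exercise the new ZMQImporter, assuming the example address and channel values below (check the ZMQ_Global section of the AIL config for the real ones). ZMQImporters.add() connects a SUB socket and subscribes to the channel; ZMQModuleImporter then strips the channel token and relays '<item_id> <gzip64_content>' to the Redis_Import queue consumed by Mixer (see modules.cfg).

import base64
import gzip
import time

import zmq

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:5556')  # example address; the importer connect()s to it
time.sleep(1)  # give the SUB socket time to connect before publishing

channel = '102'  # example channel name
item_id = 'feeder_test/2023/02/03/test.gz'  # hypothetical item path
gzip64 = base64.b64encode(gzip.compress(b'test content')).decode()
publisher.send_string(f'{channel} {item_id} {gzip64}')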