diff --git a/HOWTO.md b/HOWTO.md index 08f23f6a..1432d475 100644 --- a/HOWTO.md +++ b/HOWTO.md @@ -11,7 +11,7 @@ For the moment, there are three different ways to feed AIL with data: 2. You can setup [pystemon](https://github.com/cvandeplas/pystemon) and use the custom feeder provided by AIL (see below). -3. You can feed your own data using the [./bin/import_dir.py](./bin/import_dir.py) script. +3. You can feed your own data using the [./tools/file_dir_importer.py](./tools/file_dir_importer.py) script. ### Feeding AIL with pystemon diff --git a/bin/importer/FileImporter.py b/bin/importer/FileImporter.py new file mode 100755 index 00000000..ba5e54b9 --- /dev/null +++ b/bin/importer/FileImporter.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +Importer Class +================ + +Import Content + +""" +import logging.config +import os +import sys + + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer.abstract_importer import AbstractImporter +# from modules.abstract_module import AbstractModule +from lib import ail_logger +from lib.ail_queues import AILQueue +from lib import ail_files # TODO RENAME ME + +logging.config.dictConfig(ail_logger.get_config(name='modules')) + +# TODO Clean queue one object destruct + +class FileImporter(AbstractImporter): + def __init__(self, feeder='file_import'): + super().__init__() + self.logger = logging.getLogger(f'{self.__class__.__name__}') + + self.feeder_name = feeder # TODO sanityze feeder name + + # Setup the I/O queues + self.queue = AILQueue('FileImporter', 'manual') + + def importer(self, path): + if os.path.isfile(path): + with open(path, 'rb') as f: + content = f.read() + mimetype = ail_files.get_mimetype(content) + if ail_files.is_text(mimetype): + item_id = ail_files.create_item_id(self.feeder_name, path) + content = ail_files.create_gzipped_b64(content) + if content: + message = f'dir_import {item_id} 
{content}' + self.logger.info(message) + self.queue.send_message(message) + elif mimetype == 'application/gzip': + item_id = ail_files.create_item_id(self.feeder_name, path) + content = ail_files.create_b64(content) + if content: + message = f'dir_import {item_id} {content}' + self.logger.info(message) + self.queue.send_message(message) + +class DirImporter(AbstractImporter): + def __init__(self): + super().__init__() + self.logger = logging.getLogger(f'{self.__class__.__name__}') + self.file_importer = FileImporter() + + def importer(self, dir_path): + if not os.path.isdir(dir_path): + message = f'Error, {dir_path} is not a directory' + self.logger.warning(message) + raise Exception(message) + + for dirname, _, filenames in os.walk(dir_path): + for filename in filenames: + path = os.path.join(dirname, filename) + self.file_importer.importer(path) + + +# if __name__ == '__main__': +# import argparse +# # TODO multiple files/dirs ??? +# parser = argparse.ArgumentParser(description='Directory or file importer') +# parser.add_argument('-d', '--directory', type=str, help='Root directory to import') +# parser.add_argument('-f', '--file', type=str, help='File to import') +# args = parser.parse_args() +# +# if not args.directory and not args.file: +# parser.print_help() +# sys.exit(0) +# +# if args.directory: +# dir_path = args.directory +# dir_importer = DirImporter() +# dir_importer.importer(dir_path) +# +# if args.file: +# file_path = args.file +# file_importer = FileImporter() +# file_importer.importer(file_path) diff --git a/bin/lib/ail_files.py b/bin/lib/ail_files.py new file mode 100755 index 00000000..26929873 --- /dev/null +++ b/bin/lib/ail_files.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import base64 +import datetime +import gzip +import logging.config +import magic +import os +import sys + +from werkzeug.utils import secure_filename + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project 
packages +################################## +from lib import ail_logger +from lib.ail_core import generate_uuid +# from lib import ConfigLoader +from packages import Date + +logging.config.dictConfig(ail_logger.get_config(name='modules')) +logger = logging.getLogger() + +# config_loader = ConfigLoader.ConfigLoader() +# r_serv = config_loader.get_db_conn("Kvrocks_Stats") # TODO CHANGE DB +# r_cache = config_loader.get_redis_conn("Redis_Log_submit") +# +# # Text max size +# TEXT_MAX_SIZE = ConfigLoader.ConfigLoader().get_config_int("SubmitPaste", "TEXT_MAX_SIZE") +# # File max size +# FILE_MAX_SIZE = ConfigLoader.ConfigLoader().get_config_int("SubmitPaste", "FILE_MAX_SIZE") +# # Allowed file type +# ALLOWED_EXTENSIONS = ConfigLoader.ConfigLoader().get_config_str("SubmitPaste", "FILE_ALLOWED_EXTENSIONS").split(',') +# config_loader = None +# +# # TODO generate UUID +# +# # TODO Source ???? +# +# # TODO RENAME ME +# class Submit: +# def __init__(self, submit_uuid): +# self.uuid = submit_uuid +# +# def exists(self): +# return r_serv.exists(f'submit:{self.uuid}') +# +# def is_item(self): +# return r_serv.hexists(f'submit:{self.uuid}', 'content') +# +# def is_file(self): +# return r_serv.hexists(f'submit:{self.uuid}', 'filename') +# +# def get_filename(self): +# return r_serv.hget(f'submit:{self.uuid}', 'filename') +# +# def get_content(self): +# return r_serv.hget(f'submit:{self.uuid}', 'content') +# +# def get_password(self): +# r_serv.hget(f'submit:{self.uuid}', 'password') +# +# def get_tags(self): +# return r_serv.smembers(f'submit:tags:{self.uuid}') +# +# def get_error(self): +# return r_cache.hget(f'submit:{self.uuid}:', 'error') +# +# def get_stats(self): +# stats = {'ended': r_cache.hget(f'submit:{self.uuid}', 'ended'), # boolean +# 'objs': r_cache.hget(f'submit:{self.uuid}', 'objs'), # objs IDs +# 'nb_files': r_cache.hget(f'submit:{self.uuid}', 'nb_files'), +# 'nb_done': r_cache.hget(f'submit:{self.uuid}', 'nb_done'), +# 'submitted': 
r_cache.hget(f'submit:{self.uuid}', 'submitted'), +# 'error': self.get_error()} +# return stats +# +# +# def get_meta(self): +# meta = {'uuid': self.uuid} +# return meta +# +# def is_compressed(self): +# pass +# +# +# def abort(self, message): +# self.set_error(message) +# r_cache.hset(f'submit:{self.uuid}', 'ended', 'True') +# self.delete() +# +# def set_error(self, message): +# +# r_serv.hset(f'submit:{self.uuid}', 'error', ) +# +# # source ??? +# def create(self, content='', filename='', tags=[], password=None): +# +# +# +# +# r_serv.sadd(f'submits:all') +# +# +# def delete(self): +# r_serv.srem(f'submits:all', self.uuid) +# r_cache.delete(f'submit:{self.uuid}') +# r_serv.delete(f'submit:tags:{self.uuid}') +# r_serv.delete(f'submit:{self.uuid}') +# +# +# def create_submit(tags=[]): +# submit_uuid = generate_uuid() +# submit = Submit(submit_uuid) +# +# def api_create_submit(): +# pass + + +######################################################################################### +######################################################################################### +######################################################################################### + +ARCHIVE_MIME_TYPE = { + 'application/zip', + # application/bzip2 + 'application/x-bzip2', + 'application/java-archive', + 'application/x-tar', + 'application/gzip', + # application/x-gzip + 'application/x-lzma', + 'application/x-xz', + # application/x-xz-compressed-tar + 'application/x-lz', + 'application/x-7z-compressed', + 'application/x-rar', + # application/x-rar-compressed + 'application/x-iso9660-image', + 'application/vnd.ms-cab-compressed', + # application/x-lzma + # application/x-compress + # application/x-lzip + # application/x-lz4 + # application/zstd +} + +def is_archive(mimetype): + return mimetype in ARCHIVE_MIME_TYPE + +def is_text(mimetype): + return mimetype.split('/')[0] == 'text' + + +def get_mimetype(b_content): + return magic.from_buffer(b_content, mime=True) + +def 
create_item_id(feeder_name, path): + names = path.split('/') + try: + date = datetime.datetime(int(names[-4]), int(names[-3]), int(names[-2])).strftime("%Y%m%d") + basename = names[-1] + except (IndexError, ValueError): + date = Date.get_today_date_str() + basename = path # TODO check max depth + date = f'{date[0:4]}/{date[4:6]}/{date[6:8]}' + basename = secure_filename(basename) + if len(basename) < 1: + basename = generate_uuid() + if len(basename) > 215: + basename = basename[-215:] + str(generate_uuid()) + if not basename.endswith('.gz'): + basename = basename.replace('.', '_') + basename = f'{basename}.gz' + else: + nb = basename.count('.') - 1 + if nb > 0: + basename = basename.replace('.', '_', nb) + item_id = os.path.join(feeder_name, date, basename) + # TODO check if already exists + return item_id + +def create_b64(b_content): + return base64.standard_b64encode(b_content).decode() + +def create_gzipped_b64(b_content): + try: + gzipencoded = gzip.compress(b_content) + gzip64encoded = create_b64(gzipencoded) + return gzip64encoded + except Exception as e: + logger.warning(e) + return '' diff --git a/configs/modules.cfg b/configs/modules.cfg index 360dfbf1..262099ac 100644 --- a/configs/modules.cfg +++ b/configs/modules.cfg @@ -7,6 +7,9 @@ publish = Importers [Importer_Json] publish = Importers,Tags +[FileImporter] +publish = Importers + [PystemonModuleImporter] publish = Importers diff --git a/tools/file_dir_importer.py b/tools/file_dir_importer.py index 9ff6f884..c5db7f5d 100755 --- a/tools/file_dir_importer.py +++ b/tools/file_dir_importer.py @@ -1,119 +1,40 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +DIR/File Importer Helper +================ + +Import Content + +""" -import zmq -import base64 -from io import StringIO -import datetime -import gzip import argparse -import binascii import os -import time, datetime -import re +import sys -''' -' -' Import content/pastes into redis. -' If content is not compressed yet, compress it (only text). 
-' -' /!\ WARNING /!\ - Content to be imported can be placed in a directory tree of the form - root/ - | - +-- Year/ - | - +-- Month/ - | - +-- Day/ - | - +-- Content - e.g.: - ~/to_import/2017/08/22/paste1.gz +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from importer import FileImporter - or this directory tree will be created with the current date - e.g.: - ~/to_import/paste1.gz -' -''' - -def is_gzip_file(magic_nuber): - return binascii.hexlify(magic_nuber) == b'1f8b' - -def is_hierachy_valid(path): - var = path.split('/') - try: - newDate = datetime.datetime(int(var[-4]), int(var[-3]), int(var[-2])) - correctDate = True - except ValueError: - correctDate = False - except IndexError: - correctDate = False - except: - correctDate = False - return correctDate - -def sanitize_str(str_var, invalid_char_regex): - res = re.sub(invalid_char_regex, "-", str_var) - return res.replace(' ', '_') if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Take files from a directory and push them into a 0MQ feed.') - parser.add_argument('-d', '--directory', type=str, required=True, help='Root directory to import') - parser.add_argument('-p', '--port', type=int, default=5556, help='Zero MQ port') - parser.add_argument('-c', '--channel', type=str, default='102', help='Zero MQ channel') - parser.add_argument('-n', '--name', type=str, default='import_dir', help='Name of the feeder') - parser.add_argument('-s', '--seconds', type=float, default=0.2, help='Second between pastes') - parser.add_argument('--hierarchy', type=int, default=1, help='Number of parent directory forming the name') - + parser = argparse.ArgumentParser(description='Directory or file importer') + parser.add_argument('-d', '--directory', type=str, help='Root directory to import') + parser.add_argument('-f', '--file', type=str, help='File to import') args = parser.parse_args() - context = zmq.Context() - 
socket = context.socket(zmq.PUB) - socket.bind("tcp://*:{}".format(args.port)) - time.sleep(1) #Important, avoid loosing the 1 message + if not args.directory and not args.file: + parser.print_help() + sys.exit(0) - invalid_char = r'[\\/*?&%=:"<>|#\\\']' - invalid_char_dir = r'[\\*?&%=:"<>|#\\\']' + if args.directory: + dir_path = args.directory + dir_importer = FileImporter.DirImporter() + dir_importer.importer(dir_path) - for dirname, dirnames, filenames in os.walk(args.directory): - for filename in filenames: - complete_path = os.path.join(dirname, filename) - - with open(complete_path, 'rb') as f: - messagedata = f.read() - - #verify that the data is gzipEncoded. if not compress it - if not is_gzip_file(messagedata[0:2]): - messagedata = gzip.compress(messagedata) - complete_path += '.gz' - - if complete_path[-4:] != '.gz': - - #if paste do not have a 'date hierarchy', create it - if not is_hierachy_valid(complete_path): - now = datetime.datetime.now() - paste_name = complete_path.split('/')[-1] - paste_name = sanitize_str(paste_name, invalid_char) - directory = complete_path.split('/')[-2] - directory = sanitize_str(directory, invalid_char_dir) - wanted_path = os.path.join(directory, now.strftime("%Y"), now.strftime("%m"), now.strftime("%d"), paste_name) - wanted_path = os.path.relpath(wanted_path) - else: - #take wanted path of the file - wanted_path = os.path.relpath(complete_path) - wanted_path = wanted_path.split('/') - wanted_path = '/'.join(wanted_path[-(4+args.hierarchy):]) - wanted_path = sanitize_str(wanted_path, invalid_char_dir) - - # sanitize feeder_name - feeder_name = os.path.relpath(sanitize_str(args.name, invalid_char)) - - path_to_send = 'import_dir/' + feeder_name + '>>' + wanted_path - s = b' '.join( [ args.channel.encode(), path_to_send.encode(), base64.b64encode(messagedata) ] ) - socket.send(s) - print('import_dir/' + feeder_name+'>>'+wanted_path) - time.sleep(args.seconds) - - else: - print('{} : incorrect type'.format(complete_path)) + 
if args.file: + file_path = args.file + file_importer = FileImporter.FileImporter() + file_importer.importer(file_path)