diff --git a/.gitignore b/.gitignore
index 521f1ce..0741e6d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,7 @@
 deb/
 *img
 ledBtn
+
+python/build/*
+python/dist/*
+python/kittengroomer.egg-info/*
diff --git a/python/bin/generic.py b/python/bin/generic.py
new file mode 100644
index 0000000..21a58f8
--- /dev/null
+++ b/python/bin/generic.py
@@ -0,0 +1,365 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import magic
+import os
+import mimetypes
+import shlex
+import subprocess
+import time
+
+from kittengroomer import FileBase, KittenGroomerBase, main
+
+UNOCONV = '/usr/bin/unoconv'
+LIBREOFFICE = '/usr/bin/libreoffice'
+GS = '/usr/bin/gs'
+PDF2HTMLEX = '/usr/bin/pdf2htmlEX'
+SEVENZ = '/usr/bin/7z'
+
+
+# Sub-types of the application/* mime types, grouped by handler
+mimes_office = ['msword', 'vnd.openxmlformats-officedocument.', 'vnd.ms-',
+                'vnd.oasis.opendocument']
+mimes_pdf = ['pdf']
+mimes_xml = ['xml']
+mimes_ms = ['x-dosexec']
+mimes_compressed = ['zip', 'x-rar', 'x-bzip2', 'x-lzip', 'x-lzma', 'x-lzop',
+                    'x-xz', 'x-compress', 'x-gzip', 'x-tar', 'compressed']
+mimes_data = ['octet-stream']
+
+
+class File(FileBase):
+
+    def __init__(self, src_path, dst_path):
+        ''' Init file object, set the mimetype '''
+        super(File, self).__init__(src_path, dst_path)
+        mimetype = magic.from_file(src_path, mime=True)
+        self.main_type, self.sub_type = mimetype.split('/')
+        self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type})
+        self.expected_mimetype, self.expected_extensions = self.crosscheck_mime()
+        # Set to True by the groomer when the file is an archive it extracts
+        self.is_recursive = False
+
+    def crosscheck_mime(self):
+        '''
+        Return the mime type and the list of extensions expected for
+        this file, both guessed from the extension of the source path.
+        The extensions are None when the mime type cannot be guessed.
+        '''
+        # /usr/share/mime has interesting stuff
+
+        # guess_type uses the extension to get a mime type
+        expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
+        if expected_mimetype is not None:
+            expected_extensions = mimetypes.guess_all_extensions(expected_mimetype,
+                                                                 strict=False)
+        else:
+            # the extension is unknown...
+            expected_extensions = None
+
+        return expected_mimetype, expected_extensions
+
+    def verify_extension(self):
+        '''
+        Check if the extension is the one we expect.
+        Return None if there is no expected extension.
+        '''
+        if self.expected_extensions is None:
+            return None
+        path, actual_extension = os.path.splitext(self.src_path)
+        # guess_type ignores the case of the extension, do the same here:
+        # photo.JPG is a valid image/jpeg file name
+        return actual_extension.lower() in self.expected_extensions
+
+    def verify_mime(self):
+        '''
+        Check if the mime is the one we expect.
+        Return None if there is no expected mime type.
+        '''
+        if self.expected_mimetype is None:
+            return None
+        actual_mimetype = '{}/{}'.format(self.main_type, self.sub_type)
+        return actual_mimetype == self.expected_mimetype
+
+
+class KittenGroomer(KittenGroomerBase):
+
+    def __init__(self, root_src=None, root_dst=None, max_recursive=5):
+        '''
+        Initialize the basics of the conversion process
+        '''
+        if root_src is None:
+            root_src = os.path.join(os.sep, 'media', 'src')
+        if root_dst is None:
+            root_dst = os.path.join(os.sep, 'media', 'dst')
+        super(KittenGroomer, self).__init__(root_src, root_dst)
+
+        self.recursive = 0
+        self.max_recursive = max_recursive
+
+        subtypes_apps = [
+            (mimes_office, self._office_related),
+            (mimes_pdf, self._pdf),
+            (mimes_xml, self._office_related),
+            (mimes_ms, self._executables),
+            (mimes_compressed, self._archive),
+            (mimes_data, self._binary_app),
+        ]
+        self.subtypes_application = self._init_subtypes_application(subtypes_apps)
+
+        self.mime_processing_options = {
+            'text': self.text,
+            'audio': self.audio,
+            'image': self.image,
+            'video': self.video,
+            'application': self.application,
+            'example': self.example,
+            'message': self.message,
+            'model': self.model,
+            'multipart': self.multipart,
+            'inode': self.inode,
+        }
+
+        # Dirty trick to run libreoffice at least once and avoid unoconv to crash...
+        self._run_process(LIBREOFFICE, 5)
+
+    # ##### Helpers #####
+    def _init_subtypes_application(self, subtypes_application):
+        '''
+        Create the Dict to pick the right function based on the sub mime type
+        '''
+        to_return = {}
+        for list_subtypes, fct in subtypes_application:
+            for st in list_subtypes:
+                to_return[st] = fct
+        return to_return
+
+    def _print_log(self):
+        '''
+        Print the logs related to the current file being processed
+        '''
+        tmp_log = self.log_name.fields(**self.cur_file.log_details)
+        if self.cur_file.log_details.get('dangerous'):
+            tmp_log.warning(self.cur_file.log_string)
+        elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'):
+            tmp_log.info(self.cur_file.log_string)
+        else:
+            tmp_log.debug(self.cur_file.log_string)
+
+    def _run_process(self, command_line, timeout=0):
+        '''
+        Run a subprocess and wait until it finishes.
+
+        command_line is either a list of arguments (preferred, file names
+        containing spaces or quotes are passed untouched) or a string,
+        which is split with shlex. If timeout (in seconds) is not 0, the
+        process is killed once the deadline is passed. Always return True.
+        '''
+        if isinstance(command_line, (list, tuple)):
+            args = list(command_line)
+        else:
+            args = shlex.split(command_line)
+        if timeout != 0:
+            deadline = time.time() + timeout
+        else:
+            deadline = None
+        p = subprocess.Popen(args)
+        while True:
+            code = p.poll()
+            if code is not None:
+                break
+            if deadline is not None and time.time() > deadline:
+                p.kill()
+                # Reap the killed process, otherwise it stays as a zombie
+                p.wait()
+                break
+            time.sleep(1)
+        return True
+
+    #######################
+
+    # ##### Discarded mime types, reason in the comments ######
+    def inode(self):
+        ''' Usually empty file. No reason (?) to copy it on the dest key'''
+        self.cur_file.log_string += 'Inode file'
+
+    def unknown(self):
+        ''' This main type is unknown, that should not happen '''
+        self.cur_file.log_string += 'Unknown file'
+
+    # ##### Treated as malicious, no reason to have it on a USB key ######
+    def example(self):
+        '''Way to process example file'''
+        self.cur_file.log_string += 'Example file'
+        self.cur_file.make_dangerous()
+        self._safe_copy()
+
+    def message(self):
+        '''Way to process message file'''
+        self.cur_file.log_string += 'Message file'
+        self.cur_file.make_dangerous()
+        self._safe_copy()
+
+    def model(self):
+        '''Way to process model file'''
+        self.cur_file.log_string += 'Model file'
+        self.cur_file.make_dangerous()
+        self._safe_copy()
+
+    def multipart(self):
+        '''Way to process multipart file'''
+        self.cur_file.log_string += 'Multipart file'
+        self.cur_file.make_dangerous()
+        self._safe_copy()
+
+    #######################
+
+    # ##### Converted ######
+    def text(self):
+        ''' LibreOffice should be able to open all the files '''
+        self.cur_file.log_string += 'Text file'
+        self._office_related()
+
+    def application(self):
+        ''' Everything can be there, using the subtype to decide '''
+        for subtype, fct in list(self.subtypes_application.items()):
+            if subtype in self.cur_file.sub_type:
+                fct()
+                self.cur_file.log_string += 'Application file'
+                return
+        self.cur_file.log_string += 'Unknown Application file'
+        self._unknown_app()
+
+    def _executables(self):
+        '''Way to process executable file'''
+        self.cur_file.add_log_details('processing_type', 'executable')
+        self.cur_file.make_dangerous()
+        self._safe_copy()
+
+    def _office_related(self):
+        '''Way to process all the files LibreOffice can handle'''
+        self.cur_file.add_log_details('processing_type', 'office')
+        dst_dir, filename = os.path.split(self.cur_file.dst_path)
+        tmpdir = os.path.join(dst_dir, 'temp')
+        name, ext = os.path.splitext(filename)
+        tmppath = os.path.join(tmpdir, name + '.pdf')
+        self._safe_mkdir(tmpdir)
+        # Arguments passed as a list: the paths may contain spaces or quotes
+        lo_command = [UNOCONV, '--format', 'pdf', '-eSelectPdfVersion=1',
+                      '--output', tmppath, self.cur_file.src_path]
+        self._run_process(lo_command)
+        self._pdfa(tmppath)
+        self._safe_rmtree(tmpdir)
+
+    def _pdfa(self, tmpsrcpath):
+        '''Way to process PDF/A file'''
+        pdf_command = [PDF2HTMLEX, '--dest-dir', '/', tmpsrcpath,
+                       self.cur_file.dst_path + '.html']
+        self._run_process(pdf_command)
+
+    def _pdf(self):
+        '''Way to process PDF file'''
+        self.cur_file.add_log_details('processing_type', 'pdf')
+        dst_dir, filename = os.path.split(self.cur_file.dst_path)
+        tmpdir = os.path.join(dst_dir, 'temp')
+        tmppath = os.path.join(tmpdir, filename)
+        self._safe_mkdir(tmpdir)
+        gs_command = [GS, '-dPDFA', '-dBATCH', '-dNOPAUSE', '-sDEVICE=pdfwrite',
+                      '-sOutputFile={}'.format(tmppath), self.cur_file.src_path]
+        self._run_process(gs_command)
+        self._pdfa(tmppath)
+        self._safe_rmtree(tmpdir)
+
+    def _archive(self):
+        '''Way to process Archive'''
+        # processdir() replaces self.cur_file for each extracted file, keep
+        # the archive itself to restore it once its content is processed
+        archive_file = self.cur_file
+        archive_file.add_log_details('processing_type', 'archive')
+        archive_file.is_recursive = True
+        archive_file.log_string += 'Archive extracted, processing content.'
+        tmpdir = archive_file.dst_path + '_temp'
+        self._safe_mkdir(tmpdir)
+        extract_command = [SEVENZ, '-p1', 'x', archive_file.src_path,
+                           '-o{}'.format(tmpdir), '-bd']
+        self._run_process(extract_command)
+        self.recursive += 1
+        self.processdir(tmpdir, archive_file.dst_path)
+        self.recursive -= 1
+        self._safe_rmtree(tmpdir)
+        self.cur_file = archive_file
+
+    def _unknown_app(self):
+        '''Way to process an unknown file'''
+        self.cur_file.make_unknown()
+        self._safe_copy()
+
+    def _binary_app(self):
+        '''Way to process an unknown binary file'''
+        self.cur_file.make_binary()
+        self._safe_copy()
+
+    #######################
+
+    # ##### Not converted, checking the mime type ######
+    def audio(self):
+        '''Way to process an audio file'''
+        self.cur_file.log_string += 'Audio file'
+        self._media_processing()
+
+    def image(self):
+        '''Way to process an image'''
+        self.cur_file.log_string += 'Image file'
+        self._media_processing()
+
+    def video(self):
+        '''Way to process a video'''
+        self.cur_file.log_string += 'Video file'
+        self._media_processing()
+
+    def _media_processing(self):
+        '''Generic way to process all the media files'''
+        self.cur_file.add_log_details('processing_type', 'media')
+        if not self.cur_file.verify_mime() or not self.cur_file.verify_extension():
+            # The extension is unknown or doesn't match the mime type => suspicious
+            # TODO: write details in the logfile
+            self.cur_file.make_dangerous()
+        self._safe_copy()
+
+    #######################
+
+    def processdir(self, src_dir=None, dst_dir=None):
+        '''
+        Main function doing the processing
+        '''
+        if src_dir is None:
+            src_dir = self.src_root_dir
+        if dst_dir is None:
+            dst_dir = self.dst_root_dir
+
+        if self.recursive > 0:
+            # Called from _archive: log the archive before its content
+            self._print_log()
+
+        if self.recursive >= self.max_recursive:
+            self.log_name.warning('ARCHIVE BOMB.')
+            self.log_name.warning('The content of the archive contains recursively other archives.')
+            self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
+            self._safe_rmtree(src_dir)
+            if src_dir.endswith('_temp'):
+                archbomb_path = src_dir[:-len('_temp')]
+                self._safe_remove(archbomb_path)
+            # Nothing is left to process, the extracted content has been removed
+            return
+
+        for srcpath in self._list_all_files(src_dir):
+            # Only replace the leading src_dir, it can appear again deeper in the path
+            self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir, 1))
+
+            self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', '', 1),
+                               self.cur_file.main_type, self.cur_file.sub_type)
+            self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
+            if not self.cur_file.is_recursive:
+                self._print_log()
+
+if __name__ == '__main__':
+    main(KittenGroomer, 'Generic version of the KittenGroomer. Convert and rename files.')
diff --git a/python/bin/pier9.py b/python/bin/pier9.py
new file mode 100644
index 0000000..004494a
--- /dev/null
+++ b/python/bin/pier9.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+
+from kittengroomer import FileBase, KittenGroomerBase, main
+
+
+# File extensions accepted by each machine
+printers = ['.STL', '.obj']
+cnc = ['.nc', '.tap', '.gcode', '.dxf', '.stl', '.obj', '.iges', '.igs',
+       '.vrml', '.vrl', '.thing', '.step', '.stp', '.x3d']
+shopbot = ['.ai', '.svg', '.dxf', '.dwg', '.eps']
+omax = ['.ai', '.svg', '.dxf', '.dwg', '.eps', '.omx', '.obj']
+epilog_laser = ['.ai', '.svg', '.dxf', '.dwg', '.eps']
+metabeam = ['.dxf']
+up = ['.upp', '.up3', '.stl', '.obj']
+
+
+class FilePier9(FileBase):
+
+    def __init__(self, src_path, dst_path):
+        ''' Init file object, set the extension '''
+        super(FilePier9, self).__init__(src_path, dst_path)
+        a, self.extension = os.path.splitext(self.src_path)
+
+
+class KittenGroomerPier9(KittenGroomerBase):
+
+    def __init__(self, root_src=None, root_dst=None):
+        '''
+        Initialize the basics of the copy
+        '''
+        if root_src is None:
+            root_src = os.path.join(os.sep, 'media', 'src')
+        if root_dst is None:
+            root_dst = os.path.join(os.sep, 'media', 'dst')
+        super(KittenGroomerPier9, self).__init__(root_src, root_dst)
+
+        # The initial version will accept all the file extensions for all the machines.
+        self.authorized_extensions = printers + cnc + shopbot + omax + epilog_laser + metabeam + up
+
+    def _print_log(self):
+        '''
+        Print the logs related to the current file being processed
+        '''
+        tmp_log = self.log_name.fields(**self.cur_file.log_details)
+        if not self.cur_file.log_details.get('valid'):
+            tmp_log.warning(self.cur_file.log_string)
+        else:
+            tmp_log.debug(self.cur_file.log_string)
+
+    def processdir(self):
+        '''
+        Main function doing the processing
+        '''
+        # Extensions are compared case insensitively: .STL and .stl are the same format
+        authorized = set(ext.lower() for ext in self.authorized_extensions)
+        for srcpath in self._list_all_files(self.src_root_dir):
+            self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '', 1))
+            # Only replace the leading source directory, it can appear again deeper in the path
+            self.cur_file = FilePier9(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir, 1))
+            if self.cur_file.extension.lower() in authorized:
+                self.cur_file.add_log_details('valid', True)
+                self.cur_file.log_string = 'Expected extension: ' + self.cur_file.extension
+                self._safe_copy()
+            else:
+                self.cur_file.log_string = 'Bad extension: ' + self.cur_file.extension
+            self._print_log()
+
+
+if __name__ == '__main__':
+    main(KittenGroomerPier9, 'Pier 9 version of the KittenGroomer. Only copy some files.')
diff --git a/python/kittengroomer/__init__.py b/python/kittengroomer/__init__.py
new file mode 100644
index 0000000..0b6ceb6
--- /dev/null
+++ b/python/kittengroomer/__init__.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from .helpers import FileBase, KittenGroomerBase, main
diff --git a/python/kittengroomer/helpers.py b/python/kittengroomer/helpers.py
new file mode 100644
index 0000000..3e1fd09
--- /dev/null
+++ b/python/kittengroomer/helpers.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+import shutil
+from twiggy import quickSetup, log
+import argparse
+
+
+class KittenGroomerError(Exception):
+    def __init__(self, message):
+        '''
+        Base KittenGroomer exception handler.
+        '''
+        super(KittenGroomerError, self).__init__(message)
+        self.message = message
+
+
+class ImplementationRequired(KittenGroomerError):
+    '''
+    Implementation required error
+    '''
+    pass
+
+
+class FileBase(object):
+
+    def __init__(self, src_path, dst_path):
+        '''
+        Contains base information for a file on the source USB key,
+        initialised with expected src and dest path
+        '''
+        self.src_path = src_path
+        self.dst_path = dst_path
+        self.log_details = {'filepath': self.src_path}
+        self.log_string = ''
+
+    def add_log_details(self, key, value):
+        '''
+        Add an entry in the log dictionary
+        '''
+        self.log_details[key] = value
+
+    def make_dangerous(self):
+        '''
+        This file should be considered as dangerous and never run.
+        Prepending and appending DANGEROUS to the destination
+        file name avoids the double-click of death
+        '''
+        self.log_details['dangerous'] = True
+        path, filename = os.path.split(self.dst_path)
+        self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
+
+    def make_unknown(self):
+        '''
+        This file has an unknown type and it was not possible to take
+        a decision. The user will have to decide what to do.
+        Prepending UNKNOWN
+        '''
+        self.log_details['unknown'] = True
+        path, filename = os.path.split(self.dst_path)
+        self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
+
+    def make_binary(self):
+        '''
+        This file is a binary, and should probably not be run.
+        Appending .bin avoids the double-click of death but the user
+        will have to decide by themselves.
+        '''
+        self.log_details['binary'] = True
+        path, filename = os.path.split(self.dst_path)
+        self.dst_path = os.path.join(path, '{}.bin'.format(filename))
+
+
+class KittenGroomerBase(object):
+
+    def __init__(self, root_src, root_dst):
+        '''
+        Setup the base options of the copy/convert setup
+        '''
+        self.src_root_dir = root_src
+        self.dst_root_dir = root_dst
+        self.log_root_dir = os.path.join(self.dst_root_dir, 'logs')
+        self.log_processing = os.path.join(self.log_root_dir, 'processing.log')
+
+        # quickSetup(file=self.log_processing)
+        quickSetup()
+        self.log_name = log.name('files')
+
+        self.cur_file = None
+
+    # ##### Helpers #####
+    def _safe_rmtree(self, directory):
+        '''Remove a directory tree if it exists'''
+        if os.path.exists(directory):
+            shutil.rmtree(directory)
+
+    def _safe_remove(self, filepath):
+        '''Remove a file if it exists'''
+        if os.path.exists(filepath):
+            os.remove(filepath)
+
+    def _safe_mkdir(self, directory):
+        '''Create a directory (and its parents) if it does not exist'''
+        if not os.path.exists(directory):
+            os.makedirs(directory)
+
+    def _safe_copy(self):
+        ''' Copy a file and create directory if needed '''
+        try:
+            dst_path, filename = os.path.split(self.cur_file.dst_path)
+            self._safe_mkdir(dst_path)
+            shutil.copy(self.cur_file.src_path, self.cur_file.dst_path)
+            return True
+        except Exception as e:
+            # TODO: Logfile
+            print(e)
+            return False
+
+    def _list_all_files(self, directory):
+        ''' Generate an iterator over all the files in a directory tree '''
+        for root, dirs, files in os.walk(directory):
+            for filename in files:
+                filepath = os.path.join(root, filename)
+                yield filepath
+
+    def _print_log(self):
+        '''
+        Print log, should be called after each file.
+
+        You probably want to reimplement it in the subclass
+        '''
+        tmp_log = self.log_name.fields(**self.cur_file.log_details)
+        tmp_log.info('It did a thing.')
+
+    #######################
+
+    def processdir(self, src_dir=None, dst_dir=None):
+        '''
+        Main function doing the work, you have to implement it yourself.
+        '''
+        raise ImplementationRequired('You have to implement the result processdir.')
+
+
+def main(kg_implementation, description='Call the KittenGroomer implementation to do things on files present in the source directory to the destination directory'):
+    parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
+    parser.add_argument('-s', '--source', type=str, help='Source directory')
+    parser.add_argument('-d', '--destination', type=str, help='Destination directory')
+    args = parser.parse_args()
+    kg = kg_implementation(args.source, args.destination)
+    kg.processdir()
diff --git a/python/setup.py b/python/setup.py
new file mode 100644
index 0000000..d5b9c9b
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,26 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+from setuptools import setup
+
+setup(
+    name='kittengroomer',
+    version='1.0',
+    author='Raphaël Vinot',
+    author_email='raphael.vinot@circl.lu',
+    maintainer='Raphaël Vinot',
+    url='https://github.com/CIRCL/CIRCLean',
+    description='Standalone CIRCLean/KittenGroomer code.',
+    packages=['kittengroomer'],
+    scripts=['bin/generic.py', 'bin/pier9.py'],
+    classifiers=[
+        'License :: OSI Approved :: BSD License',
+        'Development Status :: 5 - Production/Stable',
+        'Environment :: Console',
+        'Intended Audience :: Science/Research',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+        'Topic :: Communications :: File Sharing',
+        'Topic :: Security',
+    ],
+    install_requires=['twiggy', 'python-magic'],
+)