diff --git a/fs/opt/groomer/groomer.sh b/fs/opt/groomer/groomer.sh index 7e96459..f87ce4d 100755 --- a/fs/opt/groomer/groomer.sh +++ b/fs/opt/groomer/groomer.sh @@ -102,7 +102,7 @@ do LOGFILE="${LOGS}/processing.txt" echo "==== Starting processing of /media/${SRC} to ${target_dir}. ====" >> ${LOGFILE} - python ./functions.py --source /media/${SRC} --destination ${target_dir} || true + generic.py --source /media/${SRC} --destination ${target_dir} || true echo "==== Done with /media/${SRC} to ${target_dir}. ====" >> ${LOGFILE} ls -lR "${target_dir}" diff --git a/fs/opt/groomer/init.sh b/fs/opt/groomer/init.sh index 821b8dc..a926213 100755 --- a/fs/opt/groomer/init.sh +++ b/fs/opt/groomer/init.sh @@ -1,7 +1,7 @@ #!/bin/bash set -e -#set -x +set -x source ./constraint.sh diff --git a/python/bin/generic.py b/python/bin/generic.py deleted file mode 100644 index 21a58f8..0000000 --- a/python/bin/generic.py +++ /dev/null @@ -1,332 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import magic -import os -import mimetypes -import shlex -import subprocess -import time - -from kittengroomer import FileBase, KittenGroomerBase, main - -UNOCONV = '/usr/bin/unoconv' -LIBREOFFICE = '/usr/bin/libreoffice' -GS = '/usr/bin/gs' -PDF2HTMLEX = '/usr/bin/pdf2htmlEX' -SEVENZ = '/usr/bin/7z' - - -# Prepare application/ -mimes_office = ['msword', 'vnd.openxmlformats-officedocument.', 'vnd.ms-', - 'vnd.oasis.opendocument'] -mimes_pdf = ['pdf'] -mimes_xml = ['xml'] -mimes_ms = ['x-dosexec'] -mimes_compressed = ['zip', 'x-rar', 'x-bzip2', 'x-lzip', 'x-lzma', 'x-lzop', - 'x-xz', 'x-compress', 'x-gzip', 'x-tar', 'compressed'] -mimes_data = ['octet-stream'] - - -class File(FileBase): - - def __init__(self, src_path, dst_path): - ''' Init file object, set the mimetype ''' - super(File, self).__init__(src_path, dst_path) - mimetype = magic.from_file(src_path, mime=True) - self.main_type, self.sub_type = mimetype.split('/') - self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type}) - self.expected_mimetype, self.expected_extensions = self.crosscheck_mime() - self.is_recursive = False - - def crosscheck_mime(self): - ''' - Set the expected mime and extension variables based on mime type. - ''' - # /usr/share/mime has interesting stuff - - # guess_type uses the extension to get a mime type - expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False) - if expected_mimetype is not None: - expected_extensions = mimetypes.guess_all_extensions(expected_mimetype, - strict=False) - else: - # the extension is unknown... - expected_extensions = None - - return expected_mimetype, expected_extensions - - def verify_extension(self): - '''Check if the extension is the one we expect''' - if self.expected_extensions is None: - return None - path, actual_extension = os.path.splitext(self.src_path) - return actual_extension in self.expected_extensions - - def verify_mime(self): - '''Check if the mime is the one we expect''' - if self.expected_mimetype is None: - return None - actual_mimetype = '{}/{}'.format(self.main_type, self.sub_type) - return actual_mimetype == self.expected_mimetype - - -class KittenGroomer(KittenGroomerBase): - - def __init__(self, root_src=None, root_dst=None, max_recursive=5): - ''' - Initialize the basics of the conversion process - ''' - if root_src is None: - root_src = os.path.join(os.sep, 'media', 'src') - if root_dst is None: - root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomer, self).__init__(root_src, root_dst) - - self.recursive = 0 - self.max_recursive = max_recursive - - subtypes_apps = [ - (mimes_office, self._office_related), - (mimes_pdf, self._pdf), - (mimes_xml, self._office_related), - (mimes_ms, self._executables), - (mimes_compressed, self._archive), - (mimes_data, self._binary_app), - ] - self.subtypes_application = self._init_subtypes_application(subtypes_apps) - - self.mime_processing_options = { - 'text': self.text, - 'audio': self.audio, - 'image': self.image, - 'video': self.video, - 'application': self.application, - 'example': self.example, - 'message': self.message, - 'model': self.model, - 'multipart': self.multipart, - 'inode': self.inode, - } - - # Dirty trick to run libreoffice at least once and avoid unoconv to crash... - self._run_process(LIBREOFFICE, 5) - - # ##### Helpers ##### - def _init_subtypes_application(self, subtypes_application): - ''' - Create the Dict to pick the right function based on the sub mime type - ''' - to_return = {} - for list_subtypes, fct in subtypes_application: - for st in list_subtypes: - to_return[st] = fct - return to_return - - def _print_log(self): - ''' - Print the logs related to the current file being processed - ''' - tmp_log = self.log_name.fields(**self.cur_file.log_details) - if self.cur_file.log_details.get('dangerous'): - tmp_log.warning(self.cur_file.log_string) - elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'): - tmp_log.info(self.cur_file.log_string) - else: - tmp_log.debug(self.cur_file.log_string) - - def _run_process(self, command_line, timeout=0): - '''Run subprocess, wait until it finishes''' - if timeout != 0: - deadline = time.time() + timeout - else: - deadline = None - args = shlex.split(command_line) - p = subprocess.Popen(args) - while True: - code = p.poll() - if code is not None: - break - if deadline is not None and time.time() > deadline: - p.kill() - break - time.sleep(1) - return True - - ####################### - - # ##### Discarded mime types, reason in the comments ###### - def inode(self): - ''' Usually empty file. No reason (?) to copy it on the dest key''' - self.cur_file.log_string += 'Inode file' - - def unknown(self): - ''' This main type is unknown, that should not happen ''' - self.cur_file.log_string += 'Unknown file' - - # ##### Threated as malicious, no reason to have it on a USB key ###### - def example(self): - '''Way to process example file''' - self.cur_file.log_string += 'Example file' - self.cur_file.make_dangerous() - self._safe_copy() - - def message(self): - '''Way to process message file''' - self.cur_file.log_string += 'Message file' - self.cur_file.make_dangerous() - self._safe_copy() - - def model(self): - '''Way to process model file''' - self.cur_file.log_string += 'Model file' - self.cur_file.make_dangerous() - self._safe_copy() - - def multipart(self): - '''Way to process multipart file''' - self.cur_file.log_string += 'Multipart file' - self.cur_file.make_dangerous() - self._safe_copy() - - ####################### - - # ##### Converted ###### - def text(self): - ''' LibreOffice should be able to open all the files ''' - self.cur_file.log_string += 'Text file' - self._office_related() - - def application(self): - ''' Everything can be there, using the subtype to decide ''' - for subtype, fct in list(self.subtypes_application.items()): - if subtype in self.cur_file.sub_type: - fct() - self.cur_file.log_string += 'Application file' - return - self.cur_file.log_string += 'Unknown Application file' - self._unknown_app() - - def _executables(self): - '''Way to process executable file''' - self.cur_file.add_log_details('processing_type', 'executable') - self.cur_file.make_dangerous() - self._safe_copy() - - def _office_related(self): - '''Way to process all the files LibreOffice can handle''' - self.cur_file.add_log_details('processing_type', 'office') - dst_dir, filename = os.path.split(self.cur_file.dst_path) - tmpdir = os.path.join(dst_dir, 'temp') - name, ext = os.path.splitext(filename) - tmppath = os.path.join(tmpdir, name + '.pdf') - self._safe_mkdir(tmpdir) - lo_command = '{} --format pdf -eSelectPdfVersion=1 --output {} {}'.format( - UNOCONV, tmppath, self.cur_file.src_path) - self._run_process(lo_command) - self._pdfa(tmppath) - self._safe_rmtree(tmpdir) - - def _pdfa(self, tmpsrcpath): - '''Way to process PDF/A file''' - pdf_command = '{} --dest-dir / {} {}'.format(PDF2HTMLEX, tmpsrcpath, - self.cur_file.dst_path + '.html') - self._run_process(pdf_command) - - def _pdf(self): - '''Way to process PDF file''' - self.cur_file.add_log_details('processing_type', 'pdf') - dst_dir, filename = os.path.split(self.cur_file.dst_path) - tmpdir = os.path.join(dst_dir, 'temp') - tmppath = os.path.join(tmpdir, filename) - self._safe_mkdir(tmpdir) - gs_command = '{} -dPDFA -dBATCH -dNOPAUSE -sDEVICE=pdfwrite -sOutputFile={} {}'.format( - GS, tmppath, self.cur_file.src_path) - self._run_process(gs_command) - self._pdfa(tmppath) - self._safe_rmtree(tmpdir) - - def _archive(self): - '''Way to process Archive''' - self.cur_file.add_log_details('processing_type', 'archive') - self.cur_file.is_recursive = True - self.cur_file.log_string += 'Archive extracted, processing content.' - tmpdir = self.cur_file.dst_path + '_temp' - self._safe_mkdir(tmpdir) - extract_command = '{} -p1 x {} -o{} -bd'.format(SEVENZ, self.cur_file.src_path, tmpdir) - self._run_process(extract_command) - self.recursive += 1 - self.processdir(tmpdir, self.cur_file.dst_path) - self.recursive -= 1 - self._safe_rmtree(tmpdir) - - def _unknown_app(self): - '''Way to process an unknown file''' - self.cur_file.make_unknown() - self._safe_copy() - - def _binary_app(self): - '''Way to process an unknown binary file''' - self.cur_file.make_binary() - self._safe_copy() - - ####################### - - # ##### Not converted, checking the mime type ###### - def audio(self): - '''Way to process an audio file''' - self.cur_file.log_string += 'Audio file' - self._media_processing() - - def image(self): - '''Way to process an image''' - self.cur_file.log_string += 'Image file' - self._media_processing() - - def video(self): - '''Way to process a video''' - self.cur_file.log_string += 'Video file' - self._media_processing() - - def _media_processing(self): - '''Generic way to process all the media files''' - self.cur_log.fields(processing_type='media') - if not self.cur_file.verify_mime() or not self.cur_file.verify_extension(): - # The extension is unknown or doesn't match the mime type => suspicious - # TODO: write details in the logfile - self.cur_file.make_dangerous() - self._safe_copy() - - ####################### - - def processdir(self, src_dir=None, dst_dir=None): - ''' - Main function doing the processing - ''' - if src_dir is None: - src_dir = self.src_root_dir - if dst_dir is None: - dst_dir = self.dst_root_dir - - if self.recursive > 0: - self._print_log() - - if self.recursive >= self.max_recursive: - self.cur_log.warning('ARCHIVE BOMB.') - self.cur_log.warning('The content of the archive contains recursively other archives.') - self.cur_log.warning('This is a bad sign so the archive is not extracted to the destination key.') - self._safe_rmtree(src_dir) - if src_dir.endswith('_temp'): - archbomb_path = src_dir[:-len('_temp')] - self._safe_remove(archbomb_path) - - for srcpath in self._list_all_files(src_dir): - self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir)) - - self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''), - self.cur_file.main_type, self.cur_file.sub_type) - self.mime_processing_options.get(self.cur_file.main_type, self.unknown)() - if not self.cur_file.is_recursive: - self._print_log() - -if __name__ == '__main__': - main(KittenGroomer, 'Generic version of the KittenGroomer. Convert and rename files.') diff --git a/python/bin/pier9.py b/python/bin/pier9.py deleted file mode 100644 index 004494a..0000000 --- a/python/bin/pier9.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import os - -from kittengroomer import FileBase, KittenGroomerBase, main - - -printers = ['.STL', '.obj'] -cnc = ['.nc', '.tap', '.gcode', '.dxf', '.stl', '.obj', '.iges', '.igs', - '.vrml', '.vrl', '.thing', '.step', '.stp', '.x3d'] -shopbot = ['.ai', '.svg', '.dxf', '.dwg', '.eps'] -omax = ['.ai', '.svg', '.dxf', '.dwg', '.eps', '.omx', '.obj'] -epilog_laser = ['.ai', '.svg', '.dxf', '.dwg', '.eps'] -metabeam = ['.dxf'] -up = ['.upp', '.up3', '.stl', '.obj'] - - -class FilePier9(FileBase): - - def __init__(self, src_path, dst_path): - ''' Init file object, set the extension ''' - super(FilePier9, self).__init__(src_path, dst_path) - a, self.extension = os.path.splitext(self.src_path) - - -class KittenGroomerPier9(KittenGroomerBase): - - def __init__(self, root_src=None, root_dst=None): - ''' - Initialize the basics of the copy - ''' - if root_src is None: - root_src = os.path.join(os.sep, 'media', 'src') - if root_dst is None: - root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomerPier9, self).__init__(root_src, root_dst) - - # The initial version will accept all the file extension for all the machines. - self.authorized_extensions = printers + cnc + shopbot + omax + epilog_laser + metabeam + up - - def _print_log(self): - ''' - Print the logs related to the current file being processed - ''' - tmp_log = self.log_name.fields(**self.cur_file.log_details) - if not self.cur_file.log_details.get('valid'): - tmp_log.warning(self.cur_file.log_string) - else: - tmp_log.debug(self.cur_file.log_string) - - def processdir(self): - ''' - Main function doing the processing - ''' - for srcpath in self._list_all_files(self.src_root_dir): - self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '')) - self.cur_file = FilePier9(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir)) - if self.cur_file.extension in self.authorized_extensions: - self.cur_file.add_log_details('valid', True) - self.cur_file.log_string = 'Expected extension: ' + self.cur_file.extension - self._safe_copy() - else: - self.cur_file.log_string = 'Bad extension: ' + self.cur_file.extension - self._print_log() - - -if __name__ == '__main__': - main(KittenGroomerPier9, 'Pier 9 version of the KittenGroomer. Only copy some files.') diff --git a/python/kittengroomer/__init__.py b/python/kittengroomer/__init__.py deleted file mode 100644 index 0b6ceb6..0000000 --- a/python/kittengroomer/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from helpers import FileBase, KittenGroomerBase, main diff --git a/python/kittengroomer/helpers.py b/python/kittengroomer/helpers.py deleted file mode 100644 index 3e1fd09..0000000 --- a/python/kittengroomer/helpers.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import os -import shutil -from twiggy import quickSetup, log -import argparse - - -class KittenGroomerError(Exception): - def __init__(self, message): - ''' - Base KittenGroomer exception handler. - ''' - super(KittenGroomerError, self).__init__(message) - self.message = message - - -class ImplementationRequired(KittenGroomerError): - ''' - Implementation required error - ''' - pass - - -class FileBase(object): - - def __init__(self, src_path, dst_path): - ''' - Contains base information for a file on the source USB key, - initialised with expected src and dest path - ''' - self.src_path = src_path - self.dst_path = dst_path - self.log_details = {'filepath': self.src_path} - self.log_string = '' - - def add_log_details(self, key, value): - ''' - Add an entry in the log dictionary - ''' - self.log_details[key] = value - - def make_dangerous(self): - ''' - This file should be considered as dangerous and never run. - Prepending and appending DANGEROUS to the destination - file name avoid double-click of death - ''' - self.log_details['dangerous'] = True - path, filename = os.path.split(self.dst_path) - self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename)) - - def make_unknown(self): - ''' - This file has an unknown type and it was not possible to take - a decision. Theuser will have to decide what to do. - Prepending UNKNOWN - ''' - self.log_details['unknown'] = True - path, filename = os.path.split(self.dst_path) - self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename)) - - def make_binary(self): - ''' - This file is a binary, and should probably not be run. - Appending .bin avoir double click of death but the user - will have to decide by itself. - ''' - self.log_details['binary'] = True - path, filename = os.path.split(self.dst_path) - self.dst_path = os.path.join(path, '{}.bin'.format(filename)) - - -class KittenGroomerBase(object): - - def __init__(self, root_src, root_dst): - ''' - Setup the base options of the copy/convert setup - ''' - self.src_root_dir = root_src - self.dst_root_dir = root_dst - self.log_root_dir = os.path.join(self.dst_root_dir, 'logs') - self.log_processing = os.path.join(self.log_root_dir, 'processing.log') - - # quickSetup(file=self.log_processing) - quickSetup() - self.log_name = log.name('files') - - self.cur_file = None - - # ##### Helpers ##### - def _safe_rmtree(self, directory): - '''Remove a directory tree if it exists''' - if os.path.exists(directory): - shutil.rmtree(directory) - - def _safe_remove(self, filepath): - '''Remove a file if it exists''' - if os.path.exists(filepath): - os.remove(filepath) - - def _safe_mkdir(self, directory): - '''Remove a directory if it exists''' - if not os.path.exists(directory): - os.makedirs(directory) - - def _safe_copy(self): - ''' Copy a file and create directory if needed ''' - try: - dst_path, filename = os.path.split(self.cur_file.dst_path) - self._safe_mkdir(dst_path) - shutil.copy(self.cur_file.src_path, self.cur_file.dst_path) - return True - except Exception as e: - # TODO: Logfile - print(e) - return False - - def _list_all_files(self, directory): - ''' Generate an iterator over all the files in a directory tree ''' - for root, dirs, files in os.walk(directory): - for filename in files: - filepath = os.path.join(root, filename) - yield filepath - - def _print_log(self): - ''' - Print log, should be called after each file. - - You probably want to reimplement it in the subclass - ''' - tmp_log = self.log_name.fields(**self.cur_file.log_details) - tmp_log.info('It did a thing.') - - ####################### - - def processdir(self, src_dir=None, dst_dir=None): - ''' - Main function doing the work, you have to implement it yourself. - ''' - raise ImplementationRequired('You have to implement the result processdir.') - - -def main(kg_implementation, description='Call the KittenGroomer implementation to do things on files present in the source directory to the destination directory'): - parser = argparse.ArgumentParser(prog='KittenGroomer', description=description) - parser.add_argument('-s', '--source', type=str, help='Source directory') - parser.add_argument('-d', '--destination', type=str, help='Destination directory') - args = parser.parse_args() - kg = kg_implementation(args.source, args.destination) - kg.processdir() diff --git a/python/setup.py b/python/setup.py deleted file mode 100644 index d5b9c9b..0000000 --- a/python/setup.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -from setuptools import setup - -setup( - name='kittengroomer', - version='1.0', - author='Raphaël Vinot', - author_email='raphael.vinot@circl.lu', - maintainer='Raphaël Vinot', - url='https://github.com/CIRCL/CIRCLean', - description='Standalone CIRCLean/KittenGroomer code.', - packages=['kittengroomer'], - scripts=['bin/generic.py', 'bin/pier9.py'], - classifiers=[ - 'License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Environment :: Console', - 'Intended Audience :: Science/Research', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Topic :: Communications :: File Sharing', - 'Topic :: Security', - ], - install_requires=['twiggy'], -) diff --git a/tests/run.sh b/tests/run.sh index 3480b34..a20e033 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -72,7 +72,7 @@ mount -o loop,offset=${OFFSET_VFAT_NORM} ${IMAGE_EXT4} ${SETUP_DIR} cp -rf content_img_vfat_norm/* ${SETUP_DIR} umount ${SETUP_DIR} -chmod -w ${IMAGE} +chmod a-w ${IMAGE} ./run.exp ${IMAGE} ${IMAGE_VFAT_NORM} ${IMAGE_DEST} #sleep 10 #./run.exp ${IMAGE} ${IMAGE_VFAT_PART} ${IMAGE_DEST}