diff --git a/bin/filecheck.py b/bin/filecheck.py index e58d478..8200eeb 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -6,6 +6,7 @@ import shlex import subprocess import zipfile import argparse +import shutil import oletools.oleid import olefile @@ -17,7 +18,7 @@ from PIL import Image # from PIL import PngImagePlugin from pdfid import PDFiD, cPDFiD -from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger +from kittengroomer import FileBase, KittenGroomerBase, Logging SEVENZ_PATH = '/usr/bin/7z' @@ -90,6 +91,7 @@ class File(FileBase): super(File, self).__init__(src_path, dst_path) self.is_recursive = False self.logger = logger + self.tempdir_path = self.dst_path + '_temp' subtypes_apps = [ (Config.mimes_office, self._winoffice), @@ -190,7 +192,12 @@ class File(FileBase): def write_log(self): props = self.get_all_props() - self.logger.add_file(props) + if not self.is_recursive: + if os.path.exists(self.tempdir_path): + # Hack to make images appear at the correct tree depth in log + self.logger.add_file(self.src_path, props, in_tempdir=True) + return + self.logger.add_file(self.src_path, props) # ##### Helper functions ##### def _make_method_dict(self, list_of_tuples): @@ -210,7 +217,6 @@ class File(FileBase): def make_tempdir(self): """Make a temporary directory at self.tempdir_path.""" - self.tempdir_path = self.dst_path + '_temp' if not os.path.exists(self.tempdir_path): os.makedirs(self.tempdir_path) return self.tempdir_path @@ -490,9 +496,93 @@ class File(FileBase): except Exception as e: # Catch decompression bombs # TODO: change this from all Exceptions to specific DecompressionBombWarning self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path)) - self.make_dangerous() - self.add_description('Image file') - self.set_property('processing_type', 'image') + self.make_dangerous('Image file containing decompression bomb') + if not self.is_dangerous: + self.add_description('Image file') + + +class GroomerLogger(object): + """Groomer logging interface.""" + + def __init__(self, src_root_path, dst_root_path, debug=False): + self._src_root_path = src_root_path + self._dst_root_path = dst_root_path + self._log_dir_path = self._make_log_dir(dst_root_path) + self.log_path = os.path.join(self._log_dir_path, 'circlean_log.txt') + self._add_root_dir(src_root_path) + if debug: + self.log_debug_err = os.path.join(self._log_dir_path, 'debug_stderr.log') + self.log_debug_out = os.path.join(self._log_dir_path, 'debug_stdout.log') + else: + self.log_debug_err = os.devnull + self.log_debug_out = os.devnull + + def _make_log_dir(self, root_dir_path): + """Make the directory in the dest dir that will hold the logs""" + log_dir_path = os.path.join(root_dir_path, 'logs') + if os.path.exists(log_dir_path): + shutil.rmtree(log_dir_path) + os.makedirs(log_dir_path) + return log_dir_path + + def _add_root_dir(self, root_path): + dirname = os.path.split(root_path)[1] + '/' + with open(self.log_path, mode='ab') as lf: + lf.write(bytes(dirname, 'utf-8')) + lf.write(b'\n') + + def add_file(self, file_path, file_props, in_tempdir=False): + """Add a file to the log. Takes a dict of file properties.""" + # TODO: fix var names in this method + # TODO: handle symlinks better: symlink_string = '{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)) + props = file_props + depth = self._get_path_depth(file_path) + description_string = ', '.join(props['description_string']) + file_hash = Logging.computehash(file_path)[:6] + if props['safety_category'] is None: + descr_cat = "Normal" + else: + descr_cat = props['safety_category'].capitalize() + # TODO: make size adjust to MB/GB for large files + size = str(props['file_size']) + 'B' + file_template = "+- {name} ({sha_hash}): {size}, {mt}/{st}. {desc}: {desc_str}" + file_string = file_template.format( + name=props['filename'], + sha_hash=file_hash, + size=size, + mt=props['maintype'], + st=props['subtype'], + desc=descr_cat, + desc_str=description_string, + # errs='' # TODO: add errors in human readable form here + ) + if in_tempdir: + depth -= 1 + self._write_line_to_log(file_string, depth) + + def add_dir(self, dir_path): + path_depth = self._get_path_depth(dir_path) + dirname = os.path.split(dir_path)[1] + '/' + log_line = '+- ' + dirname + self._write_line_to_log(log_line, path_depth) + + def _get_path_depth(self, path): + if self._dst_root_path in path: + base_path = self._dst_root_path + elif self._src_root_path in path: + base_path = self._src_root_path + relpath = os.path.relpath(path, base_path) + path_depth = relpath.count(os.path.sep) + return path_depth + + def _write_line_to_log(self, line, indentation_depth): + # TODO: should we use fsencode and fsdecode here instead of just bytestrings? + padding = b' ' + padding += b'| ' * indentation_depth + with open(self.log_path, mode='ab') as lf: + lf.write(padding) + lf.write(bytes(line, encoding='utf-8')) + lf.write(b'\n') class KittenGroomerFileCheck(KittenGroomerBase): @@ -502,20 +592,17 @@ class KittenGroomerFileCheck(KittenGroomerBase): self.recursive_archive_depth = 0 self.max_recursive_depth = max_recursive_depth self.cur_file = None - self.logger = GroomerLogger(self.dst_root_path, debug) + self.logger = GroomerLogger(root_src, root_dst, debug) def process_dir(self, src_dir, dst_dir): """Process a directory on the source key.""" - self.logger.tree(src_dir) - for srcpath in self.list_all_files(src_dir): - dstpath = srcpath.replace(src_dir, dst_dir) - # TODO: Can we clean up the way we handle relative_path? - # Relative path is here so that when we print files in the log it - # shows only the file's path. Should we just pass it to the logger - # when we create it? Or let the logger figure it out? - # relative_path = srcpath.replace(src_dir + '/', '') - self.cur_file = File(srcpath, dstpath, self.logger) - self.process_file(self.cur_file) + for srcpath in self.list_files_dirs(src_dir): + if os.path.isdir(srcpath): + self.logger.add_dir(srcpath) + else: + dstpath = os.path.join(dst_dir, os.path.basename(srcpath)) + self.cur_file = File(srcpath, dstpath, self.logger) + self.process_file(self.cur_file) def process_file(self, file): """ @@ -525,12 +612,13 @@ class KittenGroomerFileCheck(KittenGroomerBase): the file to the destionation key, and clean up temporary directory. """ file.check() - if file.is_recursive: - self.process_archive(file) - elif file.should_copy: + if file.should_copy: file.safe_copy() file.set_property('copied', True) - file.write_log() + file.write_log() + if file.is_recursive: + self.process_archive(file) + # TODO: Can probably handle cleaning up the tempdir better if hasattr(file, 'tempdir_path'): self.safe_rmtree(file.tempdir_path) @@ -547,11 +635,12 @@ class KittenGroomerFileCheck(KittenGroomerBase): else: tempdir_path = file.make_tempdir() # TODO: double check we are properly escaping file.src_path - # otherwise we are running unvalidated user input directly in the shell + # otherwise we are running unsanitized user input directly in the shell command_str = '{} -p1 x "{}" -o"{}" -bd -aoa' unpack_command = command_str.format(SEVENZ_PATH, file.src_path, tempdir_path) self._run_process(unpack_command) + file.write_log() self.process_dir(tempdir_path, file.dst_path) self.safe_rmtree(tempdir_path) self.recursive_archive_depth -= 1 @@ -566,6 +655,17 @@ class KittenGroomerFileCheck(KittenGroomerBase): return return True + def list_files_dirs(self, root_dir_path): + queue = [] + for path in sorted(os.listdir(root_dir_path), key=lambda x: str.lower(x)): + full_path = os.path.join(root_dir_path, path) + if os.path.isdir(full_path): + queue.append(full_path) + queue += self.list_files_dirs(full_path) # if path is a dir, recurse through its contents + elif os.path.isfile(full_path): + queue.append(full_path) + return queue + def run(self): self.process_dir(self.src_root_path, self.dst_root_path) diff --git a/kittengroomer/__init__.py b/kittengroomer/__init__.py index 8428553..25a7965 100644 --- a/kittengroomer/__init__.py +++ b/kittengroomer/__init__.py @@ -1,4 +1,4 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from .helpers import FileBase, KittenGroomerBase, GroomerLogger, main +from .helpers import FileBase, KittenGroomerBase, Logging, main diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index 86c0fd6..6411404 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -14,7 +14,6 @@ import shutil import argparse import magic -# import twiggy class KittenGroomerError(Exception): @@ -286,52 +285,11 @@ class FileBase(object): return ext -class GroomerLogger(object): - """Groomer logging interface.""" +class Logging(object): - def __init__(self, root_dir_path, debug=False): - self._root_dir_path = root_dir_path - self._log_dir_path = self._make_log_dir(root_dir_path) - # LOG: rename logfile to something more descriptive - self.log_path = os.path.join(self._log_dir_path, 'log.txt') - # twiggy.quick_setup(file=self.log_processing) - # self.log = twiggy.log.name('files') - if debug: - self.log_debug_err = os.path.join(self._log_dir_path, 'debug_stderr.log') - self.log_debug_out = os.path.join(self._log_dir_path, 'debug_stdout.log') - else: - self.log_debug_err = os.devnull - self.log_debug_out = os.devnull - - def _make_log_dir(self, root_dir_path): - log_dir_path = os.path.join(root_dir_path, 'logs') - if os.path.exists(log_dir_path): - shutil.rmtree(log_dir_path) - os.makedirs(log_dir_path) - return log_dir_path - - def tree(self, base_dir, padding=' '): - """Write a graphical tree to the log for `base_dir`.""" - horizontal_divider = '#' * 80 + '\n' - with open(self.log_path, mode='a', encoding='utf-8') as lf: - lf.write(horizontal_divider) - base_dir_name = os.path.basename(os.path.abspath(base_dir)) - lf.write('{}+- {}/\n'.format(padding, base_dir_name)) - padding += '| ' - # TODO: make sure this gets all sub directories and use scandir() - files = sorted(os.listdir(base_dir)) - for f in files: - curpath = os.path.join(base_dir, f) - if os.path.islink(curpath): - lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath))) - elif os.path.isdir(curpath): - self.tree(curpath, padding) - elif os.path.isfile(curpath): - lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath))) - lf.write(horizontal_divider) - - def _computehash(self, path): - """Return a sha256 hash of a file at a given path.""" + @staticmethod + def computehash(path): + """Return the sha256 hash of a file at a given path.""" s = hashlib.sha256() with open(path, 'rb') as f: while True: @@ -341,23 +299,6 @@ class GroomerLogger(object): s.update(buf) return s.hexdigest() - def _write_file_to_disk(self, file_string): - with open(self.log_path, mode='a', encoding='utf-8') as lf: - lf.write(file_string) - - def add_file(self, file_props): - """Add a file to the log. Takes a dict of file properties.""" - props = file_props - description_string = ', '.join(props['description_string']) - file_string = " * {}: {}/{}, {}: {}\n".format( - props['filename'], - props['maintype'], - props['subtype'], - props['safety_category'], - description_string - ) - self._write_file_to_disk(file_string) - class KittenGroomerBase(object): """Base object responsible for copy/sanitization process.""" diff --git a/tests/test_kittengroomer.py b/tests/test_kittengroomer.py index 0ebef48..538314d 100644 --- a/tests/test_kittengroomer.py +++ b/tests/test_kittengroomer.py @@ -5,7 +5,7 @@ import os import pytest -from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger +from kittengroomer import FileBase, KittenGroomerBase skip = pytest.mark.skip xfail = pytest.mark.xfail @@ -216,12 +216,7 @@ class TestFileBase: class TestLogger: - @fixture - def generic_logger(self, tmpdir): - return GroomerLogger(tmpdir.strpath) - - def test_tree(self, generic_logger, tmpdir): - generic_logger.tree(tmpdir.strpath) + pass class TestKittenGroomerBase: