diff --git a/CHANGELOG.md b/CHANGELOG.md index df1a217..3a4c16e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,19 @@ Changelog ========= +2.2.0 (in progress) +--- +New features: +- Filecheck.py configuration information is now conveniently held in a Config +object instead of in globals +- New easier to read text-based logger (removed twiggy dependency) +- Various filetypes in filecheck.py now have improved descriptions for log +- Improved the interface for adding file descriptions to files + +Fixes: +- + + 2.1.0 --- diff --git a/bin/filecheck.py b/bin/filecheck.py index da7e00b..19cb93f 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -6,6 +6,7 @@ import shlex import subprocess import zipfile import argparse +import shutil import oletools.oleid import olefile @@ -13,10 +14,11 @@ import officedissector import warnings import exifread from PIL import Image +# TODO: why do we have this import? How does filecheck handle pngs? # from PIL import PngImagePlugin from pdfid import PDFiD, cPDFiD -from kittengroomer import FileBase, KittenGroomerBase +from kittengroomer import FileBase, KittenGroomerBase, Logging SEVENZ_PATH = '/usr/bin/7z' @@ -86,8 +88,10 @@ class Config: class File(FileBase): def __init__(self, src_path, dst_path, logger): - super(File, self).__init__(src_path, dst_path, logger) + super(File, self).__init__(src_path, dst_path) self.is_recursive = False + self.logger = logger + self.tempdir_path = self.dst_path + '_temp' subtypes_apps = [ (Config.mimes_office, self._winoffice), @@ -123,11 +127,11 @@ class File(FileBase): def _check_dangerous(self): if not self.has_mimetype: - self.make_dangerous('no mimetype') + self.make_dangerous('File has no mimetype') if not self.has_extension: - self.make_dangerous('no extension') + self.make_dangerous('File has no extension') if self.extension in Config.malicious_exts: - self.make_dangerous('malicious_extension') + self.make_dangerous('Extension identifies file as potentially dangerous') def _check_extension(self): """ @@ -147,8 +151,7 @@ class File(FileBase): expected_mimetype = Config.aliases[expected_mimetype] is_known_extension = self.extension in mimetypes.types_map.keys() if is_known_extension and expected_mimetype != self.mimetype: - # LOG: improve this string - self.make_dangerous('expected_mimetype') + self.make_dangerous('Mimetype does not match expected mimetype for this extension') def _check_mimetype(self): """ @@ -165,18 +168,17 @@ class File(FileBase): strict=False) if expected_extensions: if self.has_extension and self.extension not in expected_extensions: - # LOG: improve this string - self.make_dangerous('expected extensions') + self.make_dangerous('Extension does not match expected extensions for this mimetype') def _check_filename(self): if self.filename[0] is '.': - # handle dotfiles + # TODO: handle dotfiles here pass right_to_left_override = u"\u202E" if right_to_left_override in self.filename: self.make_dangerous('Filename contains dangerous character') self.dst_path = self.dst_path.replace(right_to_left_override, '') - # TODO: change self.filename and'filename' property? + # TODO: change self.filename and'filename' property? Or should those reflect the values on the source key def check(self): self._check_dangerous() @@ -188,6 +190,15 @@ class File(FileBase): if not self.is_dangerous: self.mime_processing_options.get(self.main_type, self.unknown)() + def write_log(self): + props = self.get_all_props() + if not self.is_recursive: + if os.path.exists(self.tempdir_path): + # Hack to make images appear at the correct tree depth in log + self.logger.add_file(self.src_path, props, in_tempdir=True) + return + self.logger.add_file(self.src_path, props) + # ##### Helper functions ##### def _make_method_dict(self, list_of_tuples): """Returns a dictionary with mimetype: method pairs.""" @@ -206,7 +217,6 @@ class File(FileBase): def make_tempdir(self): """Make a temporary directory at self.tempdir_path.""" - self.tempdir_path = self.dst_path + '_temp' if not os.path.exists(self.tempdir_path): os.makedirs(self.tempdir_path) return self.tempdir_path @@ -217,52 +227,50 @@ class File(FileBase): """Empty file or symlink.""" if self.is_symlink: symlink_path = self.get_property('symlink') - self.add_file_string('Symlink to {}'.format(symlink_path)) + self.add_description('File is a symlink to {}'.format(symlink_path)) else: - self.add_file_string('Inode file') + self.add_description('File is an inode (empty file)') self.should_copy = False def unknown(self): """Main type should never be unknown.""" - self.add_file_string('Unknown file') + self.add_description('Unknown mimetype') self.should_copy = False def example(self): """Used in examples, should never be returned by libmagic.""" - self.add_file_string('Example file') + self.add_description('Example file') self.should_copy = False def multipart(self): """Used in web apps, should never be returned by libmagic""" - self.add_file_string('Multipart file') + self.add_description('Multipart file - usually found in web apps') self.should_copy = False # ##### Treated as malicious, no reason to have it on a USB key ###### def message(self): """Process a message file.""" - self.add_file_string('Message file') - self.make_dangerous('Message file') + self.make_dangerous('Message file - should not be found on USB key') def model(self): """Process a model file.""" - self.add_file_string('Model file') - self.make_dangerous('Model file') + self.make_dangerous('Model file - should not be found on USB key') # ##### Files that will be converted ###### def text(self): """Process an rtf, ooxml, or plaintext file.""" for mt in Config.mimes_rtf: if mt in self.sub_type: - self.add_file_string('Rich Text file') + self.add_description('Rich Text (rtf) file') # TODO: need a way to convert it to plain text self.force_ext('.txt') return for mt in Config.mimes_ooxml: if mt in self.sub_type: - self.add_file_string('OOXML File') + self.add_description('OOXML (openoffice) file') self._ooxml() return - self.add_file_string('Text file') + self.add_description('Plain text file') self.force_ext('.txt') def application(self): @@ -272,103 +280,98 @@ class File(FileBase): # TODO: should we change the logic so we don't iterate through all of the subtype methods? # TODO: should these methods return a value? method() - self.add_file_string('Application file') return - self.add_file_string('Unknown Application file') self._unknown_app() def _executables(self): """Process an executable file.""" # LOG: change the processing_type property to some other name or include in file_string - self.set_property('processing_type', 'executable') - self.make_dangerous('executable') + self.make_dangerous('Executable file') def _winoffice(self): """Process a winoffice file using olefile/oletools.""" - # LOG: processing_type property - self.set_property('processing_type', 'WinOffice') oid = oletools.oleid.OleID(self.src_path) # First assume a valid file if not olefile.isOleFile(self.src_path): # Manual processing, may already count as suspicious try: ole = olefile.OleFileIO(self.src_path, raise_defects=olefile.DEFECT_INCORRECT) except: - self.make_dangerous('not parsable') + self.make_dangerous('Unparsable WinOffice file') if ole.parsing_issues: - self.make_dangerous('parsing issues') + self.make_dangerous('Parsing issues with WinOffice file') else: if ole.exists('macros/vba') or ole.exists('Macros') \ or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'): - self.make_dangerous('macro') + self.make_dangerous('WinOffice file containing a macro') else: indicators = oid.check() # Encrypted can be set by multiple checks on the script if oid.encrypted.value: - self.make_dangerous('encrypted') + self.make_dangerous('Encrypted WinOffice file') if oid.macros.value or oid.ole.exists('macros/vba') or oid.ole.exists('Macros') \ or oid.ole.exists('_VBA_PROJECT_CUR') or oid.ole.exists('VBA'): - self.make_dangerous('macro') + self.make_dangerous('WinOffice file containing a macro') for i in indicators: if i.id == 'ObjectPool' and i.value: - # TODO: Is it suspicious? + # TODO: is having an ObjectPool suspicious? # LOG: user defined property - self.set_property('objpool', True) + self.add_description('WinOffice file containing an object pool') elif i.id == 'flash' and i.value: - self.make_dangerous('flash') + self.make_dangerous('WinOffice file with embedded flash') + self.add_description('WinOffice file') def _ooxml(self): """Process an ooxml file.""" - # LOG: processing_type property - self.set_property('processing_type', 'ooxml') try: doc = officedissector.doc.Document(self.src_path) except Exception: - self.make_dangerous('invalid ooxml file') + self.make_dangerous('Invalid ooxml file') return # There are probably other potentially malicious features: # fonts, custom props, custom XML if doc.is_macro_enabled or len(doc.features.macros) > 0: - self.make_dangerous('macro') + self.make_dangerous('Ooxml file containing macro') if len(doc.features.embedded_controls) > 0: - self.make_dangerous('activex') + self.make_dangerous('Ooxml file with activex') if len(doc.features.embedded_objects) > 0: # Exploited by CVE-2014-4114 (OLE) - self.make_dangerous('embedded obj') + self.make_dangerous('Ooxml file with embedded objects') if len(doc.features.embedded_packages) > 0: - self.make_dangerous('embedded pack') + self.make_dangerous('Ooxml file with embedded packages') def _libreoffice(self): """Process a libreoffice file.""" - self.set_property('processing_type', 'libreoffice') # As long as there is no way to do a sanity check on the files => dangerous try: lodoc = zipfile.ZipFile(self.src_path, 'r') except: - # TODO: are there specific exceptions we should catch here? Or is anything ok - self.make_dangerous('invalid libreoffice file') + # TODO: are there specific exceptions we should catch here? Or should it be everything + self.make_dangerous('Invalid libreoffice file') for f in lodoc.infolist(): fname = f.filename.lower() if fname.startswith('script') or fname.startswith('basic') or \ fname.startswith('object') or fname.endswith('.bin'): - self.make_dangerous('macro') + self.make_dangerous('Libreoffice file containing executable code') + if not self.is_dangerous: + self.add_description('Libreoffice file') def _pdf(self): """Process a PDF file.""" - # LOG: processing_type property - self.set_property('processing_type', 'pdf') xmlDoc = PDFiD(self.src_path) oPDFiD = cPDFiD(xmlDoc, True) - # TODO: are there other characteristics which should be dangerous? + # TODO: are there other pdf characteristics which should be dangerous? if oPDFiD.encrypt.count > 0: - self.make_dangerous('encrypted pdf') + self.make_dangerous('Encrypted pdf') if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0: - self.make_dangerous('pdf with javascript') + self.make_dangerous('Pdf with embedded javascript') if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0: - self.make_dangerous('openaction') + self.make_dangerous('Pdf with openaction(s)') if oPDFiD.richmedia.count > 0: - self.make_dangerous('flash') + self.make_dangerous('Pdf containing flash') if oPDFiD.launch.count > 0: - self.make_dangerous('launch') + self.make_dangerous('Pdf with launch action(s)') + if not self.is_dangerous: + self.add_description('Pdf file') def _archive(self): """ @@ -378,24 +381,26 @@ class File(FileBase): is called on that directory. The recursive archive depth is increased to protect against archive bombs. """ - # LOG: change this to something archive specific - self.set_property('processing_type', 'archive') + # TODO: change this to something archive type specific instead of generic 'Archive' + self.add_description('Archive') self.should_copy = False self.is_recursive = True def _unknown_app(self): """Process an unknown file.""" + self.add_description('Unknown application file') self.make_unknown() def _binary_app(self): """Process an unknown binary file.""" + self.add_description('Unknown binary file') self.make_binary() ####################### # Metadata extractors def _metadata_exif(self, metadata_file_path): """Read exif metadata from a jpg or tiff file using exifread.""" - # TODO: this method is kind of long, can we shorten it somehow? + # TODO: can we shorten this method somehow? img = open(self.src_path, 'rb') tags = None try: @@ -419,7 +424,7 @@ class File(FileBase): tag_string = str(tag_value) with open(metadata_file_path, 'w+') as metadata_file: metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string)) - # LOG: how do we want to log metadata? + # TODO: how do we want to log metadata? self.set_property('metadata', 'exif') img.close() return True @@ -437,8 +442,7 @@ class File(FileBase): # LOG: handle metadata self.set_property('metadata', 'png') img.close() - # Catch decompression bombs - except Exception as e: + except Exception as e: # Catch decompression bombs # TODO: only catch DecompressionBombWarnings here? self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path)) self.make_dangerous('exception processing metadata') @@ -457,17 +461,17 @@ class File(FileBase): # ##### Media - audio and video aren't converted ###### def audio(self): """Process an audio file.""" - self.log_string += 'Audio file' + self.add_description('Audio file') self._media_processing() def video(self): """Process a video.""" - self.log_string += 'Video file' + self.add_description('Video file') self._media_processing() def _media_processing(self): """Generic way to process all media files.""" - self.set_property('processing_type', 'media') + self.add_description('Media file') def image(self): """ @@ -492,30 +496,113 @@ class File(FileBase): except Exception as e: # Catch decompression bombs # TODO: change this from all Exceptions to specific DecompressionBombWarning self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path)) - self.make_dangerous() - self.add_file_string('Image file') - self.set_property('processing_type', 'image') + self.make_dangerous('Image file containing decompression bomb') + if not self.is_dangerous: + self.add_description('Image file') + + +class GroomerLogger(object): + """Groomer logging interface.""" + + def __init__(self, src_root_path, dst_root_path, debug=False): + self._src_root_path = src_root_path + self._dst_root_path = dst_root_path + self._log_dir_path = self._make_log_dir(dst_root_path) + self.log_path = os.path.join(self._log_dir_path, 'circlean_log.txt') + self._add_root_dir(src_root_path) + if debug: + self.log_debug_err = os.path.join(self._log_dir_path, 'debug_stderr.log') + self.log_debug_out = os.path.join(self._log_dir_path, 'debug_stdout.log') + else: + self.log_debug_err = os.devnull + self.log_debug_out = os.devnull + + def _make_log_dir(self, root_dir_path): + """Make the directory in the dest dir that will hold the logs""" + log_dir_path = os.path.join(root_dir_path, 'logs') + if os.path.exists(log_dir_path): + shutil.rmtree(log_dir_path) + os.makedirs(log_dir_path) + return log_dir_path + + def _add_root_dir(self, root_path): + dirname = os.path.split(root_path)[1] + '/' + with open(self.log_path, mode='ab') as lf: + lf.write(bytes(dirname, 'utf-8')) + lf.write(b'\n') + + def add_file(self, file_path, file_props, in_tempdir=False): + """Add a file to the log. Takes a dict of file properties.""" + # TODO: fix var names in this method + # TODO: handle symlinks better: symlink_string = '{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)) + props = file_props + depth = self._get_path_depth(file_path) + description_string = ', '.join(props['description_string']) + file_hash = Logging.computehash(file_path)[:6] + if props['safety_category'] is None: + descr_cat = "Normal" + else: + descr_cat = props['safety_category'].capitalize() + # TODO: make size adjust to MB/GB for large files + size = str(props['file_size']) + 'B' + file_template = "+- {name} ({sha_hash}): {size}, {mt}/{st}. {desc}: {desc_str}" + file_string = file_template.format( + name=props['filename'], + sha_hash=file_hash, + size=size, + mt=props['maintype'], + st=props['subtype'], + desc=descr_cat, + desc_str=description_string, + # errs='' # TODO: add errors in human readable form here + ) + if in_tempdir: + depth -= 1 + self._write_line_to_log(file_string, depth) + + def add_dir(self, dir_path): + path_depth = self._get_path_depth(dir_path) + dirname = os.path.split(dir_path)[1] + '/' + log_line = '+- ' + dirname + self._write_line_to_log(log_line, path_depth) + + def _get_path_depth(self, path): + if self._dst_root_path in path: + base_path = self._dst_root_path + elif self._src_root_path in path: + base_path = self._src_root_path + relpath = os.path.relpath(path, base_path) + path_depth = relpath.count(os.path.sep) + return path_depth + + def _write_line_to_log(self, line, indentation_depth): + padding = b' ' + padding += b'| ' * indentation_depth + line_bytes = os.fsencode(line) + with open(self.log_path, mode='ab') as lf: + lf.write(padding) + lf.write(line_bytes) + lf.write(b'\n') class KittenGroomerFileCheck(KittenGroomerBase): def __init__(self, root_src, root_dst, max_recursive_depth=2, debug=False): - super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug) + super(KittenGroomerFileCheck, self).__init__(root_src, root_dst) self.recursive_archive_depth = 0 self.max_recursive_depth = max_recursive_depth + self.cur_file = None + self.logger = GroomerLogger(root_src, root_dst, debug) def process_dir(self, src_dir, dst_dir): """Process a directory on the source key.""" - self.logger.tree(src_dir) - for srcpath in self.list_all_files(src_dir): - dstpath = srcpath.replace(src_dir, dst_dir) - # TODO: Can we clean up the way we handle relative_path? - # Relative path is here so that when we print files in the log it - # shows only the file's path. Should we just pass it to the logger - # when we create it? Or let the logger figure it out? - # relative_path = srcpath.replace(src_dir + '/', '') - self.cur_file = File(srcpath, dstpath, self.logger) - self.process_file(self.cur_file) + for srcpath in self.list_files_dirs(src_dir): + if os.path.isdir(srcpath): + self.logger.add_dir(srcpath) + else: + dstpath = os.path.join(dst_dir, os.path.basename(srcpath)) + self.cur_file = File(srcpath, dstpath, self.logger) + self.process_file(self.cur_file) def process_file(self, file): """ @@ -525,12 +612,13 @@ class KittenGroomerFileCheck(KittenGroomerBase): the file to the destionation key, and clean up temporary directory. """ file.check() - if file.is_recursive: - self.process_archive(file) - elif file.should_copy: + if file.should_copy: file.safe_copy() file.set_property('copied', True) - file.write_log() + file.write_log() + if file.is_recursive: + self.process_archive(file) + # TODO: Can probably handle cleaning up the tempdir better if hasattr(file, 'tempdir_path'): self.safe_rmtree(file.tempdir_path) @@ -542,17 +630,17 @@ class KittenGroomerFileCheck(KittenGroomerBase): to an archive. """ self.recursive_archive_depth += 1 - # LOG: write_log or somehow log the archive file here if self.recursive_archive_depth >= self.max_recursive_depth: file.make_dangerous('Archive bomb') else: tempdir_path = file.make_tempdir() # TODO: double check we are properly escaping file.src_path - # otherwise we are running unvalidated user input directly in the shell + # otherwise we are running unsanitized user input directly in the shell command_str = '{} -p1 x "{}" -o"{}" -bd -aoa' unpack_command = command_str.format(SEVENZ_PATH, file.src_path, tempdir_path) self._run_process(unpack_command) + file.write_log() self.process_dir(tempdir_path, file.dst_path) self.safe_rmtree(tempdir_path) self.recursive_archive_depth -= 1 @@ -567,8 +655,19 @@ class KittenGroomerFileCheck(KittenGroomerBase): return return True + def list_files_dirs(self, root_dir_path): + queue = [] + for path in sorted(os.listdir(root_dir_path), key=lambda x: str.lower(x)): + full_path = os.path.join(root_dir_path, path) + if os.path.isdir(full_path): + queue.append(full_path) + queue += self.list_files_dirs(full_path) # if path is a dir, recurse through its contents + elif os.path.isfile(full_path): + queue.append(full_path) + return queue + def run(self): - self.process_dir(self.src_root_dir, self.dst_root_dir) + self.process_dir(self.src_root_path, self.dst_root_path) def main(kg_implementation, description): diff --git a/dev-requirements.txt b/dev-requirements.txt index 8c42eaa..8d2f7f4 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,3 @@ -twiggy python-magic pytest pytest-cov diff --git a/kittengroomer/__init__.py b/kittengroomer/__init__.py index 8428553..25a7965 100644 --- a/kittengroomer/__init__.py +++ b/kittengroomer/__init__.py @@ -1,4 +1,4 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from .helpers import FileBase, KittenGroomerBase, GroomerLogger, main +from .helpers import FileBase, KittenGroomerBase, Logging, main diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index 318f1ca..64e7e51 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -3,8 +3,8 @@ """ Contains the base objects for use when creating a sanitizer using -PyCIRCLean. Subclass FileBase and KittenGroomerBase to implement your -desired behavior. +PyCIRCLean. Subclass or import from FileBase/KittenGroomerBase and implement +your desired behavior. """ @@ -14,7 +14,6 @@ import shutil import argparse import magic -import twiggy class KittenGroomerError(Exception): @@ -37,17 +36,15 @@ class FileBase(object): Contains file attributes and various helper methods. """ - def __init__(self, src_path, dst_path, logger=None): + def __init__(self, src_path, dst_path): """ Initialized with the source path and expected destination path. - self.logger should be a logging object with an add_file method. Create various properties and determine the file's mimetype. """ self.src_path = src_path self.dst_path = dst_path self.filename = os.path.basename(self.src_path) - self.logger = logger self._file_props = { 'filepath': self.src_path, 'filename': self.filename, @@ -58,7 +55,7 @@ class FileBase(object): 'safety_category': None, 'symlink': False, 'copied': False, - 'file_string_set': set(), + 'description_string': [], # array of descriptions to be joined 'errors': {}, 'user_defined': {} } @@ -90,9 +87,9 @@ class FileBase(object): else: try: mt = magic.from_file(self.src_path, mime=True) - # Note: magic will always return something, even if it's just 'data' + # Note: libmagic will always return something, even if it's just 'data' except UnicodeEncodeError as e: - # FIXME: The encoding of the file is broken (possibly UTF-16) + # FIXME: The encoding of the file that triggers this is broken (possibly it's UTF-16 and Python expects utf8) # Note: one of the Travis files will trigger this exception self.add_error(e, '') mt = None @@ -121,8 +118,6 @@ class FileBase(object): @property def has_mimetype(self): """True if file has a main and sub mimetype, else False.""" - # TODO: broken mimetype checks should be done somewhere else. - # Should the check be by default or should we let the API consumer write it? if not self.main_type or not self.sub_type: return False else: @@ -161,34 +156,47 @@ class FileBase(object): def set_property(self, prop_string, value): """ - Take a property and a value and add them to self._file_props. + Take a property and a value and add them to the file's property dict. - If prop_string is already in _file_props, set prop_string to value. - If prop_string not in _file_props, set prop_string to value in - _file_props['user_defined']. + If `prop_string` is part of the file property API, set it to `value`. + Otherwise, add `prop_string`: `value` to `user_defined` properties. """ - if prop_string in self._file_props.keys(): + if prop_string is 'description_string': + if prop_string not in self._file_props['description_string']: + self._file_props['description_string'].append(value) + elif prop_string in self._file_props.keys(): self._file_props[prop_string] = value else: self._file_props['user_defined'][prop_string] = value - def get_property(self, file_prop): - """Get the value for a property in _file_props.""" - # TODO: could probably be refactored - if file_prop in self._file_props: - return self._file_props[file_prop] - elif file_prop in self._file_props['user_defined']: - return self._file_props['user_defined'][file_prop] + def get_property(self, prop_string): + """ + Get the value for a property stored on the file. + + Returns `None` if `prop_string` cannot be found on the file. + """ + if prop_string in self._file_props: + return self._file_props[prop_string] + elif prop_string in self._file_props['user_defined']: + return self._file_props['user_defined'][prop_string] else: return None - def add_error(self, error, info): - """Add an error: info pair to _file_props['errors'].""" - self._file_props['errors'].update({error: info}) + def get_all_props(self): + """Return a dict containing all stored properties of this file.""" + return self._file_props - def add_file_string(self, file_string): - """Add a file descriptor string to _file_props.""" - self._file_props['file_string_set'].add(file_string) + def add_error(self, error, info_string): + """Add an `error`: `info_string` pair to the file.""" + self._file_props['errors'].update({error: info_string}) + + def add_description(self, description_string): + """ + Add a description string to the file. + + If `description_string` is already present, will prevent duplicates. + """ + self.set_property('description_string', description_string) def make_dangerous(self, reason_string=None): """ @@ -198,9 +206,10 @@ class FileBase(object): to help prevent double-click of death. """ if self.is_dangerous: + self.set_property('description_string', reason_string) return self.set_property('safety_category', 'dangerous') - # LOG: store reason string somewhere and do something with it + self.set_property('description_string', reason_string) path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename)) @@ -235,76 +244,50 @@ class FileBase(object): self.add_error(e, '') def force_ext(self, ext): - """If dst_path does not end in ext, change it and edit _file_props.""" + """If dst_path does not end in ext, append .ext to it.""" + ext = self._check_leading_dot(ext) if not self.dst_path.endswith(ext): - self.set_property('force_ext', True) + # LOG: do we want to log that the extension was changed as below? + # self.set_property('force_ext', True) self.dst_path += ext if not self._file_props['extension'] == ext: self.set_property('extension', ext) def create_metadata_file(self, ext): - """Create a separate file to hold metadata from this file.""" + """ + Create a separate file to hold extracted metadata. + + The string `ext` will be used as the extension for the metadata file. + """ + ext = self._check_leading_dot(ext) try: - # make sure we aren't overwriting anything if os.path.exists(self.src_path + ext): - raise KittenGroomerError("Cannot create split metadata file for \"" + - self.dst_path + "\", type '" + - ext + "': File exists.") + err_str = ("Could not create metadata file for \"" + + self.filename + + "\": a file with that path already exists.") + raise KittenGroomerError(err_str) else: dst_dir_path, filename = os.path.split(self.dst_path) if not os.path.exists(dst_dir_path): os.makedirs(dst_dir_path) - # TODO: Check extension for leading "." self.metadata_file_path = self.dst_path + ext return self.metadata_file_path except KittenGroomerError as e: self.add_error(e, '') return False - def write_log(self): - """Write logs from file to self.logger.""" - file_log = self.logger.add_file(self) - file_log.fields(**self._file_props) + def _check_leading_dot(self, ext): + if len(ext) > 0: + if not ext.startswith('.'): + return '.' + ext + return ext -class GroomerLogger(object): - """Groomer logging interface.""" +class Logging(object): - def __init__(self, root_dir_path, debug=False): - self.root_dir = root_dir_path - self.log_dir_path = os.path.join(root_dir_path, 'logs') - if os.path.exists(self.log_dir_path): - shutil.rmtree(self.log_dir_path) - os.makedirs(self.log_dir_path) - self.log_processing = os.path.join(self.log_dir_path, 'processing.log') - self.log_content = os.path.join(self.log_dir_path, 'content.log') - twiggy.quick_setup(file=self.log_processing) - self.log = twiggy.log.name('files') - if debug: - self.log_debug_err = os.path.join(self.log_dir_path, 'debug_stderr.log') - self.log_debug_out = os.path.join(self.log_dir_path, 'debug_stdout.log') - else: - self.log_debug_err = os.devnull - self.log_debug_out = os.devnull - - def tree(self, base_dir, padding=' '): - """Write a graphical tree to the log for `base_dir`.""" - with open(self.log_content, 'ab') as lf: - lf.write(bytes('#' * 80 + '\n', 'UTF-8')) - lf.write(bytes('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir)).encode()), 'utf8')) - padding += '| ' - files = sorted(os.listdir(base_dir)) - for f in files: - curpath = os.path.join(base_dir, f) - if os.path.islink(curpath): - lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)).encode(errors='ignore')) - elif os.path.isdir(curpath): - self.tree(curpath, padding) - elif os.path.isfile(curpath): - lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)).encode(errors='ignore')) - - def _computehash(self, path): - """Return a sha256 hash of a file at a given path.""" + @staticmethod + def computehash(path): + """Return the sha256 hash of a file at a given path.""" s = hashlib.sha256() with open(path, 'rb') as f: while True: @@ -314,53 +297,47 @@ class GroomerLogger(object): s.update(buf) return s.hexdigest() - def add_file(self, file): - """Add a file to the log.""" - return self.log.name('file.src_path') - class KittenGroomerBase(object): """Base object responsible for copy/sanitization process.""" - def __init__(self, root_src, root_dst, debug=False): + def __init__(self, src_root_path, dst_root_path): """Initialized with path to source and dest directories.""" - self.src_root_dir = root_src - self.dst_root_dir = root_dst - self.debug = debug - self.cur_file = None - self.logger = GroomerLogger(self.dst_root_dir, debug) + self.src_root_path = os.path.abspath(src_root_path) + self.dst_root_path = os.path.abspath(dst_root_path) - def safe_rmtree(self, directory): + def safe_rmtree(self, directory_path): """Remove a directory tree if it exists.""" - if os.path.exists(directory): - shutil.rmtree(directory) + if os.path.exists(directory_path): + shutil.rmtree(directory_path) - def safe_remove(self, filepath): - """Remove a file if it exists.""" - if os.path.exists(filepath): - os.remove(filepath) + def safe_remove(self, file_path): + """Remove file at file_path if it exists.""" + if os.path.exists(file_path): + os.remove(file_path) - def safe_mkdir(self, directory): + def safe_mkdir(self, directory_path): """Make a directory if it does not exist.""" - if not os.path.exists(directory): - os.makedirs(directory) + if not os.path.exists(directory_path): + os.makedirs(directory_path) - def list_all_files(self, directory): + def list_all_files(self, directory_path): """Generator yielding path to all of the files in a directory tree.""" - for root, dirs, files in os.walk(directory): + for root, dirs, files in os.walk(directory_path): + # files is a list anyway so we don't get much from using a generator here for filename in files: filepath = os.path.join(root, filename) yield filepath ####################### - # TODO: feels like this function doesn't need to exist if we move main() + # TODO: if we move main() we can get rid of this as well def processdir(self, src_dir, dst_dir): """Implement this function to define file processing behavior.""" raise ImplementationRequired('Please implement processdir.') -# TODO: Maybe this shouldn't exist? It should probably get moved to filecheck since this isn't really API code +# TODO: Should this get moved to filecheck? It isn't really API code and somebody can implement it themselves def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'): parser = argparse.ArgumentParser(prog='KittenGroomer', description=description) parser.add_argument('-s', '--source', type=str, help='Source directory') diff --git a/setup.py b/setup.py index 4397c5f..21b03f9 100644 --- a/setup.py +++ b/setup.py @@ -23,5 +23,5 @@ setup( 'Topic :: Communications :: File Sharing', 'Topic :: Security', ], - install_requires=['twiggy', 'python-magic'], + install_requires=['python-magic'], ) diff --git a/tests/logging.py b/tests/logging.py index c937a71..e705231 100644 --- a/tests/logging.py +++ b/tests/logging.py @@ -1,22 +1,28 @@ import os +from datetime import datetime def save_logs(groomer, test_description): divider = ('=' * 10 + '{}' + '=' * 10 + '\n') test_log_path = 'tests/test_logs/{}.log'.format(test_description) - with open(test_log_path, 'w+') as test_log: - test_log.write(divider.format('TEST LOG')) - with open(groomer.logger.log_processing, 'r') as logfile: + time_now = str(datetime.now().time()) + '\n' + with open(test_log_path, 'wb+') as test_log: + log_header = divider.format('TEST LOG') + test_log.write(bytes(log_header, encoding='utf-8')) + test_log.write(bytes(time_now, encoding='utf-8')) + test_log.write(bytes(test_description, encoding='utf-8')) + test_log.write(b'\n') + test_log.write(b'-' * 20 + b'\n') + with open(groomer.logger.log_path, 'rb') as logfile: log = logfile.read() test_log.write(log) - if groomer.debug: - if os.path.exists(groomer.logger.log_debug_err): - test_log.write(divider.format('ERR LOG')) - with open(groomer.logger.log_debug_err, 'r') as debug_err: - err = debug_err.read() - test_log.write(err) - if os.path.exists(groomer.logger.log_debug_out): - test_log.write(divider.format('OUT LOG')) - with open(groomer.logger.log_debug_out, 'r') as debug_out: - out = debug_out.read() - test_log.write(out) + if os.path.exists(groomer.logger.log_debug_err): + test_log.write(bytes(divider.format('ERR LOG'), encoding='utf-8')) + with open(groomer.logger.log_debug_err, 'rb') as debug_err: + err = debug_err.read() + test_log.write(err) + if os.path.exists(groomer.logger.log_debug_out): + test_log.write(bytes(divider.format('OUT LOG'), encoding='utf-8')) + with open(groomer.logger.log_debug_out, 'rb') as debug_out: + out = debug_out.read() + test_log.write(out) diff --git a/tests/src_valid/dir1/dir2/blah.conf b/tests/src_valid/dir1/dir2/blah.conf new file mode 100644 index 0000000..484ba93 --- /dev/null +++ b/tests/src_valid/dir1/dir2/blah.conf @@ -0,0 +1 @@ +This is a test. diff --git a/tests/src_valid/test.zip b/tests/src_valid/test.zip new file mode 100644 index 0000000..4b911c0 Binary files /dev/null and b/tests/src_valid/test.zip differ diff --git a/tests/test_filecheck.py b/tests/test_filecheck.py index f58152d..e7aeaac 100644 --- a/tests/test_filecheck.py +++ b/tests/test_filecheck.py @@ -13,44 +13,26 @@ try: except ImportError: NODEPS = True +fixture = pytest.fixture +skip = pytest.mark.skip skipif_nodeps = pytest.mark.skipif(NODEPS, reason="Dependencies aren't installed") @skipif_nodeps -class TestIntegration: +class TestSystem: - @pytest.fixture - def src_valid_path(self): - return os.path.join(os.getcwd(), 'tests/src_valid') + @fixture + def valid_groomer(self): + src_path = os.path.join(os.getcwd(), 'tests/src_valid') + dst_path = self.make_dst_dir_path(src_path) + return KittenGroomerFileCheck(src_path, dst_path, debug=True) - @pytest.fixture - def src_invalid_path(self): - return os.path.join(os.getcwd(), 'tests/src_invalid') - - @pytest.fixture - def dst(self): - return os.path.join(os.getcwd(), 'tests/dst') - - def test_filecheck_src_invalid(self, src_invalid_path): - dst_path = self.make_dst_dir_path(src_invalid_path) - groomer = KittenGroomerFileCheck(src_invalid_path, dst_path, debug=True) - groomer.run() - test_description = "filecheck_invalid" - save_logs(groomer, test_description) - - def test_filecheck_2(self, src_valid_path): - dst_path = self.make_dst_dir_path(src_valid_path) - groomer = KittenGroomerFileCheck(src_valid_path, dst_path, debug=True) - groomer.run() - test_description = "filecheck_valid" - save_logs(groomer, test_description) - - def test_processdir(self): - pass - - def test_handle_archives(self): - pass + @fixture + def invalid_groomer(self): + src_path = os.path.join(os.getcwd(), 'tests/src_invalid') + dst_path = self.make_dst_dir_path(src_path) + return KittenGroomerFileCheck(src_path, dst_path, debug=True) def make_dst_dir_path(self, src_dir_path): dst_path = src_dir_path + '_dst' @@ -58,6 +40,16 @@ class TestIntegration: os.makedirs(dst_path, exist_ok=True) return dst_path + def test_filecheck_src_valid(self, valid_groomer): + valid_groomer.run() + test_description = "filecheck_valid" + save_logs(valid_groomer, test_description) + + def test_filecheck_src_invalid(self, invalid_groomer): + invalid_groomer.run() + test_description = "filecheck_invalid" + save_logs(invalid_groomer, test_description) + class TestFileHandling: def test_autorun(self): diff --git a/tests/test_kittengroomer.py b/tests/test_kittengroomer.py index 940136d..538314d 100644 --- a/tests/test_kittengroomer.py +++ b/tests/test_kittengroomer.py @@ -5,8 +5,7 @@ import os import pytest -from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger -from kittengroomer.helpers import ImplementationRequired +from kittengroomer import FileBase, KittenGroomerBase skip = pytest.mark.skip xfail = pytest.mark.xfail @@ -190,7 +189,6 @@ class TestFileBase: assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf' generic_conf_file.force_ext('.txt') assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt' - assert generic_conf_file.get_property('force_ext') is True assert generic_conf_file.get_property('extension') == '.txt' # should be able to handle weird paths @@ -203,7 +201,6 @@ class TestFileBase: # shouldn't change a file's extension if it already is right def test_create_metadata_file(self, temp_file): - # Try making a metadata file metadata_file_path = temp_file.create_metadata_file('.metadata.txt') with open(metadata_file_path, 'w+') as metadata_file: metadata_file.write('Have some metadata!') @@ -219,12 +216,7 @@ class TestFileBase: class TestLogger: - @fixture - def generic_logger(self, tmpdir): - return GroomerLogger(tmpdir.strpath) - - def test_tree(self, generic_logger): - generic_logger.tree(generic_logger.root_dir) + pass class TestKittenGroomerBase: @@ -245,10 +237,7 @@ class TestKittenGroomerBase: assert generic_groomer def test_instantiation(self, source_directory, dest_directory): - groomer = KittenGroomerBase(source_directory, dest_directory) - debug_groomer = KittenGroomerBase(source_directory, - dest_directory, - debug=True) + KittenGroomerBase(source_directory, dest_directory) def test_list_all_files(self, tmpdir): file = tmpdir.join('test.txt') @@ -256,6 +245,6 @@ class TestKittenGroomerBase: testdir = tmpdir.join('testdir') os.mkdir(testdir.strpath) simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) - files = simple_groomer.list_all_files(simple_groomer.src_root_dir) + files = simple_groomer.list_all_files(simple_groomer.src_root_path) assert file.strpath in files assert testdir.strpath not in files