diff --git a/bin/filecheck.py b/bin/filecheck.py index a0c4f6f..47f9c02 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -120,13 +120,12 @@ class File(FileBase): } def _check_dangerous(self): - if not self.has_mimetype(): # No mimetype, should not happen. - self.make_dangerous() + if not self.has_mimetype(): + self.make_dangerous('no mimetype') if not self.has_extension(): - self.make_dangerous() + self.make_dangerous('no extension') if self.extension in Config.malicious_exts: - self.set_property('malicious_extension', self.extension) - self.make_dangerous() + self.make_dangerous('malicious_extension') def _check_extension(self): """Guesses the file's mimetype based on its extension. If the file's @@ -143,8 +142,8 @@ class File(FileBase): expected_mimetype = Config.aliases[expected_mimetype] is_known_extension = self.extension in mimetypes.types_map.keys() if is_known_extension and expected_mimetype != self.mimetype: - self.set_property('expected_mimetype', expected_mimetype) - self.make_dangerous() + # LOG: improve this string + self.make_dangerous('expected_mimetyped') def _check_mimetype(self): """Takes the mimetype (as determined by libmagic) and determines @@ -158,8 +157,8 @@ class File(FileBase): strict=False) if expected_extensions: if self.has_extension() and self.extension not in expected_extensions: - self.set_property('expected_extensions', expected_extensions) - self.make_dangerous() + # LOG: improve this string + self.make_dangerous('expected extensions') def check(self): self._check_dangerous() @@ -179,11 +178,11 @@ class File(FileBase): def write_log(self): """Print the logs related to the current file being processed.""" - # LOG: move to helpers.py and change + # LOG: move to helpers.py GroomerLogger and modify tmp_log = self.logger.log.fields(**self._file_props) if self.is_dangerous(): tmp_log.warning(self.log_string) - elif self.get_property('unknown') or self.get_property('binary'): + elif self.is_unknown() or self.is_binary(): tmp_log.info(self.log_string) else: tmp_log.debug(self.log_string) @@ -206,67 +205,71 @@ class File(FileBase): def inode(self): """Empty file or symlink.""" if self.is_symlink(): - self.log_string += 'Symlink to {}'.format(self.get_property('symlink')) + symlink_path = self.get_property('symlink') + self.add_file_string('Symlink to {}'.format(symlink_path)) else: - self.log_string += 'Inode file' + self.add_file_string('Inode file') def unknown(self): """Main type should never be unknown.""" - self.log_string += 'Unknown file' + self.add_file_string('Unknown file') def example(self): """Used in examples, should never be returned by libmagic.""" - self.log_string += 'Example file' + self.add_file_string('Example file') def multipart(self): """Used in web apps, should never be returned by libmagic""" - self.log_string += 'Multipart file' + self.add_file_string('Multipart file') # ##### Treated as malicious, no reason to have it on a USB key ###### def message(self): """Process a message file.""" - self.log_string += 'Message file' - self.make_dangerous() + self.add_file_string('Message file') + self.make_dangerous('Message file') def model(self): """Process a model file.""" - self.log_string += 'Model file' - self.make_dangerous() + self.add_file_string('Model file') + self.make_dangerous('Model file') # ##### Files that will be converted ###### def text(self): """Process an rtf, ooxml, or plaintext file.""" for mt in Config.mimes_rtf: if mt in self.sub_type: - self.log_string += 'Rich Text file' + self.add_file_string('Rich Text file') # TODO: need a way to convert it to plain text self.force_ext('.txt') return for mt in Config.mimes_ooxml: if mt in self.sub_type: - self.log_string += 'OOXML File' + self.add_file_string('OOXML File') self._ooxml() return - self.log_string += 'Text file' + self.add_file_string('Text file') self.force_ext('.txt') def application(self): """Processes an application specific file according to its subtype.""" for subtype, method in self.app_subtype_methods.items(): if subtype in self.sub_type: + # TODO: should this return a value? method() - self.log_string += 'Application file' + self.add_file_string('Application file') return - self.log_string += 'Unknown Application file' + self.add_file_string('Unknown Application file') self._unknown_app() def _executables(self): """Processes an executable file.""" + # LOG: change this property self.set_property('processing_type', 'executable') - self.make_dangerous() + self.make_dangerous('executable') def _winoffice(self): """Processes a winoffice file using olefile/oletools.""" + # LOG: change this property self.set_property('processing_type', 'WinOffice') # Try as if it is a valid document oid = oletools.oleid.OleID(self.src_path) @@ -275,33 +278,27 @@ class File(FileBase): try: ole = olefile.OleFileIO(self.src_path, raise_defects=olefile.DEFECT_INCORRECT) except: - self.set_property('not_parsable', True) - self.make_dangerous() + self.make_dangerous('not parsable') if ole.parsing_issues: - self.set_property('parsing_issues', True) - self.make_dangerous() + self.make_dangerous('parsing issues') else: if ole.exists('macros/vba') or ole.exists('Macros') \ or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'): - self.set_property('macro', True) - self.make_dangerous() + self.make_dangerous('macro') else: indicators = oid.check() - # Encrypted ban be set by multiple checks on the script + # Encrypted can be set by multiple checks on the script if oid.encrypted.value: - self.set_property('encrypted', True) - self.make_dangerous() + self.make_dangerous('encrypted') if oid.macros.value or oid.ole.exists('macros/vba') or oid.ole.exists('Macros') \ or oid.ole.exists('_VBA_PROJECT_CUR') or oid.ole.exists('VBA'): - self.set_property('macro', True) - self.make_dangerous() + self.make_dangerous('macro') for i in indicators: if i.id == 'ObjectPool' and i.value: - # FIXME: Is it suspicious? + # TODO: Is it suspicious? self.set_property('objpool', True) elif i.id == 'flash' and i.value: - self.set_property('flash', True) - self.make_dangerous() + self.make_dangerous('flash') def _ooxml(self): """Processes an ooxml file.""" @@ -309,24 +306,19 @@ class File(FileBase): try: doc = officedissector.doc.Document(self.src_path) except Exception: - # Invalid file - self.make_dangerous() + self.make_dangerous('invalid ooxml file') return # There are probably other potentially malicious features: # fonts, custom props, custom XML if doc.is_macro_enabled or len(doc.features.macros) > 0: - self.set_property('macro', True) - self.make_dangerous() + self.make_dangerous('macro') if len(doc.features.embedded_controls) > 0: - self.set_property('activex', True) - self.make_dangerous() + self.make_dangerous('activex') if len(doc.features.embedded_objects) > 0: # Exploited by CVE-2014-4114 (OLE) - self.set_property('embedded_obj', True) - self.make_dangerous() + self.make_dangerous('embedded obj') if len(doc.features.embedded_packages) > 0: - self.set_property('embedded_pack', True) - self.make_dangerous() + self.make_dangerous('embedded pack') def _libreoffice(self): """Processes a libreoffice file.""" @@ -336,14 +328,12 @@ class File(FileBase): lodoc = zipfile.ZipFile(self.src_path, 'r') except: # TODO: are there specific exceptions we should catch here? Or is anything ok - self.set_property('invalid', True) - self.make_dangerous() + self.make_dangerous('invalid libreoffice file') for f in lodoc.infolist(): fname = f.filename.lower() if fname.startswith('script') or fname.startswith('basic') or \ fname.startswith('object') or fname.endswith('.bin'): - self.set_property('macro', True) - self.make_dangerous() + self.make_dangerous('macro') def _pdf(self): """Processes a PDF file.""" @@ -352,20 +342,15 @@ class File(FileBase): oPDFiD = cPDFiD(xmlDoc, True) # TODO: are there other characteristics which should be dangerous? if oPDFiD.encrypt.count > 0: - self.set_property('encrypted', True) - self.make_dangerous() + self.make_dangerous('encrypted pdf') if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0: - self.set_property('javascript', True) - self.make_dangerous() + self.make_dangerous('pdf with javascript') if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0: - self.set_property('openaction', True) - self.make_dangerous() + self.make_dangerous('openaction') if oPDFiD.richmedia.count > 0: - self.set_property('flash', True) - self.make_dangerous() + self.make_dangerous('flash') if oPDFiD.launch.count > 0: - self.set_property('launch', True) - self.make_dangerous() + self.make_dangerous('launch') def _archive(self): """Processes an archive using 7zip. The archive is extracted to a @@ -394,16 +379,12 @@ class File(FileBase): try: tags = exifread.process_file(img, debug=True) except Exception as e: - # LOG: log instead of print - print("Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path)) - print(e) + self.add_error(e, "Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path)) if tags is None: try: tags = exifread.process_file(img, debug=True) except Exception as e: - # LOG: log instead of print - print("Failed to get any metadata for file {}.".format(self.src_path)) - print(e) + self.add_error(e, "Failed to get any metadata for file {}.".format(self.src_path)) img.close() return False @@ -436,10 +417,8 @@ class File(FileBase): img.close() # Catch decompression bombs except Exception as e: - # LOG: log instead of print - print("Caught exception processing metadata for {}".format(self.src_path)) - print(e) - self.make_dangerous() + self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path)) + self.make_dangerous('exception processing metadata') return False def extract_metadata(self): @@ -485,11 +464,9 @@ class File(FileBase): self.src_path = tempfile_path except Exception as e: # Catch decompression bombs # TODO: change this from all Exceptions to specific DecompressionBombWarning - # LOG: change this from printing to logging - print("Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path)) - print(e) + self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path)) self.make_dangerous() - self.log_string += 'Image file' + self.add_file_string('Image file') self.set_property('processing_type', 'image') @@ -521,11 +498,10 @@ class KittenGroomerFileCheck(KittenGroomerBase): file.check() if file.is_recursive: self.process_archive(file) - else: - # TODO: Make a File attribute for should be copied True/False and check it - self.safe_copy() + elif file.should_copy: + self.safe_copy(file.src_path, file.dst_path) file.write_log() - if hasattr(file, "tempdir_path"): + if hasattr(file, 'tempdir_path'): self.safe_rmtree(file.tempdir_path) def process_archive(self, file): @@ -550,8 +526,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): self.recursive_archive_depth -= 1 def _handle_archivebomb(self, file): - file.make_dangerous() - file.set_property('Archive Bomb', True) + file.make_dangerous('Archive bomb') self.logger.log.warning('ARCHIVE BOMB.') self.logger.log.warning('The content of the archive contains recursively other archives.') self.logger.log.warning('This is a bad sign so the archive is not extracted to the destination key.') diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index cf9e098..cef4c8d 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -44,15 +44,26 @@ class FileBase(object): self.filename = os.path.basename(self.src_path) self.log_string = '' self.logger = logger + self.symlink = None self.extension = self._determine_extension() self.mimetype = self._determine_mimetype() + self.should_copy = True if self.mimetype: self.main_type, self.sub_type = self._split_subtypes(self.mimetype) - self._file_props = {'filepath': self.src_path, - 'filename': self.filename, - 'maintype': self.main_type, - 'subtype': self.sub_type, - 'extension': self.extension} + self._file_props = { + 'filepath': self.src_path, + 'filename': self.filename, + 'maintype': self.main_type, + 'subtype': self.sub_type, + 'extension': self.extension, + 'file_size': self.size, + 'safety_category': None, + 'symlink': self.symlink, + 'copied': False, + 'file_string_set': set(), + 'errors': {}, + 'user_defined': {} + } def _determine_extension(self): _, ext = os.path.splitext(self.src_path) @@ -65,13 +76,14 @@ class FileBase(object): if os.path.islink(self.src_path): # magic will throw an IOError on a broken symlink mimetype = 'inode/symlink' + self.symlink = os.readlink(self.src_path) else: try: mt = magic.from_file(self.src_path, mime=True) # Note: magic will always return something, even if it's just 'data' except UnicodeEncodeError as e: # FIXME: The encoding of the file is broken (possibly UTF-16) - # LOG: log this error + self.add_error(e, '') mt = None try: mimetype = mt.decode("utf-8") @@ -86,10 +98,14 @@ class FileBase(object): main_type, sub_type = None, None return main_type, sub_type + @property + def size(self): + return os.path.getsize(self.src_path) + def has_mimetype(self): """Returns True if file has a full mimetype, else False.""" if not self.main_type or not self.sub_type: - # LOG: log this as an error + # LOG: log this as an error, move somewhere else? # self._file_props.update({'broken_mime': True}) return False else: @@ -98,7 +114,7 @@ class FileBase(object): def has_extension(self): """Returns True if self.extension is set, else False.""" if self.extension is None: - # TODO: move this props updating to some other method + # TODO: do we actually want to have a seperate prop for no extension? # self.set_property('no_extension', True) return False else: @@ -106,33 +122,46 @@ class FileBase(object): def is_dangerous(self): """True if file has been marked 'dangerous' else False.""" - return ('dangerous' in self._file_props) + return self._file_props['safety_category'] is 'dangerous' def is_unknown(self): """True if file has been marked 'unknown' else False.""" - return ('unknown' in self._file_props) + return self._file_props['safety_category'] is 'unknown' def is_binary(self): """True if file has been marked 'binary' else False.""" - return ('binary' in self._file_props) + return self._file_props['safety_category'] is 'binary' def is_symlink(self): """Returns True and updates log if file is a symlink.""" - if self.has_mimetype() and self.main_type == 'inode' and self.sub_type == 'symlink': - # TODO: move this props updating somewhere else - # self.set_property('symlink', os.readlink(self.src_path)) - return True - else: + if self._file_props['symlink'] is None: return False + else: + return True def set_property(self, prop_string, value): """Takes a property + a value and adds them to self._file_props.""" - self._file_props[prop_string] = value + if prop_string in self._file_props.keys(): + self._file_props[prop_string] = value + else: + self._file_props['user_defined'][prop_string] = value def get_property(self, file_prop): - return self._file_props.get(file_prop, None) + # FIXME: needs to be refactored + if file_prop in self._file_props: + return self._file_props[file_prop] + elif file_prop in self._file_props['user_defined']: + return self._file_props['user_defined'][file_prop] + else: + return None - def make_dangerous(self): + def add_error(self, error, info): + self._file_props['errors'].update({error: info}) + + def add_file_string(self, file_string): + self._file_props['file_string_set'].add(file_string) + + def make_dangerous(self, reason_string=None): """ Marks a file as dangerous. @@ -141,7 +170,8 @@ class FileBase(object): """ if self.is_dangerous(): return - self.set_property('dangerous', True) + self.set_property('safety_category', 'dangerous') + # LOG: store reason string path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename)) @@ -149,7 +179,7 @@ class FileBase(object): """Marks a file as an unknown type and prepends UNKNOWN to filename.""" if self.is_dangerous() or self.is_binary(): return - self.set_property('unknown', True) + self.set_property('safety_category', 'unknown') path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename)) @@ -157,7 +187,7 @@ class FileBase(object): """Marks a file as a binary and appends .bin to filename.""" if self.is_dangerous(): return - self.set_property('binary', True) + self.set_property('safety_category', 'binary') path, filename = os.path.split(self.dst_path) self.dst_path = os.path.join(path, '{}.bin'.format(filename)) @@ -185,7 +215,7 @@ class FileBase(object): self.metadata_file_path = self.dst_path + ext return self.metadata_file_path except KittenGroomerError as e: - # LOG: Write to log file + self.add_error(e, '') return False @@ -281,11 +311,8 @@ class KittenGroomerBase(object): dst_path, filename = os.path.split(dst) self.safe_mkdir(dst_path) shutil.copy(src, dst) - return True except Exception as e: - # LOG: Logfile - print(e) - return False + self.add_error(e, '') def list_all_files(self, directory): """Generator yielding path to all of the files in a directory tree.""" diff --git a/tests/test_filecheck.py b/tests/test_filecheck.py index b573a64..f187523 100644 --- a/tests/test_filecheck.py +++ b/tests/test_filecheck.py @@ -43,6 +43,9 @@ class TestIntegration: test_description = "filecheck_valid" save_logs(groomer, test_description) + def test_processdir(self): + pass + class TestFileHandling: def test_autorun(self): diff --git a/tests/test_kittengroomer.py b/tests/test_kittengroomer.py index 5866aaf..41f9a40 100644 --- a/tests/test_kittengroomer.py +++ b/tests/test_kittengroomer.py @@ -30,7 +30,7 @@ class TestFileBase: return FileBase(source_file, dest_file) @fixture - def symlink(self, tmpdir): + def symlink_file(self, tmpdir): file_path = tmpdir.join('test.txt') file_path.write('testing') file_path = file_path.strpath @@ -65,7 +65,7 @@ class TestFileBase: @fixture def file_marked_binary(self, generic_conf_file): - generic_conf_file.mark_binary() + generic_conf_file.make_binary() return generic_conf_file @fixture(params=[ @@ -81,24 +81,17 @@ class TestFileBase: # What should FileBase do if it's given a path that isn't a file (doesn't exist or is a dir)? Currently magic throws an exception # We should probably catch everytime that happens and tell the user explicitly happened (and maybe put it in the log) - def test_create(self): - file = FileBase('tests/src_valid/blah.conf', '/tests/dst/blah.conf') - def test_create_broken(self, tmpdir): with pytest.raises(TypeError): - file_no_args = FileBase() + FileBase() with pytest.raises(FileNotFoundError): - file_empty_args = FileBase('', '') + FileBase('', '') with pytest.raises(IsADirectoryError): - file_directory = FileBase(tmpdir.strpath, tmpdir.strpath) - # are there other cases here? path to a file that doesn't exist? permissions? + FileBase(tmpdir.strpath, tmpdir.strpath) + # TODO: are there other cases here? path to a file that doesn't exist? permissions? def test_init(self, generic_conf_file): - file = generic_conf_file - assert file.get_property('filepath') == file.src_path - assert file.get_property('extension') == '.conf' - # TODO: should we make log_details undeletable? - # TODO: we should probably check for more extensions here + generic_conf_file def test_extension_uppercase(self, tmpdir): file_path = tmpdir.join('TEST.TXT') @@ -108,10 +101,10 @@ class TestFileBase: assert file.extension == '.txt' def test_mimetypes(self, generic_conf_file): - assert generic_conf_file.has_mimetype() assert generic_conf_file.mimetype == 'text/plain' assert generic_conf_file.main_type == 'text' assert generic_conf_file.sub_type == 'plain' + assert generic_conf_file.has_mimetype() # Need to test something without a mimetype # Need to test something that's a directory # Need to test something that causes the unicode exception @@ -151,14 +144,14 @@ class TestFileBase: file_path = file_path.strpath symlink_path = tmpdir.join('symlinked.txt') symlink_path = symlink_path.strpath - file_symlink = os.symlink(file_path, symlink_path) + os.symlink(file_path, symlink_path) file = FileBase(file_path, file_path) symlink = FileBase(symlink_path, symlink_path) assert file.is_symlink() is False assert symlink.is_symlink() is True - def test_has_symlink_fixture(self, symlink): - assert symlink.is_symlink() is True + def test_has_symlink_fixture(self, symlink_file): + assert symlink_file.is_symlink() is True def test_generic_make_unknown(self, generic_conf_file): assert generic_conf_file.is_unknown() is False @@ -187,11 +180,9 @@ class TestFileBase: if file.is_dangerous(): file.make_binary() assert file.is_binary() is False - assert file.get_property('binary') is None else: file.make_binary() assert file.is_binary() - assert file.get_property('binary') is True def test_force_ext_change(self, generic_conf_file): assert generic_conf_file.has_extension() @@ -254,7 +245,6 @@ class TestKittenGroomerBase: debug_groomer = KittenGroomerBase(source_directory, dest_directory, debug=True) - # we should maybe protect access to self.current_file in some way? def test_safe_copy(self, tmpdir): file = tmpdir.join('test.txt') @@ -264,8 +254,8 @@ class TestKittenGroomerBase: filedest = testdir.join('test.txt') simple_groomer = KittenGroomerBase(tmpdir.strpath, testdir.strpath) simple_groomer.cur_file = FileBase(file.strpath, filedest.strpath) - assert simple_groomer.safe_copy() is True - #check that it handles weird file path inputs + simple_groomer.safe_copy() + # check that safe copy can handle weird file path inputs def test_list_all_files(self, tmpdir): file = tmpdir.join('test.txt') @@ -276,7 +266,3 @@ class TestKittenGroomerBase: files = simple_groomer.list_all_files(simple_groomer.src_root_dir) assert file.strpath in files assert testdir.strpath not in files - - def test_processdir(self, generic_groomer): - with pytest.raises(ImplementationRequired): - generic_groomer.processdir(None, None)