diff --git a/.travis.yml b/.travis.yml index b3d88a4..fc6e793 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,14 @@ language: python python: - - "2.7_with_system_site_packages" - - "3.3" - - "3.4" - - "3.5" - - "nightly" + - 2.7 + - 3.3 + - 3.4 + - 3.5 + - nightly sudo: required +# do we need sudo? should double check dist: trusty @@ -44,18 +45,16 @@ install: - sudo apt-get install libxml2-dev libxslt1-dev - wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip - unzip pdfid_v0_2_1.zip + - pip install -U pip + - pip install lxml exifread pillow + - pip install git+https://github.com/Rafiot/officedissector.git - | - if [[ "$TRAVIS_PYTHON_VERSION" == "2.7_with_system_site_packages" ]]; then - sudo pip install -U pip lxml exifread pillow - sudo pip install -U git+https://github.com/Rafiot/officedissector.git - sudo pip install -U oletools olefile coveralls codecov pytest-cov - else - pip install -U pip lxml exifread pillow - pip install -U git+https://github.com/Rafiot/officedissector.git - pip install -U coveralls codecov pytest-cov + if [[ "$TRAVIS_PYTHON_VERSION" == 2* ]]; then + pip install -U oletools olefile fi # Module dependencies - pip install -r dev-requirements.txt + - pip install coveralls codecov # Testing dependencies - sudo apt-get install rar # Prepare tests @@ -65,18 +64,18 @@ install: - python unpackall.py - popd - mv theZoo/malwares/Binaries/out tests/src_complex/ - # path traversal - # - hg clone https://bitbucket.org/jwilk/path-traversal-samples - # - pushd path-traversal-samples - # - pushd zip - # - make - # - popd - # - pushd rar - # - make - # - popd - # - popd - # - mv path-traversal-samples/zip/*.zip tests/src_complex/ - # - mv path-traversal-samples/rar/*.rar tests/src_complex/ + # Path traversal + - git clone https://github.com/jwilk/path-traversal-samples + - pushd path-traversal-samples + - pushd zip + - make + - popd + - pushd rar + - make + - popd + - popd + - mv path-traversal-samples/zip/*.zip tests/src_complex/ + - 
mv path-traversal-samples/rar/*.rar tests/src_complex/ # Office docs - git clone https://github.com/eea/odfpy.git - mv odfpy/tests/examples/* tests/src_complex/ diff --git a/bin/filecheck.py b/bin/filecheck.py index f35f4b7..1d3873c 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os +import sys import mimetypes import shlex import subprocess @@ -21,6 +22,7 @@ from pdfid import PDFiD, cPDFiD from kittengroomer import FileBase, KittenGroomerBase, main SEVENZ = '/usr/bin/7z' +PY3 = sys.version_info.major == 3 # Prepare application/ diff --git a/bin/generic.py b/bin/generic.py index d368755..e76fccd 100644 --- a/bin/generic.py +++ b/bin/generic.py @@ -258,7 +258,7 @@ class KittenGroomer(KittenGroomerBase): self._safe_mkdir(tmpdir) # The magic comes from here: http://svn.ghostscript.com/ghostscript/trunk/gs/doc/Ps2pdf.htm#PDFA curdir = os.getcwd() - os.chdir(self.ressources_path) + os.chdir(self.resources_path) gs_command = '{} -dPDFA -dQUIET -dSAFER -dBATCH -dNOPAUSE -dNOOUTERSAVE -sProcessColorModel=DeviceCMYK -sDEVICE=pdfwrite -sPDFACompatibilityPolicy=1 -sOutputFile="{}" ./PDFA_def.ps "{}"'.format( GS, os.path.join(curdir, tmppath), os.path.join(curdir, self.cur_file.src_path)) self._run_process(gs_command) diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index 3a20a74..990f899 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -1,43 +1,54 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- + +""" +Contains the base objects for use when creating a sanitizer using +PyCIRCLean. Subclass FileBase and KittenGroomerBase to implement your +desired behavior. 
+""" + + import os import sys -import magic import hashlib import shutil -from twiggy import quick_setup, log import argparse +import magic +from twiggy import quick_setup, log + class KittenGroomerError(Exception): + """Base KittenGroomer exception handler.""" + def __init__(self, message): - ''' - Base KittenGroomer exception handler. - ''' super(KittenGroomerError, self).__init__(message) self.message = message class ImplementationRequired(KittenGroomerError): - ''' - Implementation required error - ''' + """Implementation required error.""" + pass class FileBase(object): + """ + Base object for individual files in the source directory. Contains file + attributes and various helper methods. Subclass and add attributes + or methods relevant to a given implementation. + """ def __init__(self, src_path, dst_path): - ''' - Contains base information for a file on the source USB key, - initialised with expected src and dest path - ''' + """Initialized with the source path and expected destination path.""" self.src_path = src_path self.dst_path = dst_path self.log_details = {'filepath': self.src_path} self.log_string = '' - a, self.extension = os.path.splitext(self.src_path) + _, self.extension = os.path.splitext(self.src_path) + self._determine_mimetype() + def _determine_mimetype(self): if os.path.islink(self.src_path): # magic will throw an IOError on a broken symlink self.mimetype = 'inode/symlink' @@ -52,7 +63,6 @@ class FileBase(object): self.mimetype = mt.decode("utf-8") except: self.mimetype = mt - if self.mimetype and '/' in self.mimetype: self.main_type, self.sub_type = self.mimetype.split('/') else: @@ -60,40 +70,53 @@ class FileBase(object): self.sub_type = '' def has_mimetype(self): + """ + Returns True if file has a full mimetype, else False. + + Returns False + updates log if self.main_type or self.sub_type + are not set. 
+ """ + if not self.main_type or not self.sub_type: self.log_details.update({'broken_mime': True}) return False return True def has_extension(self): + """ + Returns True if self.extension is set, else False. + + Returns False + updates self.log_details if self.extension is not set. + """ if not self.extension: self.log_details.update({'no_extension': True}) return False return True def is_dangerous(self): + """Returns True if self.log_details contains 'dangerous'.""" if self.log_details.get('dangerous'): return True return False def is_symlink(self): + """Returns True and updates log if file is a symlink.""" if self.has_mimetype() and self.main_type == 'inode' and self.sub_type == 'symlink': self.log_details.update({'symlink': os.readlink(self.src_path)}) return True return False def add_log_details(self, key, value): - ''' - Add an entry in the log dictionary - ''' + """Takes a key + a value and adds them to self.log_details.""" self.log_details[key] = value def make_dangerous(self): - ''' - This file should be considered as dangerous and never run. - Prepending and appending DANGEROUS to the destination - file name avoid double-click of death - ''' + """ + Marks a file as dangerous. + + Prepends and appends DANGEROUS to the destination file name + to avoid double-click of death. + """ if self.is_dangerous(): # Already marked as dangerous, do nothing return @@ -102,11 +125,7 @@ class FileBase(object): self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename)) def make_unknown(self): - ''' - This file has an unknown type and it was not possible to take - a decision. Theuser will have to decide what to do. 
- Prepending UNKNOWN - ''' + """Marks a file as an unknown type and prepends UNKNOWN to filename.""" if self.is_dangerous() or self.log_details.get('binary'): # Already marked as dangerous or binary, do nothing return @@ -115,11 +134,7 @@ class FileBase(object): self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename)) def make_binary(self): - ''' - This file is a binary, and should probably not be run. - Appending .bin avoir double click of death but the user - will have to decide by itself. - ''' + """Marks a file as a binary and appends .bin to filename.""" if self.is_dangerous(): # Already marked as dangerous, do nothing return @@ -128,17 +143,17 @@ class FileBase(object): self.dst_path = os.path.join(path, '{}.bin'.format(filename)) def force_ext(self, ext): + """If dst_path does not end in ext, appends the ext and updates log.""" if not self.dst_path.endswith(ext): self.log_details['force_ext'] = True self.dst_path += ext class KittenGroomerBase(object): + """Base object responsible for copy/sanitization process.""" def __init__(self, root_src, root_dst, debug=False): - ''' - Setup the base options of the copy/convert setup - ''' + """Initialized with path to source and dest directories.""" self.src_root_dir = root_src self.dst_root_dir = root_dst self.log_root_dir = os.path.join(self.dst_root_dir, 'logs') @@ -150,8 +165,8 @@ class KittenGroomerBase(object): quick_setup(file=self.log_processing) self.log_name = log.name('files') - self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data') - os.environ["PATH"] += os.pathsep + self.ressources_path + self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data') + os.environ["PATH"] += os.pathsep + self.resources_path self.cur_file = None @@ -164,6 +179,7 @@ class KittenGroomerBase(object): self.log_debug_out = os.devnull def _computehash(self, path): + """Returns a sha1 hash of a file at a given path.""" s = hashlib.sha1() with open(path, 'rb') as f: 
while True: @@ -174,6 +190,7 @@ class KittenGroomerBase(object): return s.hexdigest() def tree(self, base_dir, padding=' '): + """Writes a graphical tree to the log for a given directory.""" if sys.version_info.major == 2: self.__tree_py2(base_dir, padding) else: @@ -211,22 +228,22 @@ class KittenGroomerBase(object): # ##### Helpers ##### def _safe_rmtree(self, directory): - '''Remove a directory tree if it exists''' + """Remove a directory tree if it exists.""" if os.path.exists(directory): shutil.rmtree(directory) def _safe_remove(self, filepath): - '''Remove a file if it exists''' + """Remove a file if it exists.""" if os.path.exists(filepath): os.remove(filepath) def _safe_mkdir(self, directory): - '''Make a directory if it does not exist''' + """Make a directory if it does not exist.""" if not os.path.exists(directory): os.makedirs(directory) def _safe_copy(self, src=None, dst=None): - ''' Copy a file and create directory if needed''' + """Copy a file and create directory if needed.""" if src is None: src = self.cur_file.src_path if dst is None: @@ -242,10 +259,10 @@ class KittenGroomerBase(object): return False def _safe_metadata_split(self, ext): - '''Create a separate file to hold this file's metadata''' + """Create a separate file to hold this file's metadata.""" dst = self.cur_file.dst_path try: - if os.path.exists(self.cur_file.src_path + ext): + if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well? 
raise KittenGroomerError("Cannot create split metadata file for \"" + self.cur_file.dst_path + "\", type '" + ext + "': File exists.") @@ -258,31 +275,31 @@ class KittenGroomerBase(object): return False def _list_all_files(self, directory): - ''' Generate an iterator over all the files in a directory tree''' + """Generate an iterator over all the files in a directory tree.""" for root, dirs, files in os.walk(directory): for filename in files: filepath = os.path.join(root, filename) yield filepath def _print_log(self): - ''' - Print log, should be called after each file. + """ + Print log, should be called after each file. - You probably want to reimplement it in the subclass - ''' + You probably want to reimplement it in the subclass. + """ tmp_log = self.log_name.fields(**self.cur_file.log_details) tmp_log.info('It did a thing.') ####################### def processdir(self, src_dir=None, dst_dir=None): - ''' - Main function doing the work, you have to implement it yourself. - ''' - raise ImplementationRequired('You have to implement the result processdir.') + """ + Implement this function in your subclass to define file processing behavior. 
+ """ + raise ImplementationRequired('Please implement processdir.') -def main(kg_implementation, description='Call the KittenGroomer implementation to do things on files present in the source directory to the destination directory'): +def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'): parser = argparse.ArgumentParser(prog='KittenGroomer', description=description) parser.add_argument('-s', '--source', type=str, help='Source directory') parser.add_argument('-d', '--destination', type=str, help='Destination directory') diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..a14510a --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sys + +import pytest + +from kittengroomer import FileBase, KittenGroomerBase +from kittengroomer.helpers import ImplementationRequired + +PY3 = sys.version_info.major == 3 +skip = pytest.mark.skip +xfail = pytest.mark.xfail +fixture = pytest.fixture + + +# FileBase + +class TestFileBase: + + @fixture + def source_file(self): + return 'tests/src_simple/blah.conf' + + @fixture + def dest_file(self): + return 'tests/dst/blah.conf' + + @fixture + def generic_conf_file(self, source_file, dest_file): + return FileBase(source_file, dest_file) + + @fixture + def symlink(self, tmpdir): + file_path = tmpdir.join('test.txt') + file_path.write('testing') + file_path = file_path.strpath + symlink_path = tmpdir.join('symlinked.txt') + symlink_path = symlink_path.strpath + os.symlink(file_path, symlink_path) + return FileBase(symlink_path, symlink_path) + + @fixture + def temp_file(self, tmpdir): + file_path = tmpdir.join('test.txt') + file_path.write('testing') + file_path = file_path.strpath + return FileBase(file_path, file_path) + + @fixture + def temp_file_no_ext(self, tmpdir): + file_path = tmpdir.join('test') + 
file_path.write('testing') + file_path = file_path.strpath + return FileBase(file_path, file_path) + + @fixture + def file_marked_dangerous(self, generic_conf_file): + generic_conf_file.make_dangerous() + return generic_conf_file + + @fixture + def file_marked_unknown(self, generic_conf_file): + generic_conf_file.make_unknown() + return generic_conf_file + + @fixture + def file_marked_binary(self, generic_conf_file): + generic_conf_file.make_binary() + return generic_conf_file + + @fixture(params=[ + FileBase.make_dangerous, + FileBase.make_unknown, + FileBase.make_binary + ]) + def file_marked_all_parameterized(self, request, generic_conf_file): + request.param(generic_conf_file) + return generic_conf_file + + # What are the various things that can go wrong with file paths? We should have fixtures for them + # What should FileBase do if it's given a path that isn't a file (doesn't exist or is a dir)? Currently magic throws an exception + # We should probably catch every time that happens and tell the user explicitly what happened (and maybe put it in the log) + + def test_create(self): + file = FileBase('tests/src_simple/blah.conf', '/tests/dst/blah.conf') + + def test_create_broken(self, tmpdir): + with pytest.raises(TypeError): + file_no_args = FileBase() + if PY3: + with pytest.raises(FileNotFoundError): + file_empty_args = FileBase('', '') + else: + with pytest.raises(IOError): + file_empty_args = FileBase('', '') + if PY3: + with pytest.raises(IsADirectoryError): + file_directory = FileBase(tmpdir.strpath, tmpdir.strpath) + else: + with pytest.raises(IOError): + file_directory = FileBase(tmpdir.strpath, tmpdir.strpath) + # are there other cases here? path to a file that doesn't exist? permissions? 
+ + def test_init(self, generic_conf_file): + file = generic_conf_file + assert file.log_details + assert file.log_details['filepath'] == file.src_path + assert file.extension == '.conf' + copied_log = file.log_details.copy() + file.log_details = '' + # assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable + # we should probably check for more extensions here + + def test_mimetypes(self, generic_conf_file): + assert generic_conf_file.has_mimetype() + assert generic_conf_file.mimetype == 'text/plain' + assert generic_conf_file.main_type == 'text' + assert generic_conf_file.sub_type == 'plain' + # Need to test something without a mimetype + # Need to test something that's a directory + # Need to test something that causes the unicode exception + + def test_has_mimetype_no_main_type(self, generic_conf_file): + generic_conf_file.main_type = '' + assert generic_conf_file.has_mimetype() is False + + def test_has_mimetype_no_sub_type(self, generic_conf_file): + generic_conf_file.sub_type = '' + assert generic_conf_file.has_mimetype() is False + + def test_has_extension(self, temp_file, temp_file_no_ext): + assert temp_file.has_extension() is True + assert temp_file_no_ext.has_extension() is False + assert temp_file_no_ext.log_details.get('no_extension') is True + + def test_add_log_details(self, generic_conf_file): + generic_conf_file.add_log_details('test', True) + assert generic_conf_file.log_details['test'] is True + with pytest.raises(KeyError): + assert generic_conf_file.log_details['wrong'] is False + + def test_marked_dangerous(self, file_marked_all_parameterized): + file_marked_all_parameterized.make_dangerous() + assert file_marked_all_parameterized.is_dangerous() is True + # Should work regardless of weird paths?? 
+ # Should check file path alteration behavior as well + + def test_generic_dangerous(self, generic_conf_file): + assert generic_conf_file.is_dangerous() is False + generic_conf_file.make_dangerous() + assert generic_conf_file.is_dangerous() is True + + def test_has_symlink(self, tmpdir): + file_path = tmpdir.join('test.txt') + file_path.write('testing') + file_path = file_path.strpath + symlink_path = tmpdir.join('symlinked.txt') + symlink_path = symlink_path.strpath + file_symlink = os.symlink(file_path, symlink_path) + file = FileBase(file_path, file_path) + symlink = FileBase(symlink_path, symlink_path) + assert file.is_symlink() is False + assert symlink.is_symlink() is True + + def test_has_symlink_fixture(self, symlink): + assert symlink.is_symlink() is True + + def test_generic_make_unknown(self, generic_conf_file): + assert generic_conf_file.log_details.get('unknown') is None + generic_conf_file.make_unknown() + assert generic_conf_file.log_details.get('unknown') is True + # given a FileBase object with no marking, should do the right things + + def test_marked_make_unknown(self, file_marked_all_parameterized): + file = file_marked_all_parameterized + if file.log_details.get('unknown'): + file.make_unknown() + assert file.log_details.get('unknown') is True + else: + assert file.log_details.get('unknown') is None + file.make_unknown() + assert file.log_details.get('unknown') is None + # given a FileBase object with an unrecognized marking, should ??? 
+ + def test_generic_make_binary(self, generic_conf_file): + assert generic_conf_file.log_details.get('binary') is None + generic_conf_file.make_binary() + assert generic_conf_file.log_details.get('binary') is True + + def test_marked_make_binary(self, file_marked_all_parameterized): + file = file_marked_all_parameterized + if file.log_details.get('dangerous'): + file.make_binary() + assert file.log_details.get('binary') is None + else: + file.make_binary() + assert file.log_details.get('binary') is True + + def test_force_ext_change(self, generic_conf_file): + assert generic_conf_file.has_extension() + assert generic_conf_file.extension == '.conf' + assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf' + generic_conf_file.force_ext('.txt') + assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt' + assert generic_conf_file.log_details.get('force_ext') is True + # should make a file's extension change + # should be able to handle weird paths + + def test_force_ext_correct(self, generic_conf_file): + assert generic_conf_file.has_extension() + assert generic_conf_file.extension == '.conf' + generic_conf_file.force_ext('.conf') + assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf' + assert generic_conf_file.log_details.get('force_ext') is None + # shouldn't change a file's extension if it already is right + + +class TestKittenGroomerBase: + + @fixture + def source_directory(self): + return 'tests/src_complex' + + @fixture + def dest_directory(self): + return 'tests/dst' + + @fixture + def generic_groomer(self, source_directory, dest_directory): + return KittenGroomerBase(source_directory, dest_directory) + + def test_create(self, generic_groomer): + assert generic_groomer + + def test_instantiation(self, source_directory, dest_directory): + groomer = KittenGroomerBase(source_directory, dest_directory) + debug_groomer = KittenGroomerBase(source_directory, + dest_directory, + debug=True) + # we should maybe protect access to 
self.cur_file in some way? + + def test_computehash(self, tmpdir): + file = tmpdir.join('test.txt') + file.write('testing') + simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) + simple_groomer._computehash(file.strpath) + + def test_tree(self, generic_groomer): + generic_groomer.tree(generic_groomer.src_root_dir) + + def test_safe_copy(self, tmpdir): + file = tmpdir.join('test.txt') + file.write('testing') + testdir = tmpdir.join('testdir') + os.mkdir(testdir.strpath) + filedest = testdir.join('test.txt') + simple_groomer = KittenGroomerBase(tmpdir.strpath, testdir.strpath) + simple_groomer.cur_file = FileBase(file.strpath, filedest.strpath) + assert simple_groomer._safe_copy() is True + # check that it handles weird file path inputs + + def test_safe_metadata_split(self, tmpdir): + file = tmpdir.join('test.txt') + file.write('testing') + simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) + simple_groomer.cur_file = FileBase(file.strpath, file.strpath) + metadata_file = simple_groomer._safe_metadata_split('metadata.log') + metadata_file.write('Have some metadata!') + metadata_file.close() + assert simple_groomer._safe_metadata_split('') is False + # if metadata file already exists + # if there is no metadata to write should this work? 
+ + def test_list_all_files(self, tmpdir): + file = tmpdir.join('test.txt') + file.write('testing') + testdir = tmpdir.join('testdir') + os.mkdir(testdir.strpath) + simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath) + files = list(simple_groomer._list_all_files(simple_groomer.src_root_dir)) + assert file.strpath in files + assert testdir.strpath not in files + + def test_print_log(self, generic_groomer): + with pytest.raises(AttributeError): + generic_groomer._print_log() + # Kind of a bad test, but this should be implemented by the user anyway + + def test_processdir(self, generic_groomer): + with pytest.raises(ImplementationRequired): + generic_groomer.processdir() diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..da1ed31 --- /dev/null +++ b/tox.ini @@ -0,0 +1,5 @@ +[tox] +envlist=py27,py35 +[testenv] +deps=-rdev-requirements.txt +commands= pytest tests/test_helpers.py --cov=kittengroomer