Merge pull request #7 from dputtick/test-unit

Docstrings and unit testing for helpers.py
pull/9/head
Raphaël Vinot 2016-12-15 10:13:39 +01:00 committed by GitHub
commit ff08dc1353
6 changed files with 395 additions and 79 deletions

View File

@ -1,13 +1,14 @@
language: python
python:
- "2.7_with_system_site_packages"
- "3.3"
- "3.4"
- "3.5"
- "nightly"
- 2.7
- 3.3
- 3.4
- 3.5
- nightly
sudo: required
# do we need sudo? should double check
dist: trusty
@ -44,18 +45,16 @@ install:
- sudo apt-get install libxml2-dev libxslt1-dev
- wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
- unzip pdfid_v0_2_1.zip
- pip install -U pip
- pip install lxml exifread pillow
- pip install git+https://github.com/Rafiot/officedissector.git
- |
if [[ "$TRAVIS_PYTHON_VERSION" == "2.7_with_system_site_packages" ]]; then
sudo pip install -U pip lxml exifread pillow
sudo pip install -U git+https://github.com/Rafiot/officedissector.git
sudo pip install -U oletools olefile coveralls codecov pytest-cov
else
pip install -U pip lxml exifread pillow
pip install -U git+https://github.com/Rafiot/officedissector.git
pip install -U coveralls codecov pytest-cov
if [[ "$TRAVIS_PYTHON_VERSION" == 2* ]]; then
pip install -U oletools olefile
fi
# Module dependencies
- pip install -r dev-requirements.txt
- pip install coveralls codecov
# Testing dependencies
- sudo apt-get install rar
# Prepare tests
@ -65,18 +64,18 @@ install:
- python unpackall.py
- popd
- mv theZoo/malwares/Binaries/out tests/src_complex/
# path traversal
# - hg clone https://bitbucket.org/jwilk/path-traversal-samples
# - pushd path-traversal-samples
# - pushd zip
# - make
# - popd
# - pushd rar
# - make
# - popd
# - popd
# - mv path-traversal-samples/zip/*.zip tests/src_complex/
# - mv path-traversal-samples/rar/*.rar tests/src_complex/
# Path traversal
- git clone https://github.com/jwilk/path-traversal-samples
- pushd path-traversal-samples
- pushd zip
- make
- popd
- pushd rar
- make
- popd
- popd
- mv path-traversal-samples/zip/*.zip tests/src_complex/
- mv path-traversal-samples/rar/*.rar tests/src_complex/
# Office docs
- git clone https://github.com/eea/odfpy.git
- mv odfpy/tests/examples/* tests/src_complex/

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import mimetypes
import shlex
import subprocess
@ -21,6 +22,7 @@ from pdfid import PDFiD, cPDFiD
from kittengroomer import FileBase, KittenGroomerBase, main
SEVENZ = '/usr/bin/7z'
PY3 = sys.version_info.major == 3
# Prepare application/<subtype>

View File

@ -258,7 +258,7 @@ class KittenGroomer(KittenGroomerBase):
self._safe_mkdir(tmpdir)
# The magic comes from here: http://svn.ghostscript.com/ghostscript/trunk/gs/doc/Ps2pdf.htm#PDFA
curdir = os.getcwd()
os.chdir(self.ressources_path)
os.chdir(self.resources_path)
gs_command = '{} -dPDFA -dQUIET -dSAFER -dBATCH -dNOPAUSE -dNOOUTERSAVE -sProcessColorModel=DeviceCMYK -sDEVICE=pdfwrite -sPDFACompatibilityPolicy=1 -sOutputFile="{}" ./PDFA_def.ps "{}"'.format(
GS, os.path.join(curdir, tmppath), os.path.join(curdir, self.cur_file.src_path))
self._run_process(gs_command)

View File

@ -1,43 +1,54 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains the base objects for use when creating a sanitizer using
PyCIRCLean. Subclass FileBase and KittenGroomerBase to implement your
desired behavior.
"""
import os
import sys
import magic
import hashlib
import shutil
from twiggy import quick_setup, log
import argparse
import magic
from twiggy import quick_setup, log
class KittenGroomerError(Exception):
"""Base KittenGroomer exception handler."""
def __init__(self, message):
'''
Base KittenGroomer exception handler.
'''
super(KittenGroomerError, self).__init__(message)
self.message = message
class ImplementationRequired(KittenGroomerError):
'''
Implementation required error
'''
"""Implementation required error."""
pass
class FileBase(object):
"""
Base object for individual files in the source directory. Contains file
attributes and various helper methods. Subclass and add attributes
or methods relevant to a given implementation.
"""
def __init__(self, src_path, dst_path):
'''
Contains base information for a file on the source USB key,
initialised with expected src and dest path
'''
"""Initialized with the source path and expected destination path."""
self.src_path = src_path
self.dst_path = dst_path
self.log_details = {'filepath': self.src_path}
self.log_string = ''
a, self.extension = os.path.splitext(self.src_path)
_, self.extension = os.path.splitext(self.src_path)
self._determine_mimetype()
def _determine_mimetype(self):
if os.path.islink(self.src_path):
# magic will throw an IOError on a broken symlink
self.mimetype = 'inode/symlink'
@ -52,7 +63,6 @@ class FileBase(object):
self.mimetype = mt.decode("utf-8")
except:
self.mimetype = mt
if self.mimetype and '/' in self.mimetype:
self.main_type, self.sub_type = self.mimetype.split('/')
else:
@ -60,40 +70,53 @@ class FileBase(object):
self.sub_type = ''
def has_mimetype(self):
    """
    Report whether the file has a complete mimetype.

    Returns True when both self.main_type and self.sub_type are set.
    Otherwise records 'broken_mime' in self.log_details and returns False.
    """
    mime_is_complete = self.main_type and self.sub_type
    if mime_is_complete:
        return True
    # One half of "main/sub" is missing: flag it in the log.
    self.log_details.update({'broken_mime': True})
    return False
def has_extension(self):
    """
    Report whether the file has an extension.

    Returns True when self.extension is set. Otherwise records
    'no_extension' in self.log_details and returns False.
    """
    if self.extension:
        return True
    self.log_details.update({'no_extension': True})
    return False
def is_dangerous(self):
    """Return True if self.log_details has a truthy 'dangerous' entry, else False."""
    flagged = self.log_details.get('dangerous')
    return True if flagged else False
def is_symlink(self):
    """
    Report whether the file is a symlink.

    Returns True when the mimetype is 'inode/symlink'; in that case the
    link target read from self.src_path is stored under 'symlink' in
    self.log_details. Returns False otherwise.
    """
    if not self.has_mimetype():
        return False
    if self.main_type != 'inode':
        return False
    if self.sub_type != 'symlink':
        return False
    link_target = os.readlink(self.src_path)
    self.log_details.update({'symlink': link_target})
    return True
def add_log_details(self, key, value):
'''
Add an entry in the log dictionary
'''
"""Takes a key + a value and adds them to self.log_details."""
self.log_details[key] = value
def make_dangerous(self):
'''
This file should be considered as dangerous and never run.
Prepending and appending DANGEROUS to the destination
file name avoid double-click of death
'''
"""
Marks a file as dangerous.
Prepends and appends DANGEROUS to the destination file name
to avoid double-click of death.
"""
if self.is_dangerous():
# Already marked as dangerous, do nothing
return
@ -102,11 +125,7 @@ class FileBase(object):
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
def make_unknown(self):
'''
This file has an unknown type and it was not possible to take
a decision. Theuser will have to decide what to do.
Prepending UNKNOWN
'''
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
if self.is_dangerous() or self.log_details.get('binary'):
# Already marked as dangerous or binary, do nothing
return
@ -115,11 +134,7 @@ class FileBase(object):
self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
def make_binary(self):
'''
This file is a binary, and should probably not be run.
Appending .bin avoir double click of death but the user
will have to decide by itself.
'''
"""Marks a file as a binary and appends .bin to filename."""
if self.is_dangerous():
# Already marked as dangerous, do nothing
return
@ -128,17 +143,17 @@ class FileBase(object):
self.dst_path = os.path.join(path, '{}.bin'.format(filename))
def force_ext(self, ext):
    """
    Make sure the destination path ends with the given extension.

    If self.dst_path already ends with ext nothing happens; otherwise
    'force_ext' is recorded in self.log_details and ext is appended to
    self.dst_path.
    """
    if self.dst_path.endswith(ext):
        return
    self.log_details['force_ext'] = True
    self.dst_path = self.dst_path + ext
class KittenGroomerBase(object):
"""Base object responsible for copy/sanitization process."""
def __init__(self, root_src, root_dst, debug=False):
'''
Setup the base options of the copy/convert setup
'''
"""Initialized with path to source and dest directories."""
self.src_root_dir = root_src
self.dst_root_dir = root_dst
self.log_root_dir = os.path.join(self.dst_root_dir, 'logs')
@ -150,8 +165,8 @@ class KittenGroomerBase(object):
quick_setup(file=self.log_processing)
self.log_name = log.name('files')
self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
os.environ["PATH"] += os.pathsep + self.ressources_path
self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
os.environ["PATH"] += os.pathsep + self.resources_path
self.cur_file = None
@ -164,6 +179,7 @@ class KittenGroomerBase(object):
self.log_debug_out = os.devnull
def _computehash(self, path):
"""Returns a sha1 hash of a file at a given path."""
s = hashlib.sha1()
with open(path, 'rb') as f:
while True:
@ -174,6 +190,7 @@ class KittenGroomerBase(object):
return s.hexdigest()
def tree(self, base_dir, padding=' '):
"""Writes a graphical tree to the log for a given directory."""
if sys.version_info.major == 2:
self.__tree_py2(base_dir, padding)
else:
@ -211,22 +228,22 @@ class KittenGroomerBase(object):
# ##### Helpers #####
def _safe_rmtree(self, directory):
'''Remove a directory tree if it exists'''
"""Remove a directory tree if it exists."""
if os.path.exists(directory):
shutil.rmtree(directory)
def _safe_remove(self, filepath):
'''Remove a file if it exists'''
"""Remove a file if it exists."""
if os.path.exists(filepath):
os.remove(filepath)
def _safe_mkdir(self, directory):
'''Make a directory if it does not exist'''
"""Make a directory if it does not exist."""
if not os.path.exists(directory):
os.makedirs(directory)
def _safe_copy(self, src=None, dst=None):
''' Copy a file and create directory if needed'''
"""Copy a file and create directory if needed."""
if src is None:
src = self.cur_file.src_path
if dst is None:
@ -242,10 +259,10 @@ class KittenGroomerBase(object):
return False
def _safe_metadata_split(self, ext):
'''Create a separate file to hold this file's metadata'''
"""Create a separate file to hold this file's metadata."""
dst = self.cur_file.dst_path
try:
if os.path.exists(self.cur_file.src_path + ext):
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
raise KittenGroomerError("Cannot create split metadata file for \"" +
self.cur_file.dst_path + "\", type '" +
ext + "': File exists.")
@ -258,31 +275,31 @@ class KittenGroomerBase(object):
return False
def _list_all_files(self, directory):
''' Generate an iterator over all the files in a directory tree'''
"""Generate an iterator over all the files in a directory tree."""
for root, dirs, files in os.walk(directory):
for filename in files:
filepath = os.path.join(root, filename)
yield filepath
def _print_log(self):
'''
Print log, should be called after each file.
"""
Print log, should be called after each file.
You probably want to reimplement it in the subclass
'''
You probably want to reimplement it in the subclass.
"""
tmp_log = self.log_name.fields(**self.cur_file.log_details)
tmp_log.info('It did a thing.')
#######################
def processdir(self, src_dir=None, dst_dir=None):
'''
Main function doing the work, you have to implement it yourself.
'''
raise ImplementationRequired('You have to implement the result processdir.')
"""
Implement this function in your subclass to define file processing behavior.
"""
raise ImplementationRequired('Please implement processdir.')
def main(kg_implementation, description='Call the KittenGroomer implementation to do things on files present in the source directory to the destination directory'):
def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'):
parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
parser.add_argument('-s', '--source', type=str, help='Source directory')
parser.add_argument('-d', '--destination', type=str, help='Destination directory')

293
tests/test_helpers.py Normal file
View File

@ -0,0 +1,293 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import pytest
from kittengroomer import FileBase, KittenGroomerBase
from kittengroomer.helpers import ImplementationRequired
PY3 = sys.version_info.major == 3
skip = pytest.mark.skip
xfail = pytest.mark.xfail
fixture = pytest.fixture
# FileBase
class TestFileBase:
    """Unit tests for FileBase: creation, mimetype/extension checks and marking helpers."""

    @fixture
    def source_file(self):
        return 'tests/src_simple/blah.conf'

    @fixture
    def dest_file(self):
        return 'tests/dst/blah.conf'

    @fixture
    def generic_conf_file(self, source_file, dest_file):
        return FileBase(source_file, dest_file)

    @fixture
    def symlink(self, tmpdir):
        file_path = tmpdir.join('test.txt')
        file_path.write('testing')
        file_path = file_path.strpath
        symlink_path = tmpdir.join('symlinked.txt')
        symlink_path = symlink_path.strpath
        os.symlink(file_path, symlink_path)
        return FileBase(symlink_path, symlink_path)

    @fixture
    def temp_file(self, tmpdir):
        file_path = tmpdir.join('test.txt')
        file_path.write('testing')
        file_path = file_path.strpath
        return FileBase(file_path, file_path)

    @fixture
    def temp_file_no_ext(self, tmpdir):
        file_path = tmpdir.join('test')
        file_path.write('testing')
        file_path = file_path.strpath
        return FileBase(file_path, file_path)

    @fixture
    def file_marked_dangerous(self, generic_conf_file):
        generic_conf_file.make_dangerous()
        return generic_conf_file

    @fixture
    def file_marked_unknown(self, generic_conf_file):
        generic_conf_file.make_unknown()
        return generic_conf_file

    @fixture
    def file_marked_binary(self, generic_conf_file):
        # FileBase defines make_binary(); there is no mark_binary().
        generic_conf_file.make_binary()
        return generic_conf_file

    @fixture(params=[
        FileBase.make_dangerous,
        FileBase.make_unknown,
        FileBase.make_binary
    ])
    def file_marked_all_parameterized(self, request, generic_conf_file):
        # Runs each dependent test once per marking method.
        request.param(generic_conf_file)
        return generic_conf_file

    # What are the various things that can go wrong with file paths? We should have fixtures for them.
    # What should FileBase do if it's given a path that isn't a file (doesn't exist or is a dir)?
    # Currently magic throws an exception.
    # We should probably catch every time that happens and tell the user explicitly what happened
    # (and maybe put it in the log).

    def test_create(self):
        created = FileBase('tests/src_simple/blah.conf', 'tests/dst/blah.conf')
        assert created.src_path == 'tests/src_simple/blah.conf'
        assert created.dst_path == 'tests/dst/blah.conf'

    def test_create_broken(self, tmpdir):
        # FileNotFoundError / IsADirectoryError only exist on Python 3; the
        # conditional expressions never evaluate those names on Python 2.
        missing_error = FileNotFoundError if PY3 else IOError
        directory_error = IsADirectoryError if PY3 else IOError
        with pytest.raises(TypeError):
            FileBase()
        with pytest.raises(missing_error):
            FileBase('', '')
        with pytest.raises(directory_error):
            FileBase(tmpdir.strpath, tmpdir.strpath)
        # are there other cases here? path to a file that doesn't exist? permissions?

    def test_init(self, generic_conf_file):
        conf_file = generic_conf_file
        assert conf_file.log_details
        assert conf_file.log_details['filepath'] == conf_file.src_path
        assert conf_file.extension == '.conf'
        copied_log = conf_file.log_details.copy()
        conf_file.log_details = ''
        # assert conf_file.log_details == copied_log  # this fails for now, we need to make log_details undeletable
        # we should probably check for more extensions here

    def test_mimetypes(self, generic_conf_file):
        assert generic_conf_file.has_mimetype()
        assert generic_conf_file.mimetype == 'text/plain'
        assert generic_conf_file.main_type == 'text'
        assert generic_conf_file.sub_type == 'plain'
        # Need to test something without a mimetype
        # Need to test something that's a directory
        # Need to test something that causes the unicode exception

    def test_has_mimetype_no_main_type(self, generic_conf_file):
        generic_conf_file.main_type = ''
        assert generic_conf_file.has_mimetype() is False

    def test_has_mimetype_no_sub_type(self, generic_conf_file):
        generic_conf_file.sub_type = ''
        assert generic_conf_file.has_mimetype() is False

    def test_has_extension(self, temp_file, temp_file_no_ext):
        assert temp_file.has_extension() is True
        assert temp_file_no_ext.has_extension() is False
        assert temp_file_no_ext.log_details.get('no_extension') is True

    def test_add_log_details(self, generic_conf_file):
        generic_conf_file.add_log_details('test', True)
        assert generic_conf_file.log_details['test'] is True
        with pytest.raises(KeyError):
            generic_conf_file.log_details['wrong']

    def test_marked_dangerous(self, file_marked_all_parameterized):
        file_marked_all_parameterized.make_dangerous()
        assert file_marked_all_parameterized.is_dangerous() is True
        # Should work regardless of weird paths??
        # Should check file path alteration behavior as well

    def test_generic_dangerous(self, generic_conf_file):
        assert generic_conf_file.is_dangerous() is False
        generic_conf_file.make_dangerous()
        assert generic_conf_file.is_dangerous() is True

    def test_has_symlink(self, tmpdir):
        file_path = tmpdir.join('test.txt')
        file_path.write('testing')
        file_path = file_path.strpath
        symlink_path = tmpdir.join('symlinked.txt')
        symlink_path = symlink_path.strpath
        os.symlink(file_path, symlink_path)
        regular = FileBase(file_path, file_path)
        linked = FileBase(symlink_path, symlink_path)
        assert regular.is_symlink() is False
        assert linked.is_symlink() is True

    def test_has_symlink_fixture(self, symlink):
        assert symlink.is_symlink() is True

    def test_generic_make_unknown(self, generic_conf_file):
        assert generic_conf_file.log_details.get('unknown') is None
        generic_conf_file.make_unknown()
        assert generic_conf_file.log_details.get('unknown') is True
        # given a FileBase object with no marking, should do the right things

    def test_marked_make_unknown(self, file_marked_all_parameterized):
        marked = file_marked_all_parameterized
        if marked.log_details.get('unknown'):
            marked.make_unknown()
            assert marked.log_details.get('unknown') is True
        else:
            # Already dangerous or binary: make_unknown() must be a no-op.
            assert marked.log_details.get('unknown') is None
            marked.make_unknown()
            assert marked.log_details.get('unknown') is None
        # given a FileBase object with an unrecognized marking, should ???

    def test_generic_make_binary(self, generic_conf_file):
        assert generic_conf_file.log_details.get('binary') is None
        generic_conf_file.make_binary()
        assert generic_conf_file.log_details.get('binary') is True

    def test_marked_make_binary(self, file_marked_all_parameterized):
        marked = file_marked_all_parameterized
        if marked.log_details.get('dangerous'):
            # Already dangerous: make_binary() must be a no-op.
            marked.make_binary()
            assert marked.log_details.get('binary') is None
        else:
            marked.make_binary()
            assert marked.log_details.get('binary') is True

    def test_force_ext_change(self, generic_conf_file):
        assert generic_conf_file.has_extension()
        assert generic_conf_file.extension == '.conf'
        assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
        generic_conf_file.force_ext('.txt')
        assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt'
        assert generic_conf_file.log_details.get('force_ext') is True
        # should make a file's extension change
        # should be able to handle weird paths

    def test_force_ext_correct(self, generic_conf_file):
        assert generic_conf_file.has_extension()
        assert generic_conf_file.extension == '.conf'
        generic_conf_file.force_ext('.conf')
        assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
        assert generic_conf_file.log_details.get('force_ext') is None
        # shouldn't change a file's extension if it already is right
class TestKittenGroomerBase:
    """Unit tests for KittenGroomerBase: instantiation and the filesystem helpers."""

    @fixture
    def source_directory(self):
        return 'tests/src_complex'

    @fixture
    def dest_directory(self):
        return 'tests/dst'

    @fixture
    def generic_groomer(self, source_directory, dest_directory):
        return KittenGroomerBase(source_directory, dest_directory)

    def test_create(self, generic_groomer):
        assert generic_groomer

    def test_instantiation(self, source_directory, dest_directory):
        # Smoke test: both constructor forms must succeed without raising.
        KittenGroomerBase(source_directory, dest_directory)
        KittenGroomerBase(source_directory,
                          dest_directory,
                          debug=True)
        # we should maybe protect access to self.current_file in some way?

    def test_computehash(self, tmpdir):
        import hashlib  # local import: only this test needs it

        sample = tmpdir.join('test.txt')
        sample.write('testing')
        simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
        digest = simple_groomer._computehash(sample.strpath)
        assert digest == hashlib.sha1(b'testing').hexdigest()

    def test_tree(self, generic_groomer):
        generic_groomer.tree(generic_groomer.src_root_dir)

    def test_safe_copy(self, tmpdir):
        sample = tmpdir.join('test.txt')
        sample.write('testing')
        testdir = tmpdir.join('testdir')
        os.mkdir(testdir.strpath)
        filedest = testdir.join('test.txt')
        simple_groomer = KittenGroomerBase(tmpdir.strpath, testdir.strpath)
        simple_groomer.cur_file = FileBase(sample.strpath, filedest.strpath)
        assert simple_groomer._safe_copy() is True
        # check that it handles weird file path inputs

    def test_safe_metadata_split(self, tmpdir):
        sample = tmpdir.join('test.txt')
        sample.write('testing')
        simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
        simple_groomer.cur_file = FileBase(sample.strpath, sample.strpath)
        metadata_file = simple_groomer._safe_metadata_split('metadata.log')
        metadata_file.write('Have some metadata!')
        metadata_file.close()
        # An empty ext makes src_path + ext the source file itself, which
        # already exists, so the split must be refused.
        assert simple_groomer._safe_metadata_split('') is False
        # if metadata file already exists
        # if there is no metadata to write should this work?

    def test_list_all_files(self, tmpdir):
        sample = tmpdir.join('test.txt')
        sample.write('testing')
        testdir = tmpdir.join('testdir')
        os.mkdir(testdir.strpath)
        simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
        # _list_all_files is a generator: materialize it so both membership
        # checks see the full listing instead of a partly consumed iterator.
        files = list(simple_groomer._list_all_files(simple_groomer.src_root_dir))
        assert sample.strpath in files
        assert testdir.strpath not in files

    def test_print_log(self, generic_groomer):
        # cur_file is None on a fresh groomer, so reading its log_details fails.
        with pytest.raises(AttributeError):
            generic_groomer._print_log()
        # Kind of a bad test, but this should be implemented by the user anyway

    def test_processdir(self, generic_groomer):
        with pytest.raises(ImplementationRequired):
            generic_groomer.processdir()

5
tox.ini Normal file
View File

@ -0,0 +1,5 @@
[tox]
envlist=py27,py35
[testenv]
deps=-rdev-requirements.txt
commands= pytest tests/test_helpers.py --cov=kittengroomer