Merge pull request #12 from dputtick/dev

API changes and (some) logging functionality
pull/13/head
Raphaël Vinot 2017-03-16 10:45:20 +01:00 committed by GitHub
commit 79b15fd7da
16 changed files with 912 additions and 884 deletions

4
.gitignore vendored
View File

@ -68,7 +68,11 @@ target/
# Project specific
tests/dst/*
tests/*_dst
tests/test_logs/*
!tests/**/.keepdir
!tests/src_invalid/*
!tests/src_valid/*
pdfid.py
# Plugins are pdfid stuff
plugin_*

View File

@ -66,8 +66,8 @@ install:
- rm fraunhoferlibrary.zip
- 7z x -p42 42.zip
# Some random samples
- wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3
- wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4
# - wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3
# - wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4
- wget http://thewalter.net/stef/software/rtfx/sample.rtf
- popd

115
README.md
View File

@ -1,13 +1,12 @@
[![Build Status](https://travis-ci.org/CIRCL/PyCIRCLean.svg?branch=master)](https://travis-ci.org/CIRCL/PyCIRCLean)
[![codecov.io](https://codecov.io/github/CIRCL/PyCIRCLean/coverage.svg?branch=master)](https://codecov.io/github/CIRCL/PyCIRCLean?branch=master)
[![Coverage Status](https://coveralls.io/repos/github/Rafiot/PyCIRCLean/badge.svg?branch=master)](https://coveralls.io/github/Rafiot/PyCIRCLean?branch=master)
# PyCIRCLean
PyCIRCLean is the core Python code used by [CIRCLean](https://github.com/CIRCL/Circlean/), an open-source
USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the
device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments
to trusted environments. PyCIRCLean is currently Python 3.3+ only.
USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the
device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments
to trusted environments. PyCIRCLean is currently Python 3.3+ compatible.
# Installation
@ -23,10 +22,13 @@ pip install .
# How to use PyCIRCLean
PyCIRCLean is a simple Python library to handle file checking and sanitization. PyCIRCLean is designed as a simple library
that can be overloaded to cover specific checking and sanitization workflows in different organizations like industrial
PyCIRCLean is a simple Python library to handle file checking and sanitization.
PyCIRCLean is designed to be extended to cover specific checking
and sanitization workflows in different organizations such as industrial
environments or restricted/classified ICT environments. A series of practical examples utilizing PyCIRCLean can be found
in the [./examples](./examples) directory.
in the [./examples](./examples) directory. Note: for commits beyond version 2.2.0 these
examples are not guaranteed to work with the PyCIRCLean API. Please check [helpers.py](./kittengroomer/helpers.py) or
[filecheck.py](./bin/filecheck.py) to see the new API interface.
The following simple example using PyCIRCLean will only copy files with a .conf extension matching the 'text/plain' MIME
type. If any other file is found in the source directory, the files won't be copied to the destination directory.
@ -41,94 +43,79 @@ from kittengroomer import FileBase, KittenGroomerBase, main
# Extension
configfiles = {'.conf': 'text/plain'}
class Config:
configfiles = {'.conf': 'text/plain'}
class FileSpec(FileBase):
def __init__(self, src_path, dst_path):
''' Init file object, set the extension '''
"""Init file object, set the extension."""
super(FileSpec, self).__init__(src_path, dst_path)
self.valid_files = {}
a, self.extension = os.path.splitext(self.src_path)
self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8")
# The initial version will only accept the file extensions/mimetypes listed here.
self.valid_files.update(Config.configfiles)
def check(self):
valid = True
expected_mime = self.valid_files.get(self.extension)
if expected_mime is None:
# Unexpected extension => disallowed
valid = False
compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys()))
elif self.mimetype != expected_mime:
# Unexpected mimetype => disallowed
valid = False
compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime)
self.add_log_details('valid', valid)
if valid:
self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype)
else:
self.should_copy = False
if compare_ext is not None:
self.add_log_string(compare_ext)
else:
self.add_log_string(compare_mime)
if self.should_copy:
self.safe_copy()
self.write_log()
class KittenGroomerSpec(KittenGroomerBase):
def __init__(self, root_src=None, root_dst=None):
'''
Initialize the basics of the copy
'''
"""Initialize the basics of the copy."""
if root_src is None:
root_src = os.path.join(os.sep, 'media', 'src')
if root_dst is None:
root_dst = os.path.join(os.sep, 'media', 'dst')
super(KittenGroomerSpec, self).__init__(root_src, root_dst)
self.valid_files = {}
# The initial version will only accept the file extensions/mimetypes listed here.
self.valid_files.update(configfiles)
def _print_log(self):
'''
Print the logs related to the current file being processed
'''
tmp_log = self.log_name.fields(**self.cur_file.log_details)
if not self.cur_file.log_details.get('valid'):
tmp_log.warning(self.cur_file.log_string)
else:
tmp_log.debug(self.cur_file.log_string)
def processdir(self):
'''
Main function doing the processing
'''
"""Main function doing the processing."""
to_copy = []
error = []
for srcpath in self._list_all_files(self.src_root_dir):
valid = True
self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))
expected_mime = self.valid_files.get(self.cur_file.extension)
if expected_mime is None:
# Unexpected extension => disallowed
valid = False
compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys()))
elif self.cur_file.mimetype != expected_mime:
# Unexpected mimetype => disallowed
valid = False
compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime)
self.cur_file.add_log_details('valid', valid)
if valid:
to_copy.append(self.cur_file)
self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype)
else:
error.append(self.cur_file)
if compare_ext is not None:
self.cur_file.log_string = compare_ext
else:
self.cur_file.log_string = compare_mime
if len(error) > 0:
for f in error + to_copy:
self.cur_file = f
self._print_log()
else:
for f in to_copy:
self.cur_file = f
self._safe_copy()
self._print_log()
dstpath = srcpath.replace(self.src_root_dir, self.dst_root_dir)
cur_file = FileSpec(srcpath, dstpath)
cur_file.check()
if __name__ == '__main__':
main(KittenGroomerSpec, ' Only copy some files, returns an error is anything else is found')
exit(0)
~~~
# How to contribute
We welcome contributions (including bug fixes, new code workflows) via pull requests. We are interested in any new workflows
that can be used to improve security in different organizations. If you see any potential enhancements required to support
your sanitization workflow, please feel free to open an issue. Read [CONTRIBUTING.md](/CONTRIBUTING.md) for more information.
We welcome contributions (including bug fixes, new example file processing
workflows) via pull requests. We are particularly interested in any new workflows
that can be used to improve security in different organizations. If you see any
potential enhancements required to support your sanitization workflow, please feel
free to open an issue. Read [CONTRIBUTING.md](/CONTRIBUTING.md) for more
information.
# License

File diff suppressed because it is too large Load Diff

View File

@ -339,7 +339,7 @@ class KittenGroomer(KittenGroomerBase):
archbomb_path = src_dir[:-len('_temp')]
self._safe_remove(archbomb_path)
for srcpath in self._list_all_files(src_dir):
for srcpath in self.list_all_files(src_dir):
self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir))
self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''),

View File

@ -54,7 +54,7 @@ class KittenGroomerPier9(KittenGroomerBase):
'''
Main function doing the processing
'''
for srcpath in self._list_all_files(self.src_root_dir):
for srcpath in self.list_all_files(self.src_root_dir):
self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
self.cur_file = FilePier9(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))
if not self.cur_file.is_dangerous() and self.cur_file.extension in self.authorized_extensions:

View File

@ -54,7 +54,7 @@ class KittenGroomerSpec(KittenGroomerBase):
'''
to_copy = []
error = []
for srcpath in self._list_all_files(self.src_root_dir):
for srcpath in self.list_all_files(self.src_root_dir):
valid = True
self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .helpers import FileBase, KittenGroomerBase, main
from .helpers import FileBase, KittenGroomerBase, GroomerLogger, main

View File

@ -9,13 +9,12 @@ desired behavior.
import os
import sys
import hashlib
import shutil
import argparse
import magic
from twiggy import quick_setup, log
import twiggy
class KittenGroomerError(Exception):
@ -28,197 +27,268 @@ class KittenGroomerError(Exception):
class ImplementationRequired(KittenGroomerError):
"""Implementation required error."""
pass
class FileBase(object):
"""
Base object for individual files in the source directory. Contains file
attributes and various helper methods. Subclass and add attributes
or methods relevant to a given implementation.
Base object for individual files in the source directory.
Contains file attributes and various helper methods.
"""
def __init__(self, src_path, dst_path):
"""Initialized with the source path and expected destination path."""
def __init__(self, src_path, dst_path, logger=None):
"""
Initialized with the source path and expected destination path.
self.logger should be a logging object with an add_file method.
Create various properties and determine the file's mimetype.
"""
self.src_path = src_path
self.dst_path = dst_path
self.log_details = {'filepath': self.src_path}
self.log_string = ''
self._determine_extension()
self._determine_mimetype()
self.filename = os.path.basename(self.src_path)
self.logger = logger
self._file_props = {
'filepath': self.src_path,
'filename': self.filename,
'file_size': self.size,
'maintype': None,
'subtype': None,
'extension': None,
'safety_category': None,
'symlink': False,
'copied': False,
'file_string_set': set(),
'errors': {},
'user_defined': {}
}
self.extension = self._determine_extension()
self.set_property('extension', self.extension)
self.mimetype = self._determine_mimetype()
self.should_copy = True
self.main_type = None
self.sub_type = None
if self.mimetype:
self.main_type, self.sub_type = self._split_subtypes(self.mimetype)
if self.main_type:
self.set_property('maintype', self.main_type)
if self.sub_type:
self.set_property('subtype', self.sub_type)
def _determine_extension(self):
_, ext = os.path.splitext(self.src_path)
self.extension = ext.lower()
ext = ext.lower()
if ext == '':
ext = None
return ext
def _determine_mimetype(self):
if os.path.islink(self.src_path):
# magic will throw an IOError on a broken symlink
self.mimetype = 'inode/symlink'
mimetype = 'inode/symlink'
self.set_property('symlink', os.readlink(self.src_path))
else:
try:
mt = magic.from_file(self.src_path, mime=True)
# magic will always return something, even if it's just 'data'
# Note: magic will always return something, even if it's just 'data'
except UnicodeEncodeError as e:
# FIXME: The encoding of the file is broken (possibly UTF-16)
mt = ''
self.log_details.update({'UnicodeError': e})
# Note: one of the Travis files will trigger this exception
self.add_error(e, '')
mt = None
try:
self.mimetype = mt.decode("utf-8")
mimetype = mt.decode("utf-8")
except:
self.mimetype = mt
if self.mimetype and '/' in self.mimetype:
self.main_type, self.sub_type = self.mimetype.split('/')
mimetype = mt
return mimetype
def _split_subtypes(self, mimetype):
if '/' in mimetype:
main_type, sub_type = mimetype.split('/')
else:
self.main_type = ''
self.sub_type = ''
main_type, sub_type = None, None
return main_type, sub_type
@property
def size(self):
    """Size of the source file in bytes.

    Returns:
        int: the size reported by ``os.path.getsize`` for ``self.src_path``,
        or 0 when that path does not exist.
    """
    try:
        return os.path.getsize(self.src_path)
    except FileNotFoundError:
        # A missing file is reported as empty rather than raising.
        return 0
@property
def has_mimetype(self):
"""
Returns True if file has a full mimetype, else False.
Returns False + updates log if self.main_type or self.sub_type
are not set.
"""
"""True if file has a main and sub mimetype, else False."""
# TODO: broken mimetype checks should be done somewhere else.
# Should the check be by default or should we let the API consumer write it?
if not self.main_type or not self.sub_type:
self.log_details.update({'broken_mime': True})
return False
return True
def has_extension(self):
"""
Returns True if self.extension is set, else False.
Returns False + updates self.log_details if self.extension is not set.
"""
if self.extension == '':
self.log_details.update({'no_extension': True})
return False
return True
def is_dangerous(self):
"""Returns True if self.log_details contains 'dangerous'."""
return ('dangerous' in self.log_details)
def is_unknown(self):
"""Returns True if self.log_details contains 'unknown'."""
return ('unknown' in self.log_details)
def is_binary(self):
"""returns True if self.log_details contains 'binary'."""
return ('binary' in self.log_details)
def is_symlink(self):
"""Returns True and updates log if file is a symlink."""
if self.has_mimetype() and self.main_type == 'inode' and self.sub_type == 'symlink':
self.log_details.update({'symlink': os.readlink(self.src_path)})
else:
return True
return False
def add_log_details(self, key, value):
"""Takes a key + a value and adds them to self.log_details."""
self.log_details[key] = value
@property
def has_extension(self):
    """Whether an extension is set, i.e. ``self.extension`` is not None."""
    return self.extension is not None
def make_dangerous(self):
@property
def is_dangerous(self):
    """True if file has been marked 'dangerous', else False.

    Reads the 'safety_category' entry of ``self._file_props``.
    """
    # Compare by value: `is` tests object identity, which only happens to
    # work for interned string literals and warns on Python 3.8+.
    return self._file_props['safety_category'] == 'dangerous'
@property
def is_unknown(self):
    """True if file has been marked 'unknown', else False.

    Reads the 'safety_category' entry of ``self._file_props``.
    """
    # Compare by value: `is` tests object identity, which only happens to
    # work for interned string literals and warns on Python 3.8+.
    return self._file_props['safety_category'] == 'unknown'
@property
def is_binary(self):
    """True if file has been marked 'binary', else False.

    Reads the 'safety_category' entry of ``self._file_props``.
    """
    # Compare by value: `is` tests object identity, which only happens to
    # work for interned string literals and warns on Python 3.8+.
    return self._file_props['safety_category'] == 'binary'
@property
def is_symlink(self):
    """Whether the file was recorded as a symlink.

    The 'symlink' entry of ``self._file_props`` is the literal ``False``
    for regular files; any other value counts as a symlink.
    """
    return self._file_props['symlink'] is not False
def set_property(self, prop_string, value):
"""
Marks a file as dangerous.
Take a property and a value and add them to self._file_props.
Prepends and appends DANGEROUS to the destination file name
If prop_string is already in _file_props, set prop_string to value.
If prop_string not in _file_props, set prop_string to value in
_file_props['user_defined'].
"""
if prop_string in self._file_props.keys():
self._file_props[prop_string] = value
else:
self._file_props['user_defined'][prop_string] = value
def get_property(self, file_prop):
    """Look up a property value by name.

    Top-level keys of ``self._file_props`` take precedence; otherwise the
    'user_defined' sub-dict is consulted. Returns None when the name is
    found in neither place.
    """
    props = self._file_props
    if file_prop in props:
        return props[file_prop]
    return props['user_defined'].get(file_prop)
def add_error(self, error, info):
    """Store `info` under the key `error` in ``_file_props['errors']``."""
    self._file_props['errors'][error] = info
def add_file_string(self, file_string):
    """Record a descriptor string in ``_file_props['file_string_set']``."""
    descriptors = self._file_props['file_string_set']
    descriptors.add(file_string)
def make_dangerous(self, reason_string=None):
"""
Mark file as dangerous.
Prepend and append DANGEROUS to the destination file name
to help prevent double-click of death.
"""
if self.is_dangerous():
if self.is_dangerous:
return
self.log_details['dangerous'] = True
self.set_property('safety_category', 'dangerous')
# LOG: store reason string somewhere and do something with it
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
def make_unknown(self):
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
if self.is_dangerous() or self.is_binary():
"""Mark file as an unknown type and prepend UNKNOWN to filename."""
if self.is_dangerous or self.is_binary:
return
self.log_details['unknown'] = True
self.set_property('safety_category', 'unknown')
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
def make_binary(self):
"""Marks a file as a binary and appends .bin to filename."""
if self.is_dangerous():
"""Mark file as a binary and append .bin to filename."""
if self.is_dangerous:
return
self.log_details['binary'] = True
self.set_property('safety_category', 'binary')
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, '{}.bin'.format(filename))
def safe_copy(self, src=None, dst=None):
    """Copy file and create destination directories if needed.

    Args:
        src: path to copy from; defaults to ``self.src_path``.
        dst: path to copy to; defaults to ``self.dst_path``.

    Any exception raised while creating directories or copying is not
    propagated; it is recorded through ``self.add_error`` instead.
    """
    if src is None:
        src = self.src_path
    if dst is None:
        dst = self.dst_path
    try:
        dst_dir = os.path.dirname(dst)
        # An empty dirname means dst lives in the current directory;
        # os.makedirs('') would raise and the copy would be skipped.
        if dst_dir:
            # exist_ok avoids the check-then-create race.
            os.makedirs(dst_dir, exist_ok=True)
        shutil.copy(src, dst)
    except Exception as e:
        # Best effort: keep processing and leave the failure in the
        # file's error record.
        self.add_error(e, '')
def force_ext(self, ext):
"""If dst_path does not end in ext, appends the ext and updates log."""
"""If dst_path does not end in ext, change it and edit _file_props."""
if not self.dst_path.endswith(ext):
self.log_details['force_ext'] = True
self.set_property('force_ext', True)
self.dst_path += ext
if not self._file_props['extension'] == ext:
self.set_property('extension', ext)
def create_metadata_file(self, ext):
    """Reserve a path for a separate metadata file for this file.

    No file is written here: the method makes sure the directory of
    ``self.dst_path`` exists, stores ``self.dst_path + ext`` in
    ``self.metadata_file_path`` and returns that path.

    If ``self.src_path + ext`` already exists, a KittenGroomerError is
    recorded through ``self.add_error`` and False is returned.
    NOTE(review): the existence check uses src_path while the error
    message and the returned path use dst_path -- confirm which is intended.
    """
    try:
        # make sure we aren't overwriting anything
        if os.path.exists(self.src_path + ext):
            raise KittenGroomerError("Cannot create split metadata file for \"" +
                                     self.dst_path + "\", type '" +
                                     ext + "': File exists.")
        parent_dir = os.path.dirname(self.dst_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)
        # TODO: Check extension for leading "."
        metadata_path = self.dst_path + ext
        self.metadata_file_path = metadata_path
        return metadata_path
    except KittenGroomerError as e:
        self.add_error(e, '')
        return False
def write_log(self):
    """Pass this file's properties to the attached logger.

    Calls ``self.logger.add_file(self)`` and then ``fields`` on the
    returned object with every entry of ``self._file_props`` as keyword
    arguments. The result of ``fields`` is discarded.
    """
    self.logger.add_file(self).fields(**self._file_props)
class KittenGroomerBase(object):
"""Base object responsible for copy/sanitization process."""
class GroomerLogger(object):
"""Groomer logging interface."""
def __init__(self, root_src, root_dst, debug=False):
"""Initialized with path to source and dest directories."""
self.src_root_dir = root_src
self.dst_root_dir = root_dst
self.log_root_dir = os.path.join(self.dst_root_dir, 'logs')
self._safe_rmtree(self.log_root_dir)
self._safe_mkdir(self.log_root_dir)
self.log_processing = os.path.join(self.log_root_dir, 'processing.log')
self.log_content = os.path.join(self.log_root_dir, 'content.log')
self.tree(self.src_root_dir)
quick_setup(file=self.log_processing)
self.log_name = log.name('files')
self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
os.environ["PATH"] += os.pathsep + self.resources_path
self.cur_file = None
self.debug = debug
if self.debug:
self.log_debug_err = os.path.join(self.log_root_dir, 'debug_stderr.log')
self.log_debug_out = os.path.join(self.log_root_dir, 'debug_stdout.log')
def __init__(self, root_dir_path, debug=False):
    """Prepare a fresh 'logs' directory under `root_dir_path`.

    Any existing 'logs' directory there is deleted and recreated. twiggy
    is pointed at 'processing.log' inside it. When `debug` is true the
    debug stdout/stderr log paths are placed in the same directory,
    otherwise they are set to ``os.devnull``.
    """
    self.root_dir = root_dir_path
    logs_dir = os.path.join(root_dir_path, 'logs')
    self.log_dir_path = logs_dir
    # Start from an empty log directory on every run.
    if os.path.exists(logs_dir):
        shutil.rmtree(logs_dir)
    os.makedirs(logs_dir)
    self.log_processing = os.path.join(logs_dir, 'processing.log')
    self.log_content = os.path.join(logs_dir, 'content.log')
    twiggy.quick_setup(file=self.log_processing)
    self.log = twiggy.log.name('files')
    if not debug:
        self.log_debug_err = os.devnull
        self.log_debug_out = os.devnull
    else:
        self.log_debug_err = os.path.join(logs_dir, 'debug_stderr.log')
        self.log_debug_out = os.path.join(logs_dir, 'debug_stdout.log')
def _computehash(self, path):
"""Returns a sha256 hash of a file at a given path."""
s = hashlib.sha256()
with open(path, 'rb') as f:
while True:
buf = f.read(0x100000)
if not buf:
break
s.update(buf)
return s.hexdigest()
def tree(self, base_dir, padding=' '):
"""Writes a graphical tree to the log for a given directory."""
if sys.version_info.major == 2:
self.__tree_py2(base_dir, padding)
else:
self.__tree_py3(base_dir, padding)
def __tree_py2(self, base_dir, padding=' '):
with open(self.log_content, 'ab') as lf:
lf.write('#' * 80 + '\n')
lf.write('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir))))
padding += '| '
files = sorted(os.listdir(base_dir))
for f in files:
curpath = os.path.join(base_dir, f)
if os.path.islink(curpath):
lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)))
elif os.path.isdir(curpath):
self.tree(curpath, padding)
elif os.path.isfile(curpath):
lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)))
def __tree_py3(self, base_dir, padding=' '):
"""Write a graphical tree to the log for `base_dir`."""
with open(self.log_content, 'ab') as lf:
lf.write(bytes('#' * 80 + '\n', 'UTF-8'))
lf.write(bytes('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir)).encode()), 'utf8'))
@ -233,80 +303,64 @@ class KittenGroomerBase(object):
elif os.path.isfile(curpath):
lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)).encode(errors='ignore'))
# ##### Helpers #####
def _safe_rmtree(self, directory):
def _computehash(self, path):
    """Return the hex-encoded SHA-256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, 'rb') as stream:
        # Read in 1 MiB chunks so large files are never fully in memory.
        for chunk in iter(lambda: stream.read(0x100000), b''):
            digest.update(chunk)
    return digest.hexdigest()
def add_file(self, file):
    """Add a file to the log.

    Args:
        file: object exposing a ``src_path`` attribute.

    Returns:
        Whatever ``self.log.name`` returns for the file's source path.
    """
    # Use the attribute value: the quoted text 'file.src_path' gave every
    # file the same literal name.
    return self.log.name(file.src_path)
class KittenGroomerBase(object):
"""Base object responsible for copy/sanitization process."""
def __init__(self, root_src, root_dst, debug=False):
"""Initialized with path to source and dest directories."""
self.src_root_dir = root_src
self.dst_root_dir = root_dst
self.debug = debug
self.cur_file = None
self.logger = GroomerLogger(self.dst_root_dir, debug)
def safe_rmtree(self, directory):
    """Delete `directory` and its contents; do nothing if it is absent."""
    if not os.path.exists(directory):
        return
    shutil.rmtree(directory)
def _safe_remove(self, filepath):
def safe_remove(self, filepath):
    """Delete the file at `filepath`; do nothing if it is absent."""
    if not os.path.exists(filepath):
        return
    os.remove(filepath)
def _safe_mkdir(self, directory):
def safe_mkdir(self, directory):
    """Create `directory` (with parents) unless the path already exists."""
    if os.path.exists(directory):
        return
    os.makedirs(directory)
def _safe_copy(self, src=None, dst=None):
"""Copy a file and create directory if needed."""
if src is None:
src = self.cur_file.src_path
if dst is None:
dst = self.cur_file.dst_path
try:
dst_path, filename = os.path.split(dst)
self._safe_mkdir(dst_path)
shutil.copy(src, dst)
return True
except Exception as e:
# TODO: Logfile
print(e)
return False
def _safe_metadata_split(self, ext):
"""Create a separate file to hold this file's metadata."""
# TODO: fix logic in this method
dst = self.cur_file.dst_path
try:
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
raise KittenGroomerError("Cannot create split metadata file for \"" +
self.cur_file.dst_path + "\", type '" +
ext + "': File exists.")
dst_path, filename = os.path.split(dst)
self._safe_mkdir(dst_path)
return open(dst + ext, 'w+')
except Exception as e:
# TODO: Logfile
print(e)
return False
def _list_all_files(self, directory):
"""Generate an iterator over all the files in a directory tree."""
def list_all_files(self, directory):
    """Yield the path of every file found by walking `directory`.

    Paths are built by joining each directory visited by ``os.walk`` with
    the file names it reports; directories themselves are not yielded.
    """
    for dirpath, _subdirs, filenames in os.walk(directory):
        yield from (os.path.join(dirpath, name) for name in filenames)
def _print_log(self):
"""
Print log, should be called after each file.
You probably want to reimplement it in the subclass.
"""
tmp_log = self.log_name.fields(**self.cur_file.log_details)
tmp_log.info('It did a thing.')
#######################
def processdir(self, src_dir=None, dst_dir=None):
"""
Implement this function in your subclass to define file processing behavior.
"""
# TODO: feels like this function doesn't need to exist if we move main()
def processdir(self, src_dir, dst_dir):
"""Implement this function to define file processing behavior."""
raise ImplementationRequired('Please implement processdir.')
# TODO: Maybe this shouldn't exist? It should probably get moved to filecheck since this isn't really API code
def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'):
parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
parser.add_argument('-s', '--source', type=str, help='Source directory')

View File

@ -4,7 +4,7 @@ from setuptools import setup
setup(
name='kittengroomer',
version='2.1',
version='2.1.0',
author='Raphaël Vinot',
author_email='raphael.vinot@circl.lu',
maintainer='Raphaël Vinot',

View File

View File

@ -6,17 +6,17 @@ def save_logs(groomer, test_description):
test_log_path = 'tests/test_logs/{}.log'.format(test_description)
with open(test_log_path, 'w+') as test_log:
test_log.write(divider.format('TEST LOG'))
with open(groomer.log_processing, 'r') as logfile:
with open(groomer.logger.log_processing, 'r') as logfile:
log = logfile.read()
test_log.write(log)
if groomer.debug:
if os.path.exists(groomer.log_debug_err):
if os.path.exists(groomer.logger.log_debug_err):
test_log.write(divider.format('ERR LOG'))
with open(groomer.log_debug_err, 'r') as debug_err:
with open(groomer.logger.log_debug_err, 'r') as debug_err:
err = debug_err.read()
test_log.write(err)
if os.path.exists(groomer.log_debug_out):
if os.path.exists(groomer.logger.log_debug_out):
test_log.write(divider.format('OUT LOG'))
with open(groomer.log_debug_out, 'r') as debug_out:
with open(groomer.logger.log_debug_out, 'r') as debug_out:
out = debug_out.read()
test_log.write(out)

BIN
tests/src_valid/Example.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

View File

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import os
import shutil
import pytest
@ -20,29 +21,46 @@ skipif_nodeps = pytest.mark.skipif(NODEPS,
class TestIntegration:
@pytest.fixture
def src_valid(self):
def src_valid_path(self):
return os.path.join(os.getcwd(), 'tests/src_valid')
@pytest.fixture
def src_invalid(self):
def src_invalid_path(self):
return os.path.join(os.getcwd(), 'tests/src_invalid')
@pytest.fixture
def dst(self):
return os.path.join(os.getcwd(), 'tests/dst')
def test_filecheck(self, src_invalid, dst):
groomer = KittenGroomerFileCheck(src_invalid, dst, debug=True)
groomer.processdir()
def test_filecheck_src_invalid(self, src_invalid_path):
dst_path = self.make_dst_dir_path(src_invalid_path)
groomer = KittenGroomerFileCheck(src_invalid_path, dst_path, debug=True)
groomer.run()
test_description = "filecheck_invalid"
save_logs(groomer, test_description)
def test_filecheck_2(self, src_valid, dst):
groomer = KittenGroomerFileCheck(src_valid, dst, debug=True)
groomer.processdir()
def test_filecheck_2(self, src_valid_path):
dst_path = self.make_dst_dir_path(src_valid_path)
groomer = KittenGroomerFileCheck(src_valid_path, dst_path, debug=True)
groomer.run()
test_description = "filecheck_valid"
save_logs(groomer, test_description)
def test_processdir(self):
pass
def test_handle_archives(self):
pass
def make_dst_dir_path(self, src_dir_path):
    """Return the path of a fresh, empty '<src_dir_path>_dst' directory.

    Whatever is already at that path is removed (removal errors are
    ignored) before the directory is created again.
    """
    fresh_dir = '{}_dst'.format(src_dir_path)
    shutil.rmtree(fresh_dir, ignore_errors=True)
    os.makedirs(fresh_dir, exist_ok=True)
    return fresh_dir
class TestFileHandling:
pass
def test_autorun(self):
# Run on a single autorun file, confirm that it gets flagged as dangerous
# TODO: build out these and other methods for individual file cases
pass

View File

@ -5,7 +5,7 @@ import os
import pytest
from kittengroomer import FileBase, KittenGroomerBase
from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger
from kittengroomer.helpers import ImplementationRequired
skip = pytest.mark.skip
@ -30,7 +30,7 @@ class TestFileBase:
return FileBase(source_file, dest_file)
@fixture
def symlink(self, tmpdir):
def symlink_file(self, tmpdir):
file_path = tmpdir.join('test.txt')
file_path.write('testing')
file_path = file_path.strpath
@ -65,7 +65,7 @@ class TestFileBase:
@fixture
def file_marked_binary(self, generic_conf_file):
generic_conf_file.mark_binary()
generic_conf_file.make_binary()
return generic_conf_file
@fixture(params=[
@ -81,27 +81,17 @@ class TestFileBase:
# What should FileBase do if it's given a path that isn't a file (doesn't exist or is a dir)? Currently magic throws an exception
# We should probably catch every time that happens and tell the user explicitly what happened (and maybe put it in the log)
def test_create(self):
file = FileBase('tests/src_valid/blah.conf', '/tests/dst/blah.conf')
def test_create_broken(self, tmpdir):
with pytest.raises(TypeError):
file_no_args = FileBase()
FileBase()
with pytest.raises(FileNotFoundError):
file_empty_args = FileBase('', '')
FileBase('', '')
with pytest.raises(IsADirectoryError):
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
# are there other cases here? path to a file that doesn't exist? permissions?
FileBase(tmpdir.strpath, tmpdir.strpath)
# TODO: are there other cases here? path to a file that doesn't exist? permissions?
def test_init(self, generic_conf_file):
file = generic_conf_file
assert file.log_details
assert file.log_details['filepath'] == file.src_path
assert file.extension == '.conf'
copied_log = file.log_details.copy()
file.log_details = ''
# assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable
# we should probably check for more extensions here
generic_conf_file
def test_extension_uppercase(self, tmpdir):
file_path = tmpdir.join('TEST.TXT')
@ -111,43 +101,42 @@ class TestFileBase:
assert file.extension == '.txt'
def test_mimetypes(self, generic_conf_file):
assert generic_conf_file.has_mimetype()
assert generic_conf_file.mimetype == 'text/plain'
assert generic_conf_file.main_type == 'text'
assert generic_conf_file.sub_type == 'plain'
assert generic_conf_file.has_mimetype
# Need to test something without a mimetype
# Need to test something that's a directory
# Need to test something that causes the unicode exception
def test_has_mimetype_no_main_type(self, generic_conf_file):
generic_conf_file.main_type = ''
assert generic_conf_file.has_mimetype() is False
assert generic_conf_file.has_mimetype is False
def test_has_mimetype_no_sub_type(self, generic_conf_file):
generic_conf_file.sub_type = ''
assert generic_conf_file.has_mimetype() is False
assert generic_conf_file.has_mimetype is False
def test_has_extension(self, temp_file, temp_file_no_ext):
assert temp_file.has_extension() is True
assert temp_file_no_ext.has_extension() is False
assert temp_file_no_ext.log_details.get('no_extension') is True
assert temp_file.has_extension is True
print(temp_file_no_ext.extension)
assert temp_file_no_ext.has_extension is False
def test_add_log_details(self, generic_conf_file):
generic_conf_file.add_log_details('test', True)
assert generic_conf_file.log_details['test'] is True
with pytest.raises(KeyError):
assert generic_conf_file.log_details['wrong'] is False
def test_set_property(self, generic_conf_file):
generic_conf_file.set_property('test', True)
assert generic_conf_file.get_property('test') is True
assert generic_conf_file.get_property('wrong') is None
def test_marked_dangerous(self, file_marked_all_parameterized):
file_marked_all_parameterized.make_dangerous()
assert file_marked_all_parameterized.is_dangerous() is True
assert file_marked_all_parameterized.is_dangerous is True
# Should work regardless of weird paths??
# Should check file path alteration behavior as well
def test_generic_dangerous(self, generic_conf_file):
assert generic_conf_file.is_dangerous() is False
assert generic_conf_file.is_dangerous is False
generic_conf_file.make_dangerous()
assert generic_conf_file.is_dangerous() is True
assert generic_conf_file.is_dangerous is True
def test_has_symlink(self, tmpdir):
file_path = tmpdir.join('test.txt')
@ -155,64 +144,88 @@ class TestFileBase:
file_path = file_path.strpath
symlink_path = tmpdir.join('symlinked.txt')
symlink_path = symlink_path.strpath
file_symlink = os.symlink(file_path, symlink_path)
os.symlink(file_path, symlink_path)
file = FileBase(file_path, file_path)
symlink = FileBase(symlink_path, symlink_path)
assert file.is_symlink() is False
assert symlink.is_symlink() is True
assert file.is_symlink is False
assert symlink.is_symlink is True
def test_has_symlink_fixture(self, symlink):
assert symlink.is_symlink() is True
def test_has_symlink_fixture(self, symlink_file):
assert symlink_file.is_symlink is True
def test_generic_make_unknown(self, generic_conf_file):
assert generic_conf_file.log_details.get('unknown') is None
assert generic_conf_file.is_unknown is False
generic_conf_file.make_unknown()
assert generic_conf_file.log_details.get('unknown') is True
assert generic_conf_file.is_unknown
# given a FileBase object with no marking, should do the right things
def test_marked_make_unknown(self, file_marked_all_parameterized):
file = file_marked_all_parameterized
if file.log_details.get('unknown'):
if file.is_unknown:
file.make_unknown()
assert file.log_details.get('unknown') is True
assert file.is_unknown
else:
assert file.log_details.get('unknown') is None
assert file.is_unknown is False
file.make_unknown()
assert file.log_details.get('unknown') is None
assert file.is_unknown is False
# given a FileBase object with an unrecognized marking, should ???
def test_generic_make_binary(self, generic_conf_file):
assert generic_conf_file.log_details.get('binary') is None
assert generic_conf_file.is_binary is False
generic_conf_file.make_binary()
assert generic_conf_file.log_details.get('binary') is True
assert generic_conf_file.is_binary
def test_marked_make_binary(self, file_marked_all_parameterized):
file = file_marked_all_parameterized
if file.log_details.get('dangerous'):
if file.is_dangerous:
file.make_binary()
assert file.log_details.get('binary') is None
assert file.is_binary is False
else:
file.make_binary()
assert file.log_details.get('binary') is True
assert file.is_binary
def test_force_ext_change(self, generic_conf_file):
assert generic_conf_file.has_extension()
assert generic_conf_file.extension == '.conf'
assert generic_conf_file.has_extension
assert generic_conf_file.get_property('extension') == '.conf'
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
generic_conf_file.force_ext('.txt')
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt'
assert generic_conf_file.log_details.get('force_ext') is True
# should make a file's extension change
assert generic_conf_file.get_property('force_ext') is True
assert generic_conf_file.get_property('extension') == '.txt'
# should be able to handle weird paths
def test_force_ext_correct(self, generic_conf_file):
assert generic_conf_file.has_extension()
assert generic_conf_file.extension == '.conf'
assert generic_conf_file.has_extension
assert generic_conf_file.get_property('extension') == '.conf'
generic_conf_file.force_ext('.conf')
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
assert generic_conf_file.log_details.get('force_ext') is None
assert generic_conf_file.get_property('force_ext') is None
# shouldn't change a file's extension if it already is right
def test_create_metadata_file(self, temp_file):
# Try making a metadata file
metadata_file_path = temp_file.create_metadata_file('.metadata.txt')
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write('Have some metadata!')
# Shouldn't be able to make a metadata file with no extension
assert temp_file.create_metadata_file('') is False
# if metadata file already exists
# if there is no metadata to write should this work?
def test_safe_copy(self, generic_conf_file):
generic_conf_file.safe_copy()
# check that safe copy can handle weird file path inputs
class TestLogger:
@fixture
def generic_logger(self, tmpdir):
return GroomerLogger(tmpdir.strpath)
def test_tree(self, generic_logger):
generic_logger.tree(generic_logger.root_dir)
class TestKittenGroomerBase:
@ -236,39 +249,6 @@ class TestKittenGroomerBase:
debug_groomer = KittenGroomerBase(source_directory,
dest_directory,
debug=True)
# we should maybe protect access to self.current_file in some way?
def test_computehash(self, tmpdir):
file = tmpdir.join('test.txt')
file.write('testing')
simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
simple_groomer._computehash(file.strpath)
def test_tree(self, generic_groomer):
generic_groomer.tree(generic_groomer.src_root_dir)
def test_safe_copy(self, tmpdir):
file = tmpdir.join('test.txt')
file.write('testing')
testdir = tmpdir.join('testdir')
os.mkdir(testdir.strpath)
filedest = testdir.join('test.txt')
simple_groomer = KittenGroomerBase(tmpdir.strpath, testdir.strpath)
simple_groomer.cur_file = FileBase(file.strpath, filedest.strpath)
assert simple_groomer._safe_copy() is True
#check that it handles weird file path inputs
def test_safe_metadata_split(self, tmpdir):
file = tmpdir.join('test.txt')
file.write('testing')
simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
simple_groomer.cur_file = FileBase(file.strpath, file.strpath)
metadata_file = simple_groomer._safe_metadata_split('metadata.log')
metadata_file.write('Have some metadata!')
metadata_file.close()
assert simple_groomer._safe_metadata_split('') is False
# if metadata file already exists
# if there is no metadata to write should this work?
def test_list_all_files(self, tmpdir):
file = tmpdir.join('test.txt')
@ -276,15 +256,6 @@ class TestKittenGroomerBase:
testdir = tmpdir.join('testdir')
os.mkdir(testdir.strpath)
simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
files = simple_groomer._list_all_files(simple_groomer.src_root_dir)
files = simple_groomer.list_all_files(simple_groomer.src_root_dir)
assert file.strpath in files
assert testdir.strpath not in files
def test_print_log(self, generic_groomer):
with pytest.raises(AttributeError):
generic_groomer._print_log()
# Kind of a bad test, but this should be implemented by the user anyway
def test_processdir(self, generic_groomer):
with pytest.raises(ImplementationRequired):
generic_groomer.processdir()

12
tests/testfile_catalog.md Normal file
View File

@ -0,0 +1,12 @@
src_invalid
===========
-
src_valid
=========
- Example.jpg: image/jpeg, obtained from wikipedia.org
- blah.conf: text file with a .conf extension