Merge pull request #14 from dputtick/logging

Logging format improvements
pull/15/head
Raphaël Vinot 2017-04-13 22:44:39 +02:00 committed by GitHub
commit f5cc3d7533
11 changed files with 329 additions and 253 deletions

View File

@ -1,6 +1,19 @@
Changelog
=========
2.2.0 (in progress)
---
New features:
- Filecheck.py configuration information is now conveniently held in a Config
object instead of in globals
- New easier to read text-based logger (removed twiggy dependency)
- Various filetypes in filecheck.py now have improved descriptions for log
- Improved the interface for adding file descriptions to files
Fixes:
-
2.1.0
---

View File

@ -6,6 +6,7 @@ import shlex
import subprocess
import zipfile
import argparse
import shutil
import oletools.oleid
import olefile
@ -13,10 +14,11 @@ import officedissector
import warnings
import exifread
from PIL import Image
# TODO: why do we have this import? How does filecheck handle pngs?
# from PIL import PngImagePlugin
from pdfid import PDFiD, cPDFiD
from kittengroomer import FileBase, KittenGroomerBase
from kittengroomer import FileBase, KittenGroomerBase, Logging
SEVENZ_PATH = '/usr/bin/7z'
@ -86,8 +88,10 @@ class Config:
class File(FileBase):
def __init__(self, src_path, dst_path, logger):
super(File, self).__init__(src_path, dst_path, logger)
super(File, self).__init__(src_path, dst_path)
self.is_recursive = False
self.logger = logger
self.tempdir_path = self.dst_path + '_temp'
subtypes_apps = [
(Config.mimes_office, self._winoffice),
@ -123,11 +127,11 @@ class File(FileBase):
def _check_dangerous(self):
if not self.has_mimetype:
self.make_dangerous('no mimetype')
self.make_dangerous('File has no mimetype')
if not self.has_extension:
self.make_dangerous('no extension')
self.make_dangerous('File has no extension')
if self.extension in Config.malicious_exts:
self.make_dangerous('malicious_extension')
self.make_dangerous('Extension identifies file as potentially dangerous')
def _check_extension(self):
"""
@ -147,8 +151,7 @@ class File(FileBase):
expected_mimetype = Config.aliases[expected_mimetype]
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and expected_mimetype != self.mimetype:
# LOG: improve this string
self.make_dangerous('expected_mimetype')
self.make_dangerous('Mimetype does not match expected mimetype for this extension')
def _check_mimetype(self):
"""
@ -165,18 +168,17 @@ class File(FileBase):
strict=False)
if expected_extensions:
if self.has_extension and self.extension not in expected_extensions:
# LOG: improve this string
self.make_dangerous('expected extensions')
self.make_dangerous('Extension does not match expected extensions for this mimetype')
def _check_filename(self):
if self.filename[0] is '.':
# handle dotfiles
# TODO: handle dotfiles here
pass
right_to_left_override = u"\u202E"
if right_to_left_override in self.filename:
self.make_dangerous('Filename contains dangerous character')
self.dst_path = self.dst_path.replace(right_to_left_override, '')
# TODO: change self.filename and'filename' property?
# TODO: change self.filename and'filename' property? Or should those reflect the values on the source key
def check(self):
self._check_dangerous()
@ -188,6 +190,15 @@ class File(FileBase):
if not self.is_dangerous:
self.mime_processing_options.get(self.main_type, self.unknown)()
def write_log(self):
    """Pass this file's property dict to the logger for recording.

    Files that were unpacked from an archive still live in a tempdir at
    log time; they are flagged with `in_tempdir=True` so the logger can
    place them at the correct depth in the log tree.
    """
    props = self.get_all_props()
    if not self.is_recursive:
        if os.path.exists(self.tempdir_path):
            # Hack to make images appear at the correct tree depth in log
            self.logger.add_file(self.src_path, props, in_tempdir=True)
            return
    self.logger.add_file(self.src_path, props)
# ##### Helper functions #####
def _make_method_dict(self, list_of_tuples):
"""Returns a dictionary with mimetype: method pairs."""
@ -206,7 +217,6 @@ class File(FileBase):
def make_tempdir(self):
"""Make a temporary directory at self.tempdir_path."""
self.tempdir_path = self.dst_path + '_temp'
if not os.path.exists(self.tempdir_path):
os.makedirs(self.tempdir_path)
return self.tempdir_path
@ -217,52 +227,50 @@ class File(FileBase):
"""Empty file or symlink."""
if self.is_symlink:
symlink_path = self.get_property('symlink')
self.add_file_string('Symlink to {}'.format(symlink_path))
self.add_description('File is a symlink to {}'.format(symlink_path))
else:
self.add_file_string('Inode file')
self.add_description('File is an inode (empty file)')
self.should_copy = False
def unknown(self):
"""Main type should never be unknown."""
self.add_file_string('Unknown file')
self.add_description('Unknown mimetype')
self.should_copy = False
def example(self):
"""Used in examples, should never be returned by libmagic."""
self.add_file_string('Example file')
self.add_description('Example file')
self.should_copy = False
def multipart(self):
"""Used in web apps, should never be returned by libmagic"""
self.add_file_string('Multipart file')
self.add_description('Multipart file - usually found in web apps')
self.should_copy = False
# ##### Treated as malicious, no reason to have it on a USB key ######
def message(self):
"""Process a message file."""
self.add_file_string('Message file')
self.make_dangerous('Message file')
self.make_dangerous('Message file - should not be found on USB key')
def model(self):
"""Process a model file."""
self.add_file_string('Model file')
self.make_dangerous('Model file')
self.make_dangerous('Model file - should not be found on USB key')
# ##### Files that will be converted ######
def text(self):
"""Process an rtf, ooxml, or plaintext file."""
for mt in Config.mimes_rtf:
if mt in self.sub_type:
self.add_file_string('Rich Text file')
self.add_description('Rich Text (rtf) file')
# TODO: need a way to convert it to plain text
self.force_ext('.txt')
return
for mt in Config.mimes_ooxml:
if mt in self.sub_type:
self.add_file_string('OOXML File')
self.add_description('OOXML (openoffice) file')
self._ooxml()
return
self.add_file_string('Text file')
self.add_description('Plain text file')
self.force_ext('.txt')
def application(self):
@ -272,103 +280,98 @@ class File(FileBase):
# TODO: should we change the logic so we don't iterate through all of the subtype methods?
# TODO: should these methods return a value?
method()
self.add_file_string('Application file')
return
self.add_file_string('Unknown Application file')
self._unknown_app()
def _executables(self):
"""Process an executable file."""
# LOG: change the processing_type property to some other name or include in file_string
self.set_property('processing_type', 'executable')
self.make_dangerous('executable')
self.make_dangerous('Executable file')
def _winoffice(self):
"""Process a winoffice file using olefile/oletools."""
# LOG: processing_type property
self.set_property('processing_type', 'WinOffice')
oid = oletools.oleid.OleID(self.src_path) # First assume a valid file
if not olefile.isOleFile(self.src_path):
# Manual processing, may already count as suspicious
try:
ole = olefile.OleFileIO(self.src_path, raise_defects=olefile.DEFECT_INCORRECT)
except:
self.make_dangerous('not parsable')
self.make_dangerous('Unparsable WinOffice file')
if ole.parsing_issues:
self.make_dangerous('parsing issues')
self.make_dangerous('Parsing issues with WinOffice file')
else:
if ole.exists('macros/vba') or ole.exists('Macros') \
or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'):
self.make_dangerous('macro')
self.make_dangerous('WinOffice file containing a macro')
else:
indicators = oid.check()
# Encrypted can be set by multiple checks on the script
if oid.encrypted.value:
self.make_dangerous('encrypted')
self.make_dangerous('Encrypted WinOffice file')
if oid.macros.value or oid.ole.exists('macros/vba') or oid.ole.exists('Macros') \
or oid.ole.exists('_VBA_PROJECT_CUR') or oid.ole.exists('VBA'):
self.make_dangerous('macro')
self.make_dangerous('WinOffice file containing a macro')
for i in indicators:
if i.id == 'ObjectPool' and i.value:
# TODO: Is it suspicious?
# TODO: is having an ObjectPool suspicious?
# LOG: user defined property
self.set_property('objpool', True)
self.add_description('WinOffice file containing an object pool')
elif i.id == 'flash' and i.value:
self.make_dangerous('flash')
self.make_dangerous('WinOffice file with embedded flash')
self.add_description('WinOffice file')
def _ooxml(self):
"""Process an ooxml file."""
# LOG: processing_type property
self.set_property('processing_type', 'ooxml')
try:
doc = officedissector.doc.Document(self.src_path)
except Exception:
self.make_dangerous('invalid ooxml file')
self.make_dangerous('Invalid ooxml file')
return
# There are probably other potentially malicious features:
# fonts, custom props, custom XML
if doc.is_macro_enabled or len(doc.features.macros) > 0:
self.make_dangerous('macro')
self.make_dangerous('Ooxml file containing macro')
if len(doc.features.embedded_controls) > 0:
self.make_dangerous('activex')
self.make_dangerous('Ooxml file with activex')
if len(doc.features.embedded_objects) > 0:
# Exploited by CVE-2014-4114 (OLE)
self.make_dangerous('embedded obj')
self.make_dangerous('Ooxml file with embedded objects')
if len(doc.features.embedded_packages) > 0:
self.make_dangerous('embedded pack')
self.make_dangerous('Ooxml file with embedded packages')
def _libreoffice(self):
"""Process a libreoffice file."""
self.set_property('processing_type', 'libreoffice')
# As long as there is no way to do a sanity check on the files => dangerous
try:
lodoc = zipfile.ZipFile(self.src_path, 'r')
except:
# TODO: are there specific exceptions we should catch here? Or is anything ok
self.make_dangerous('invalid libreoffice file')
# TODO: are there specific exceptions we should catch here? Or should it be everything
self.make_dangerous('Invalid libreoffice file')
for f in lodoc.infolist():
fname = f.filename.lower()
if fname.startswith('script') or fname.startswith('basic') or \
fname.startswith('object') or fname.endswith('.bin'):
self.make_dangerous('macro')
self.make_dangerous('Libreoffice file containing executable code')
if not self.is_dangerous:
self.add_description('Libreoffice file')
def _pdf(self):
"""Process a PDF file."""
# LOG: processing_type property
self.set_property('processing_type', 'pdf')
xmlDoc = PDFiD(self.src_path)
oPDFiD = cPDFiD(xmlDoc, True)
# TODO: are there other characteristics which should be dangerous?
# TODO: are there other pdf characteristics which should be dangerous?
if oPDFiD.encrypt.count > 0:
self.make_dangerous('encrypted pdf')
self.make_dangerous('Encrypted pdf')
if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0:
self.make_dangerous('pdf with javascript')
self.make_dangerous('Pdf with embedded javascript')
if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0:
self.make_dangerous('openaction')
self.make_dangerous('Pdf with openaction(s)')
if oPDFiD.richmedia.count > 0:
self.make_dangerous('flash')
self.make_dangerous('Pdf containing flash')
if oPDFiD.launch.count > 0:
self.make_dangerous('launch')
self.make_dangerous('Pdf with launch action(s)')
if not self.is_dangerous:
self.add_description('Pdf file')
def _archive(self):
"""
@ -378,24 +381,26 @@ class File(FileBase):
is called on that directory. The recursive archive depth is increased
to protect against archive bombs.
"""
# LOG: change this to something archive specific
self.set_property('processing_type', 'archive')
# TODO: change this to something archive type specific instead of generic 'Archive'
self.add_description('Archive')
self.should_copy = False
self.is_recursive = True
def _unknown_app(self):
"""Process an unknown file."""
self.add_description('Unknown application file')
self.make_unknown()
def _binary_app(self):
"""Process an unknown binary file."""
self.add_description('Unknown binary file')
self.make_binary()
#######################
# Metadata extractors
def _metadata_exif(self, metadata_file_path):
"""Read exif metadata from a jpg or tiff file using exifread."""
# TODO: this method is kind of long, can we shorten it somehow?
# TODO: can we shorten this method somehow?
img = open(self.src_path, 'rb')
tags = None
try:
@ -419,7 +424,7 @@ class File(FileBase):
tag_string = str(tag_value)
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string))
# LOG: how do we want to log metadata?
# TODO: how do we want to log metadata?
self.set_property('metadata', 'exif')
img.close()
return True
@ -437,8 +442,7 @@ class File(FileBase):
# LOG: handle metadata
self.set_property('metadata', 'png')
img.close()
# Catch decompression bombs
except Exception as e:
except Exception as e: # Catch decompression bombs
# TODO: only catch DecompressionBombWarnings here?
self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path))
self.make_dangerous('exception processing metadata')
@ -457,17 +461,17 @@ class File(FileBase):
# ##### Media - audio and video aren't converted ######
def audio(self):
"""Process an audio file."""
self.log_string += 'Audio file'
self.add_description('Audio file')
self._media_processing()
def video(self):
"""Process a video."""
self.log_string += 'Video file'
self.add_description('Video file')
self._media_processing()
def _media_processing(self):
"""Generic way to process all media files."""
self.set_property('processing_type', 'media')
self.add_description('Media file')
def image(self):
"""
@ -492,30 +496,113 @@ class File(FileBase):
except Exception as e: # Catch decompression bombs
# TODO: change this from all Exceptions to specific DecompressionBombWarning
self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path))
self.make_dangerous()
self.add_file_string('Image file')
self.set_property('processing_type', 'image')
self.make_dangerous('Image file containing decompression bomb')
if not self.is_dangerous:
self.add_description('Image file')
class GroomerLogger(object):
    """Groomer logging interface.

    Writes a text-based tree of processed files/directories to
    `logs/circlean_log.txt` inside the destination root.
    """

    def __init__(self, src_root_path, dst_root_path, debug=False):
        """Create the log directory in `dst_root_path` and start the log.

        If `debug` is True, stderr/stdout of subprocesses are captured to
        files in the log directory; otherwise they go to os.devnull.
        """
        self._src_root_path = src_root_path
        self._dst_root_path = dst_root_path
        self._log_dir_path = self._make_log_dir(dst_root_path)
        self.log_path = os.path.join(self._log_dir_path, 'circlean_log.txt')
        self._add_root_dir(src_root_path)
        if debug:
            self.log_debug_err = os.path.join(self._log_dir_path, 'debug_stderr.log')
            self.log_debug_out = os.path.join(self._log_dir_path, 'debug_stdout.log')
        else:
            self.log_debug_err = os.devnull
            self.log_debug_out = os.devnull

    def _make_log_dir(self, root_dir_path):
        """Make the directory in the dest dir that will hold the logs."""
        log_dir_path = os.path.join(root_dir_path, 'logs')
        if os.path.exists(log_dir_path):
            # Start from a clean slate: drop logs from any previous run.
            shutil.rmtree(log_dir_path)
        os.makedirs(log_dir_path)
        return log_dir_path

    def _add_root_dir(self, root_path):
        """Write the source root's name as the first line of the log."""
        dirname = os.path.split(root_path)[1] + '/'
        with open(self.log_path, mode='ab') as lf:
            lf.write(bytes(dirname, 'utf-8'))
            lf.write(b'\n')

    def add_file(self, file_path, file_props, in_tempdir=False):
        """Add a file to the log. Takes a dict of file properties."""
        # TODO: handle symlinks better: symlink_string = '{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath))
        depth = self._get_path_depth(file_path)
        description_string = ', '.join(file_props['description_string'])
        file_hash = Logging.computehash(file_path)[:6]
        if file_props['safety_category'] is None:
            descr_cat = "Normal"
        else:
            descr_cat = file_props['safety_category'].capitalize()
        # TODO: make size adjust to MB/GB for large files
        size = str(file_props['file_size']) + 'B'
        file_template = "+- {name} ({sha_hash}): {size}, {mt}/{st}. {desc}: {desc_str}"
        file_string = file_template.format(
            name=file_props['filename'],
            sha_hash=file_hash,
            size=size,
            mt=file_props['maintype'],
            st=file_props['subtype'],
            desc=descr_cat,
            desc_str=description_string,
            # errs='' # TODO: add errors in human readable form here
        )
        if in_tempdir:
            # Unpacked archive contents sit one dir deeper on disk than
            # they should appear in the log tree.
            depth -= 1
        self._write_line_to_log(file_string, depth)

    def add_dir(self, dir_path):
        """Add a directory entry to the log."""
        path_depth = self._get_path_depth(dir_path)
        dirname = os.path.split(dir_path)[1] + '/'
        log_line = '+- ' + dirname
        self._write_line_to_log(log_line, path_depth)

    def _get_path_depth(self, path):
        """Return the depth of `path` relative to the src or dst root.

        Raises:
            ValueError: if `path` is under neither root. (Previously this
            case fell through and crashed with UnboundLocalError.)
        """
        if self._dst_root_path in path:
            base_path = self._dst_root_path
        elif self._src_root_path in path:
            base_path = self._src_root_path
        else:
            raise ValueError(
                'Path {} is not in the source or destination root'.format(path))
        relpath = os.path.relpath(path, base_path)
        path_depth = relpath.count(os.path.sep)
        return path_depth

    def _write_line_to_log(self, line, indentation_depth):
        """Write `line` to the log, indented with tree markers."""
        padding = b' '
        padding += b'| ' * indentation_depth
        line_bytes = os.fsencode(line)
        with open(self.log_path, mode='ab') as lf:
            lf.write(padding)
            lf.write(line_bytes)
            lf.write(b'\n')
class KittenGroomerFileCheck(KittenGroomerBase):
def __init__(self, root_src, root_dst, max_recursive_depth=2, debug=False):
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst)
self.recursive_archive_depth = 0
self.max_recursive_depth = max_recursive_depth
self.cur_file = None
self.logger = GroomerLogger(root_src, root_dst, debug)
def process_dir(self, src_dir, dst_dir):
"""Process a directory on the source key."""
self.logger.tree(src_dir)
for srcpath in self.list_all_files(src_dir):
dstpath = srcpath.replace(src_dir, dst_dir)
# TODO: Can we clean up the way we handle relative_path?
# Relative path is here so that when we print files in the log it
# shows only the file's path. Should we just pass it to the logger
# when we create it? Or let the logger figure it out?
# relative_path = srcpath.replace(src_dir + '/', '')
self.cur_file = File(srcpath, dstpath, self.logger)
self.process_file(self.cur_file)
for srcpath in self.list_files_dirs(src_dir):
if os.path.isdir(srcpath):
self.logger.add_dir(srcpath)
else:
dstpath = os.path.join(dst_dir, os.path.basename(srcpath))
self.cur_file = File(srcpath, dstpath, self.logger)
self.process_file(self.cur_file)
def process_file(self, file):
"""
@ -525,12 +612,13 @@ class KittenGroomerFileCheck(KittenGroomerBase):
the file to the destination key, and clean up the temporary directory.
"""
file.check()
if file.is_recursive:
self.process_archive(file)
elif file.should_copy:
if file.should_copy:
file.safe_copy()
file.set_property('copied', True)
file.write_log()
file.write_log()
if file.is_recursive:
self.process_archive(file)
# TODO: Can probably handle cleaning up the tempdir better
if hasattr(file, 'tempdir_path'):
self.safe_rmtree(file.tempdir_path)
@ -542,17 +630,17 @@ class KittenGroomerFileCheck(KittenGroomerBase):
to an archive.
"""
self.recursive_archive_depth += 1
# LOG: write_log or somehow log the archive file here
if self.recursive_archive_depth >= self.max_recursive_depth:
file.make_dangerous('Archive bomb')
else:
tempdir_path = file.make_tempdir()
# TODO: double check we are properly escaping file.src_path
# otherwise we are running unvalidated user input directly in the shell
# otherwise we are running unsanitized user input directly in the shell
command_str = '{} -p1 x "{}" -o"{}" -bd -aoa'
unpack_command = command_str.format(SEVENZ_PATH,
file.src_path, tempdir_path)
self._run_process(unpack_command)
file.write_log()
self.process_dir(tempdir_path, file.dst_path)
self.safe_rmtree(tempdir_path)
self.recursive_archive_depth -= 1
@ -567,8 +655,19 @@ class KittenGroomerFileCheck(KittenGroomerBase):
return
return True
def list_files_dirs(self, root_dir_path):
    """Return paths of all files and dirs under `root_dir_path`.

    Entries are ordered case-insensitively; each directory is followed
    immediately by its recursively-listed contents.
    """
    entries = []
    for name in sorted(os.listdir(root_dir_path), key=str.lower):
        entry_path = os.path.join(root_dir_path, name)
        if os.path.isdir(entry_path):
            entries.append(entry_path)
            # Recurse so a directory's contents directly follow it.
            entries.extend(self.list_files_dirs(entry_path))
        elif os.path.isfile(entry_path):
            entries.append(entry_path)
    return entries
def run(self):
self.process_dir(self.src_root_dir, self.dst_root_dir)
self.process_dir(self.src_root_path, self.dst_root_path)
def main(kg_implementation, description):

View File

@ -1,4 +1,3 @@
twiggy
python-magic
pytest
pytest-cov

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .helpers import FileBase, KittenGroomerBase, GroomerLogger, main
from .helpers import FileBase, KittenGroomerBase, Logging, main

View File

@ -3,8 +3,8 @@
"""
Contains the base objects for use when creating a sanitizer using
PyCIRCLean. Subclass FileBase and KittenGroomerBase to implement your
desired behavior.
PyCIRCLean. Subclass or import from FileBase/KittenGroomerBase and implement
your desired behavior.
"""
@ -14,7 +14,6 @@ import shutil
import argparse
import magic
import twiggy
class KittenGroomerError(Exception):
@ -37,17 +36,15 @@ class FileBase(object):
Contains file attributes and various helper methods.
"""
def __init__(self, src_path, dst_path, logger=None):
def __init__(self, src_path, dst_path):
"""
Initialized with the source path and expected destination path.
self.logger should be a logging object with an add_file method.
Create various properties and determine the file's mimetype.
"""
self.src_path = src_path
self.dst_path = dst_path
self.filename = os.path.basename(self.src_path)
self.logger = logger
self._file_props = {
'filepath': self.src_path,
'filename': self.filename,
@ -58,7 +55,7 @@ class FileBase(object):
'safety_category': None,
'symlink': False,
'copied': False,
'file_string_set': set(),
'description_string': [], # array of descriptions to be joined
'errors': {},
'user_defined': {}
}
@ -90,9 +87,9 @@ class FileBase(object):
else:
try:
mt = magic.from_file(self.src_path, mime=True)
# Note: magic will always return something, even if it's just 'data'
# Note: libmagic will always return something, even if it's just 'data'
except UnicodeEncodeError as e:
# FIXME: The encoding of the file is broken (possibly UTF-16)
# FIXME: The encoding of the file that triggers this is broken (possibly it's UTF-16 and Python expects utf8)
# Note: one of the Travis files will trigger this exception
self.add_error(e, '')
mt = None
@ -121,8 +118,6 @@ class FileBase(object):
@property
def has_mimetype(self):
"""True if file has a main and sub mimetype, else False."""
# TODO: broken mimetype checks should be done somewhere else.
# Should the check be by default or should we let the API consumer write it?
if not self.main_type or not self.sub_type:
return False
else:
@ -161,34 +156,47 @@ class FileBase(object):
def set_property(self, prop_string, value):
"""
Take a property and a value and add them to self._file_props.
Take a property and a value and add them to the file's property dict.
If prop_string is already in _file_props, set prop_string to value.
If prop_string not in _file_props, set prop_string to value in
_file_props['user_defined'].
If `prop_string` is part of the file property API, set it to `value`.
Otherwise, add `prop_string`: `value` to `user_defined` properties.
"""
if prop_string in self._file_props.keys():
if prop_string is 'description_string':
if prop_string not in self._file_props['description_string']:
self._file_props['description_string'].append(value)
elif prop_string in self._file_props.keys():
self._file_props[prop_string] = value
else:
self._file_props['user_defined'][prop_string] = value
def get_property(self, file_prop):
"""Get the value for a property in _file_props."""
# TODO: could probably be refactored
if file_prop in self._file_props:
return self._file_props[file_prop]
elif file_prop in self._file_props['user_defined']:
return self._file_props['user_defined'][file_prop]
def get_property(self, prop_string):
"""
Get the value for a property stored on the file.
Returns `None` if `prop_string` cannot be found on the file.
"""
if prop_string in self._file_props:
return self._file_props[prop_string]
elif prop_string in self._file_props['user_defined']:
return self._file_props['user_defined'][prop_string]
else:
return None
def add_error(self, error, info):
"""Add an error: info pair to _file_props['errors']."""
self._file_props['errors'].update({error: info})
def get_all_props(self):
    """Return a dict containing all stored properties of this file.

    Note: this is the live property dict, not a copy — mutating the
    returned dict mutates the file's state.
    """
    return self._file_props
def add_file_string(self, file_string):
"""Add a file descriptor string to _file_props."""
self._file_props['file_string_set'].add(file_string)
def add_error(self, error, info_string):
    """Record `error` with its human-readable `info_string` on the file."""
    # Direct item assignment; equivalent to dict.update with one pair.
    self._file_props['errors'][error] = info_string
def add_description(self, description_string):
    """
    Add a description string to the file.

    If `description_string` is already present, will prevent duplicates.
    """
    # Delegates to set_property, which special-cases 'description_string'
    # and appends to the list only when the value is not already there.
    self.set_property('description_string', description_string)
def make_dangerous(self, reason_string=None):
"""
@ -198,9 +206,10 @@ class FileBase(object):
to help prevent double-click of death.
"""
if self.is_dangerous:
self.set_property('description_string', reason_string)
return
self.set_property('safety_category', 'dangerous')
# LOG: store reason string somewhere and do something with it
self.set_property('description_string', reason_string)
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
@ -235,76 +244,50 @@ class FileBase(object):
self.add_error(e, '')
def force_ext(self, ext):
"""If dst_path does not end in ext, change it and edit _file_props."""
"""If dst_path does not end in ext, append .ext to it."""
ext = self._check_leading_dot(ext)
if not self.dst_path.endswith(ext):
self.set_property('force_ext', True)
# LOG: do we want to log that the extension was changed as below?
# self.set_property('force_ext', True)
self.dst_path += ext
if not self._file_props['extension'] == ext:
self.set_property('extension', ext)
def create_metadata_file(self, ext):
"""Create a separate file to hold metadata from this file."""
"""
Create a separate file to hold extracted metadata.
The string `ext` will be used as the extension for the metadata file.
"""
ext = self._check_leading_dot(ext)
try:
# make sure we aren't overwriting anything
if os.path.exists(self.src_path + ext):
raise KittenGroomerError("Cannot create split metadata file for \"" +
self.dst_path + "\", type '" +
ext + "': File exists.")
err_str = ("Could not create metadata file for \"" +
self.filename +
"\": a file with that path already exists.")
raise KittenGroomerError(err_str)
else:
dst_dir_path, filename = os.path.split(self.dst_path)
if not os.path.exists(dst_dir_path):
os.makedirs(dst_dir_path)
# TODO: Check extension for leading "."
self.metadata_file_path = self.dst_path + ext
return self.metadata_file_path
except KittenGroomerError as e:
self.add_error(e, '')
return False
def write_log(self):
"""Write logs from file to self.logger."""
file_log = self.logger.add_file(self)
file_log.fields(**self._file_props)
def _check_leading_dot(self, ext):
if len(ext) > 0:
if not ext.startswith('.'):
return '.' + ext
return ext
class GroomerLogger(object):
"""Groomer logging interface."""
class Logging(object):
def __init__(self, root_dir_path, debug=False):
self.root_dir = root_dir_path
self.log_dir_path = os.path.join(root_dir_path, 'logs')
if os.path.exists(self.log_dir_path):
shutil.rmtree(self.log_dir_path)
os.makedirs(self.log_dir_path)
self.log_processing = os.path.join(self.log_dir_path, 'processing.log')
self.log_content = os.path.join(self.log_dir_path, 'content.log')
twiggy.quick_setup(file=self.log_processing)
self.log = twiggy.log.name('files')
if debug:
self.log_debug_err = os.path.join(self.log_dir_path, 'debug_stderr.log')
self.log_debug_out = os.path.join(self.log_dir_path, 'debug_stdout.log')
else:
self.log_debug_err = os.devnull
self.log_debug_out = os.devnull
def tree(self, base_dir, padding=' '):
"""Write a graphical tree to the log for `base_dir`."""
with open(self.log_content, 'ab') as lf:
lf.write(bytes('#' * 80 + '\n', 'UTF-8'))
lf.write(bytes('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir)).encode()), 'utf8'))
padding += '| '
files = sorted(os.listdir(base_dir))
for f in files:
curpath = os.path.join(base_dir, f)
if os.path.islink(curpath):
lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)).encode(errors='ignore'))
elif os.path.isdir(curpath):
self.tree(curpath, padding)
elif os.path.isfile(curpath):
lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)).encode(errors='ignore'))
def _computehash(self, path):
"""Return a sha256 hash of a file at a given path."""
@staticmethod
def computehash(path):
"""Return the sha256 hash of a file at a given path."""
s = hashlib.sha256()
with open(path, 'rb') as f:
while True:
@ -314,53 +297,47 @@ class GroomerLogger(object):
s.update(buf)
return s.hexdigest()
def add_file(self, file):
"""Add a file to the log."""
return self.log.name('file.src_path')
class KittenGroomerBase(object):
"""Base object responsible for copy/sanitization process."""
def __init__(self, root_src, root_dst, debug=False):
def __init__(self, src_root_path, dst_root_path):
"""Initialized with path to source and dest directories."""
self.src_root_dir = root_src
self.dst_root_dir = root_dst
self.debug = debug
self.cur_file = None
self.logger = GroomerLogger(self.dst_root_dir, debug)
self.src_root_path = os.path.abspath(src_root_path)
self.dst_root_path = os.path.abspath(dst_root_path)
def safe_rmtree(self, directory):
def safe_rmtree(self, directory_path):
"""Remove a directory tree if it exists."""
if os.path.exists(directory):
shutil.rmtree(directory)
if os.path.exists(directory_path):
shutil.rmtree(directory_path)
def safe_remove(self, filepath):
"""Remove a file if it exists."""
if os.path.exists(filepath):
os.remove(filepath)
def safe_remove(self, file_path):
"""Remove file at file_path if it exists."""
if os.path.exists(file_path):
os.remove(file_path)
def safe_mkdir(self, directory):
def safe_mkdir(self, directory_path):
"""Make a directory if it does not exist."""
if not os.path.exists(directory):
os.makedirs(directory)
if not os.path.exists(directory_path):
os.makedirs(directory_path)
def list_all_files(self, directory):
def list_all_files(self, directory_path):
"""Generator yielding path to all of the files in a directory tree."""
for root, dirs, files in os.walk(directory):
for root, dirs, files in os.walk(directory_path):
# files is a list anyway so we don't get much from using a generator here
for filename in files:
filepath = os.path.join(root, filename)
yield filepath
#######################
# TODO: feels like this function doesn't need to exist if we move main()
# TODO: if we move main() we can get rid of this as well
def processdir(self, src_dir, dst_dir):
"""Implement this function to define file processing behavior."""
raise ImplementationRequired('Please implement processdir.')
# TODO: Maybe this shouldn't exist? It should probably get moved to filecheck since this isn't really API code
# TODO: Should this get moved to filecheck? It isn't really API code and somebody can implement it themselves
def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'):
parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
parser.add_argument('-s', '--source', type=str, help='Source directory')

View File

@ -23,5 +23,5 @@ setup(
'Topic :: Communications :: File Sharing',
'Topic :: Security',
],
install_requires=['twiggy', 'python-magic'],
install_requires=['python-magic'],
)

View File

@ -1,22 +1,28 @@
import os
from datetime import datetime
def save_logs(groomer, test_description):
    """Copy a groomer run's logs into tests/test_logs/<test_description>.log.

    Writes a timestamped header, then the main processing log, then (when
    present) the debug stderr/stdout logs, each under a '====SECTION===='
    divider. Everything is handled as bytes so arbitrary log content
    round-trips without decode errors.
    """
    divider = ('=' * 10 + '{}' + '=' * 10 + '\n')
    test_log_path = 'tests/test_logs/{}.log'.format(test_description)
    time_now = str(datetime.now().time()) + '\n'
    with open(test_log_path, 'wb+') as test_log:
        log_header = divider.format('TEST LOG')
        test_log.write(bytes(log_header, encoding='utf-8'))
        test_log.write(bytes(time_now, encoding='utf-8'))
        test_log.write(bytes(test_description, encoding='utf-8'))
        test_log.write(b'\n')
        test_log.write(b'-' * 20 + b'\n')
        with open(groomer.logger.log_path, 'rb') as logfile:
            log = logfile.read()
            test_log.write(log)
        # Debug logs only exist for debug runs; skip silently otherwise.
        if os.path.exists(groomer.logger.log_debug_err):
            test_log.write(bytes(divider.format('ERR LOG'), encoding='utf-8'))
            with open(groomer.logger.log_debug_err, 'rb') as debug_err:
                err = debug_err.read()
                test_log.write(err)
        if os.path.exists(groomer.logger.log_debug_out):
            test_log.write(bytes(divider.format('OUT LOG'), encoding='utf-8'))
            with open(groomer.logger.log_debug_out, 'rb') as debug_out:
                out = debug_out.read()
                test_log.write(out)

View File

@ -0,0 +1 @@
This is a test.

BIN
tests/src_valid/test.zip Normal file

Binary file not shown.

View File

@ -13,44 +13,26 @@ try:
except ImportError:
NODEPS = True
fixture = pytest.fixture
skip = pytest.mark.skip
skipif_nodeps = pytest.mark.skipif(NODEPS,
reason="Dependencies aren't installed")
@skipif_nodeps
class TestIntegration:
class TestSystem:
@pytest.fixture
def src_valid_path(self):
return os.path.join(os.getcwd(), 'tests/src_valid')
@fixture
def valid_groomer(self):
    """Debug-mode groomer over the known-good files in tests/src_valid."""
    source_dir = os.path.join(os.getcwd(), 'tests/src_valid')
    return KittenGroomerFileCheck(source_dir,
                                  self.make_dst_dir_path(source_dir),
                                  debug=True)
@pytest.fixture
def src_invalid_path(self):
return os.path.join(os.getcwd(), 'tests/src_invalid')
@pytest.fixture
def dst(self):
return os.path.join(os.getcwd(), 'tests/dst')
def test_filecheck_src_invalid(self, src_invalid_path):
dst_path = self.make_dst_dir_path(src_invalid_path)
groomer = KittenGroomerFileCheck(src_invalid_path, dst_path, debug=True)
groomer.run()
test_description = "filecheck_invalid"
save_logs(groomer, test_description)
def test_filecheck_2(self, src_valid_path):
dst_path = self.make_dst_dir_path(src_valid_path)
groomer = KittenGroomerFileCheck(src_valid_path, dst_path, debug=True)
groomer.run()
test_description = "filecheck_valid"
save_logs(groomer, test_description)
def test_processdir(self):
pass
def test_handle_archives(self):
pass
@fixture
def invalid_groomer(self):
    """Debug-mode groomer over the dangerous files in tests/src_invalid."""
    source_dir = os.path.join(os.getcwd(), 'tests/src_invalid')
    return KittenGroomerFileCheck(source_dir,
                                  self.make_dst_dir_path(source_dir),
                                  debug=True)
def make_dst_dir_path(self, src_dir_path):
    """Return (and create, if needed) the sibling '<src>_dst' directory."""
    dst_path = src_dir_path + '_dst'
    os.makedirs(dst_path, exist_ok=True)
    return dst_path
def test_filecheck_src_valid(self, valid_groomer):
    """Run the groomer over the valid corpus and archive its log."""
    valid_groomer.run()
    save_logs(valid_groomer, "filecheck_valid")
def test_filecheck_src_invalid(self, invalid_groomer):
    """Run the groomer over the invalid corpus and archive its log."""
    invalid_groomer.run()
    save_logs(invalid_groomer, "filecheck_invalid")
class TestFileHandling:
def test_autorun(self):

View File

@ -5,8 +5,7 @@ import os
import pytest
from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger
from kittengroomer.helpers import ImplementationRequired
from kittengroomer import FileBase, KittenGroomerBase
skip = pytest.mark.skip
xfail = pytest.mark.xfail
@ -190,7 +189,6 @@ class TestFileBase:
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
generic_conf_file.force_ext('.txt')
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt'
assert generic_conf_file.get_property('force_ext') is True
assert generic_conf_file.get_property('extension') == '.txt'
# should be able to handle weird paths
@ -203,7 +201,6 @@ class TestFileBase:
# shouldn't change a file's extension if it already is right
def test_create_metadata_file(self, temp_file):
# Try making a metadata file
metadata_file_path = temp_file.create_metadata_file('.metadata.txt')
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write('Have some metadata!')
@ -219,12 +216,7 @@ class TestFileBase:
class TestLogger:
@fixture
def generic_logger(self, tmpdir):
return GroomerLogger(tmpdir.strpath)
def test_tree(self, generic_logger):
generic_logger.tree(generic_logger.root_dir)
pass
class TestKittenGroomerBase:
@ -245,10 +237,7 @@ class TestKittenGroomerBase:
assert generic_groomer
def test_instantiation(self, source_directory, dest_directory):
    """Constructing the base groomer should not raise.

    The debug kwarg was removed from KittenGroomerBase, so only the plain
    construction is exercised here.
    """
    KittenGroomerBase(source_directory, dest_directory)
def test_list_all_files(self, tmpdir):
file = tmpdir.join('test.txt')
@ -256,6 +245,6 @@ class TestKittenGroomerBase:
testdir = tmpdir.join('testdir')
os.mkdir(testdir.strpath)
simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
files = simple_groomer.list_all_files(simple_groomer.src_root_dir)
files = simple_groomer.list_all_files(simple_groomer.src_root_path)
assert file.strpath in files
assert testdir.strpath not in files