Change FileBase.log_details to Filebase._file_props

* _file_props is a dict that will hold all information about the file
* Updated filecheck.py to reflect this
* Potentially will change contents of file_props to being attributes on the
file in the future. This change would be easy since all access to _file_props
is now via set_property and get_property methods.
* Add filename to _file_props
pull/12/head
Dan Puttick 2017-03-06 15:02:29 -05:00
parent 1c58a7347e
commit 12d5624b4d
4 changed files with 121 additions and 111 deletions

View File

@ -86,9 +86,6 @@ class File(FileBase):
def __init__(self, src_path, dst_path, logger):
super(File, self).__init__(src_path, dst_path, logger)
self.is_recursive = False
self.log_details.update({'maintype': self.main_type,
'subtype': self.sub_type,
'extension': self.extension})
subtypes_apps = [
(Config.mimes_office, self._winoffice),
@ -128,7 +125,7 @@ class File(FileBase):
if not self.has_extension():
self.make_dangerous()
if self.extension in Config.malicious_exts:
self.log_details.update({'malicious_extension': self.extension})
self.set_property('malicious_extension', self.extension)
self.make_dangerous()
def _check_extension(self):
@ -146,7 +143,7 @@ class File(FileBase):
expected_mimetype = Config.aliases[expected_mimetype]
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and expected_mimetype != self.mimetype:
self.log_details.update({'expected_mimetype': expected_mimetype})
self.set_property('expected_mimetype', expected_mimetype)
self.make_dangerous()
def _check_mimetype(self):
@ -161,7 +158,7 @@ class File(FileBase):
strict=False)
if expected_extensions:
if self.has_extension() and self.extension not in expected_extensions:
self.log_details.update({'expected_extensions': expected_extensions})
self.set_property('expected_extensions', expected_extensions)
self.make_dangerous()
def check(self):
@ -182,11 +179,11 @@ class File(FileBase):
def write_log(self):
"""Print the logs related to the current file being processed."""
# LOG: move to helpers.py?
tmp_log = self.logger.log.fields(**self.log_details)
# LOG: move to helpers.py and change
tmp_log = self.logger.log.fields(**self._file_props)
if self.is_dangerous():
tmp_log.warning(self.log_string)
elif self.log_details.get('unknown') or self.log_details.get('binary'):
elif self.get_property('unknown') or self.get_property('binary'):
tmp_log.info(self.log_string)
else:
tmp_log.debug(self.log_string)
@ -209,7 +206,7 @@ class File(FileBase):
def inode(self):
"""Empty file or symlink."""
if self.is_symlink():
self.log_string += 'Symlink to {}'.format(self.log_details['symlink'])
self.log_string += 'Symlink to {}'.format(self.get_property('symlink'))
else:
self.log_string += 'Inode file'
@ -265,12 +262,12 @@ class File(FileBase):
def _executables(self):
"""Processes an executable file."""
self.add_log_details('processing_type', 'executable')
self.set_property('processing_type', 'executable')
self.make_dangerous()
def _winoffice(self):
"""Processes a winoffice file using olefile/oletools."""
self.add_log_details('processing_type', 'WinOffice')
self.set_property('processing_type', 'WinOffice')
# Try as if it is a valid document
oid = oletools.oleid.OleID(self.src_path)
if not olefile.isOleFile(self.src_path):
@ -278,37 +275,37 @@ class File(FileBase):
try:
ole = olefile.OleFileIO(self.src_path, raise_defects=olefile.DEFECT_INCORRECT)
except:
self.add_log_details('not_parsable', True)
self.set_property('not_parsable', True)
self.make_dangerous()
if ole.parsing_issues:
self.add_log_details('parsing_issues', True)
self.set_property('parsing_issues', True)
self.make_dangerous()
else:
if ole.exists('macros/vba') or ole.exists('Macros') \
or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'):
self.add_log_details('macro', True)
self.set_property('macro', True)
self.make_dangerous()
else:
indicators = oid.check()
# Encrypted ban be set by multiple checks on the script
if oid.encrypted.value:
self.add_log_details('encrypted', True)
self.set_property('encrypted', True)
self.make_dangerous()
if oid.macros.value or oid.ole.exists('macros/vba') or oid.ole.exists('Macros') \
or oid.ole.exists('_VBA_PROJECT_CUR') or oid.ole.exists('VBA'):
self.add_log_details('macro', True)
self.set_property('macro', True)
self.make_dangerous()
for i in indicators:
if i.id == 'ObjectPool' and i.value:
# FIXME: Is it suspicious?
self.add_log_details('objpool', True)
self.set_property('objpool', True)
elif i.id == 'flash' and i.value:
self.add_log_details('flash', True)
self.set_property('flash', True)
self.make_dangerous()
def _ooxml(self):
"""Processes an ooxml file."""
self.add_log_details('processing_type', 'ooxml')
self.set_property('processing_type', 'ooxml')
try:
doc = officedissector.doc.Document(self.src_path)
except Exception:
@ -318,56 +315,56 @@ class File(FileBase):
# There are probably other potentially malicious features:
# fonts, custom props, custom XML
if doc.is_macro_enabled or len(doc.features.macros) > 0:
self.add_log_details('macro', True)
self.set_property('macro', True)
self.make_dangerous()
if len(doc.features.embedded_controls) > 0:
self.add_log_details('activex', True)
self.set_property('activex', True)
self.make_dangerous()
if len(doc.features.embedded_objects) > 0:
# Exploited by CVE-2014-4114 (OLE)
self.add_log_details('embedded_obj', True)
self.set_property('embedded_obj', True)
self.make_dangerous()
if len(doc.features.embedded_packages) > 0:
self.add_log_details('embedded_pack', True)
self.set_property('embedded_pack', True)
self.make_dangerous()
def _libreoffice(self):
"""Processes a libreoffice file."""
self.add_log_details('processing_type', 'libreoffice')
self.set_property('processing_type', 'libreoffice')
# As long as there ar no way to do a sanity check on the files => dangerous
try:
lodoc = zipfile.ZipFile(self.src_path, 'r')
except:
# TODO: are there specific exceptions we should catch here? Or is anything ok
self.add_log_details('invalid', True)
self.set_property('invalid', True)
self.make_dangerous()
for f in lodoc.infolist():
fname = f.filename.lower()
if fname.startswith('script') or fname.startswith('basic') or \
fname.startswith('object') or fname.endswith('.bin'):
self.add_log_details('macro', True)
self.set_property('macro', True)
self.make_dangerous()
def _pdf(self):
"""Processes a PDF file."""
self.add_log_details('processing_type', 'pdf')
self.set_property('processing_type', 'pdf')
xmlDoc = PDFiD(self.src_path)
oPDFiD = cPDFiD(xmlDoc, True)
# TODO: are there other characteristics which should be dangerous?
if oPDFiD.encrypt.count > 0:
self.add_log_details('encrypted', True)
self.set_property('encrypted', True)
self.make_dangerous()
if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0:
self.add_log_details('javascript', True)
self.set_property('javascript', True)
self.make_dangerous()
if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0:
self.add_log_details('openaction', True)
self.set_property('openaction', True)
self.make_dangerous()
if oPDFiD.richmedia.count > 0:
self.add_log_details('flash', True)
self.set_property('flash', True)
self.make_dangerous()
if oPDFiD.launch.count > 0:
self.add_log_details('launch', True)
self.set_property('launch', True)
self.make_dangerous()
def _archive(self):
@ -375,7 +372,7 @@ class File(FileBase):
temporary directory and self.process_dir is called on that directory.
The recursive archive depth is increased to protect against archive
bombs."""
self.add_log_details('processing_type', 'archive')
self.set_property('processing_type', 'archive')
self.is_recursive = True
# self.log_string += 'Archive extracted, processing content.'
@ -422,7 +419,7 @@ class File(FileBase):
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
self.add_log_details('metadata', 'exif')
self.set_property('metadata', 'exif')
img.close()
return True
@ -435,7 +432,7 @@ class File(FileBase):
if tag not in ('icc_profile'):
with open(metadata_file_path, 'w+') as metadata_file:
metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
self.add_log_details('metadata', 'png')
self.set_property('metadata', 'png')
img.close()
# Catch decompression bombs
except Exception as e:
@ -467,7 +464,7 @@ class File(FileBase):
def _media_processing(self):
"""Generic way to process all media files."""
self.add_log_details('processing_type', 'media')
self.set_property('processing_type', 'media')
def image(self):
"""Processes an image.
@ -493,7 +490,7 @@ class File(FileBase):
print(e)
self.make_dangerous()
self.log_string += 'Image file'
self.add_log_details('processing_type', 'image')
self.set_property('processing_type', 'image')
class KittenGroomerFileCheck(KittenGroomerBase):
@ -554,7 +551,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
def _handle_archivebomb(self, file):
file.make_dangerous()
file.add_log_details('Archive Bomb', True)
file.set_property('Archive Bomb', True)
self.logger.log.warning('ARCHIVE BOMB.')
self.logger.log.warning('The content of the archive contains recursively other archives.')
self.logger.log.warning('This is a bad sign so the archive is not extracted to the destination key.')

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .helpers import FileBase, KittenGroomerBase, GroomerLog, main
from .helpers import FileBase, KittenGroomerBase, GroomerLogger, main

View File

@ -41,15 +41,18 @@ class FileBase(object):
"""Initialized with the source path and expected destination path."""
self.src_path = src_path
self.dst_path = dst_path
# TODO: rename this to file_properties (and change in other groomers)
self.log_details = {'filepath': self.src_path}
self.filename = os.path.basename(self.src_path)
self.log_string = ''
self.logger = logger
self.extension = self._determine_extension()
self.mimetype = self._determine_mimetype()
if self.mimetype:
self.main_type, self.sub_type = self._split_subtypes(self.mimetype)
self.logger = logger
self.filename = os.path.basename(self.src_path)
self._file_props = {'filepath': self.src_path,
'filename': self.filename,
'maintype': self.main_type,
'subtype': self.sub_type,
'extension': self.extension}
def _determine_extension(self):
_, ext = os.path.splitext(self.src_path)
@ -68,8 +71,8 @@ class FileBase(object):
# Note: magic will always return something, even if it's just 'data'
except UnicodeEncodeError as e:
# FIXME: The encoding of the file is broken (possibly UTF-16)
# LOG: log this error
mt = None
self.log_details.update({'UnicodeError': e})
try:
mimetype = mt.decode("utf-8")
except:
@ -84,50 +87,50 @@ class FileBase(object):
return main_type, sub_type
def has_mimetype(self):
"""
Returns True if file has a full mimetype, else False.
Returns False + updates log if self.main_type or self.sub_type
are not set.
"""
"""Returns True if file has a full mimetype, else False."""
if not self.main_type or not self.sub_type:
self.log_details.update({'broken_mime': True})
# LOG: log this as an error
# self._file_props.update({'broken_mime': True})
return False
return True
else:
return True
def has_extension(self):
"""
Returns True if self.extension is set, else False.
Returns False + updates self.log_details if self.extension is not set.
"""
if self.extension == '':
self.log_details.update({'no_extension': True})
"""Returns True if self.extension is set, else False."""
if self.extension is None:
# TODO: move this props updating to some other method
# self.set_property('no_extension', True)
return False
return True
else:
return True
def is_dangerous(self):
"""Returns True if self.log_details contains 'dangerous'."""
return ('dangerous' in self.log_details)
"""True if file has been marked 'dangerous' else False."""
return ('dangerous' in self._file_props)
def is_unknown(self):
"""Returns True if self.log_details contains 'unknown'."""
return ('unknown' in self.log_details)
"""True if file has been marked 'unknown' else False."""
return ('unknown' in self._file_props)
def is_binary(self):
"""returns True if self.log_details contains 'binary'."""
return ('binary' in self.log_details)
"""True if file has been marked 'binary' else False."""
return ('binary' in self._file_props)
def is_symlink(self):
"""Returns True and updates log if file is a symlink."""
if self.has_mimetype() and self.main_type == 'inode' and self.sub_type == 'symlink':
self.log_details.update({'symlink': os.readlink(self.src_path)})
# TODO: move this props updating somewhere else
# self.set_property('symlink', os.readlink(self.src_path))
return True
return False
else:
return False
def add_log_details(self, key, value):
"""Takes a key + a value and adds them to self.log_details."""
self.log_details[key] = value
def set_property(self, prop_string, value):
"""Takes a property + a value and adds them to self._file_props."""
self._file_props[prop_string] = value
def get_property(self, file_prop):
return self._file_props.get(file_prop, None)
def make_dangerous(self):
"""
@ -138,7 +141,7 @@ class FileBase(object):
"""
if self.is_dangerous():
return
self.log_details['dangerous'] = True
self.set_property('dangerous', True)
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
@ -146,7 +149,7 @@ class FileBase(object):
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
if self.is_dangerous() or self.is_binary():
return
self.log_details['unknown'] = True
self.set_property('unknown', True)
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
@ -154,15 +157,17 @@ class FileBase(object):
"""Marks a file as a binary and appends .bin to filename."""
if self.is_dangerous():
return
self.log_details['binary'] = True
self.set_property('binary', True)
path, filename = os.path.split(self.dst_path)
self.dst_path = os.path.join(path, '{}.bin'.format(filename))
def force_ext(self, ext):
"""If dst_path does not end in ext, appends the ext and updates log."""
"""If dst_path does not end in ext, changes it and edits _file_props."""
if not self.dst_path.endswith(ext):
self.log_details['force_ext'] = True
self.set_property('force_ext', True)
self.dst_path += ext
if not self._file_props['extension'] == ext:
self.set_property('extension', ext)
def create_metadata_file(self, ext):
"""Create a separate file to hold this file's metadata."""
@ -187,8 +192,9 @@ class FileBase(object):
class GroomerLogger(object):
"""Groomer logging interface"""
def __init__(self, root_dir, debug=False):
self.log_dir_path = os.path.join(root_dir, 'logs')
def __init__(self, root_dir_path, debug=False):
self.root_dir = root_dir_path
self.log_dir_path = os.path.join(root_dir_path, 'logs')
if os.path.exists(self.log_dir_path):
shutil.rmtree(self.log_dir_path)
os.makedirs(self.log_dir_path)
@ -252,21 +258,25 @@ class KittenGroomerBase(object):
def _safe_rmtree(self, directory):
"""Remove a directory tree if it exists."""
# TODO: should be a public method
if os.path.exists(directory):
shutil.rmtree(directory)
def _safe_remove(self, filepath):
"""Remove a file if it exists."""
# TODO: should be a public method
if os.path.exists(filepath):
os.remove(filepath)
def _safe_mkdir(self, directory):
"""Make a directory if it does not exist."""
# TODO: should be a public method
if not os.path.exists(directory):
os.makedirs(directory)
def _safe_copy(self, src=None, dst=None):
"""Copy a file and create directory if needed."""
# TODO: should be a public method
if src is None:
src = self.cur_file.src_path
if dst is None:

View File

@ -5,7 +5,7 @@ import os
import pytest
from kittengroomer import FileBase, KittenGroomerBase, GroomerLog
from kittengroomer import FileBase, KittenGroomerBase, GroomerLogger
from kittengroomer.helpers import ImplementationRequired
skip = pytest.mark.skip
@ -95,13 +95,10 @@ class TestFileBase:
def test_init(self, generic_conf_file):
file = generic_conf_file
assert file.log_details
assert file.log_details['filepath'] == file.src_path
assert file.extension == '.conf'
copied_log = file.log_details.copy()
file.log_details = ''
# assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable
# we should probably check for more extensions here
assert file.get_property('filepath') == file.src_path
assert file.get_property('extension') == '.conf'
# TODO: should we make log_details undeletable?
# TODO: we should probably check for more extensions here
def test_extension_uppercase(self, tmpdir):
file_path = tmpdir.join('TEST.TXT')
@ -129,13 +126,13 @@ class TestFileBase:
def test_has_extension(self, temp_file, temp_file_no_ext):
assert temp_file.has_extension() is True
print(temp_file_no_ext.extension)
assert temp_file_no_ext.has_extension() is False
def test_add_log_details(self, generic_conf_file):
generic_conf_file.add_log_details('test', True)
assert generic_conf_file.log_details['test'] is True
with pytest.raises(KeyError):
assert generic_conf_file.log_details['wrong'] is False
def test_set_property(self, generic_conf_file):
generic_conf_file.set_property('test', True)
assert generic_conf_file.get_property('test') is True
assert generic_conf_file.get_property('wrong') is None
def test_marked_dangerous(self, file_marked_all_parameterized):
file_marked_all_parameterized.make_dangerous()
@ -164,52 +161,54 @@ class TestFileBase:
assert symlink.is_symlink() is True
def test_generic_make_unknown(self, generic_conf_file):
assert generic_conf_file.log_details.get('unknown') is None
assert generic_conf_file.is_unknown() is False
generic_conf_file.make_unknown()
assert generic_conf_file.log_details.get('unknown') is True
assert generic_conf_file.is_unknown()
# given a FileBase object with no marking, should do the right things
def test_marked_make_unknown(self, file_marked_all_parameterized):
file = file_marked_all_parameterized
if file.log_details.get('unknown'):
if file.is_unknown():
file.make_unknown()
assert file.log_details.get('unknown') is True
assert file.is_unknown()
else:
assert file.log_details.get('unknown') is None
assert file.is_unknown() is False
file.make_unknown()
assert file.log_details.get('unknown') is None
assert file.is_unknown() is False
# given a FileBase object with an unrecognized marking, should ???
def test_generic_make_binary(self, generic_conf_file):
assert generic_conf_file.log_details.get('binary') is None
assert generic_conf_file.is_binary() is False
generic_conf_file.make_binary()
assert generic_conf_file.log_details.get('binary') is True
assert generic_conf_file.is_binary() is True
def test_marked_make_binary(self, file_marked_all_parameterized):
file = file_marked_all_parameterized
if file.log_details.get('dangerous'):
if file.is_dangerous():
file.make_binary()
assert file.log_details.get('binary') is None
assert file.is_binary() is False
assert file.get_property('binary') is None
else:
file.make_binary()
assert file.log_details.get('binary') is True
assert file.is_binary()
assert file.get_property('binary') is True
def test_force_ext_change(self, generic_conf_file):
assert generic_conf_file.has_extension()
assert generic_conf_file.extension == '.conf'
assert generic_conf_file.get_property('extension') == '.conf'
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
generic_conf_file.force_ext('.txt')
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.txt'
assert generic_conf_file.log_details.get('force_ext') is True
# should make a file's extension change
assert generic_conf_file.get_property('force_ext') is True
assert generic_conf_file.get_property('extension') == '.txt'
# should be able to handle weird paths
def test_force_ext_correct(self, generic_conf_file):
assert generic_conf_file.has_extension()
assert generic_conf_file.extension == '.conf'
assert generic_conf_file.get_property('extension') == '.conf'
generic_conf_file.force_ext('.conf')
assert os.path.splitext(generic_conf_file.dst_path)[1] == '.conf'
assert generic_conf_file.log_details.get('force_ext') is None
assert generic_conf_file.get_property('force_ext') is None
# shouldn't change a file's extension if it already is right
def test_create_metadata_file(self, temp_file):
@ -224,9 +223,13 @@ class TestFileBase:
class TestLogger:
@xfail
def test_tree(self, tmpdir):
GroomerLog.tree(tmpdir)
@fixture
def generic_logger(self, tmpdir):
return GroomerLogger(tmpdir.strpath)
def test_tree(self, generic_logger):
generic_logger.tree(generic_logger.root_dir)
class TestKittenGroomerBase: