Some reorganization of filecheck.py, adding docstrings

pull/8/head
Dan Puttick 2016-12-16 17:18:53 -05:00
parent ecb4f56710
commit 173a844b69
1 changed files with 103 additions and 98 deletions

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import mimetypes
import shlex
import subprocess
@ -21,8 +20,7 @@ from pdfid import PDFiD, cPDFiD
from kittengroomer import FileBase, KittenGroomerBase, main
SEVENZ = '/usr/bin/7z'
PY3 = sys.version_info.major == 3
SEVENZ_PATH = '/usr/bin/7z'
# Prepare application/<subtype>
@ -41,7 +39,7 @@ mimes_data = ['octet-stream']
mimes_exif = ['image/jpeg', 'image/tiff']
mimes_png = ['image/png']
# Mime types we can pull metadata from
# Mimetypes we can pull metadata from
mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png']
# Aliases
@ -62,7 +60,7 @@ propertype = {'.gz': 'application/gzip'}
# Commonly used malicious extensions
# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/
# https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java
mal_ext = (
MAL_EXTS = (
# Applications
".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr",
".hta", ".cpl", ".msc", ".jar",
@ -86,55 +84,58 @@ mal_ext = (
class File(FileBase):
def __init__(self, src_path, dst_path):
''' Init file object, set the mimetype '''
super(File, self).__init__(src_path, dst_path)
self.is_recursive = False
if not self.has_mimetype():
# No mimetype, should not happen.
self.make_dangerous()
if not self.has_extension():
self.make_dangerous()
if self.extension in mal_ext:
self.log_details.update({'malicious_extension': self.extension})
self.make_dangerous()
self._check_dangerous()
if self.is_dangerous():
return
self.log_details.update({'maintype': self.main_type,
'subtype': self.sub_type,
'extension': self.extension})
self._check_extension()
self._check_mime()
# Check correlation known extension => actual mime type
def _check_dangerous(self):
if not self.has_mimetype():
# No mimetype, should not happen.
self.make_dangerous()
if not self.has_extension():
self.make_dangerous()
if self.extension in MAL_EXTS:
self.log_details.update({'malicious_extension': self.extension})
self.make_dangerous()
def _check_extension(self):
"""Guesses the file's mimetype based on its extension. If the file's
mimetype (as determined by libmagic) is contained in the mimetype
module's list of valid mimetypes and the expected mimetype based on its
extension differs from the mimetype determined by libmagic, then it
marks the file as dangerous."""
if propertype.get(self.extension) is not None:
expected_mimetype = propertype.get(self.extension)
else:
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
if aliases.get(expected_mimetype) is not None:
expected_mimetype = aliases.get(expected_mimetype)
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and expected_mimetype != self.mimetype:
self.log_details.update({'expected_mimetype': expected_mimetype})
self.make_dangerous()
# check correlation actual mime type => known extensions
def _check_mime(self):
"""Takes the mimetype (as determined by libmagic) and determines
whether the list of extensions that are normally associated with
that extension contains the file's actual extension."""
if aliases.get(self.mimetype) is not None:
mimetype = aliases.get(self.mimetype)
else:
mimetype = self.mimetype
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
if expected_extensions:
if len(self.extension) > 0 and self.extension not in expected_extensions:
self.log_details.update({'expected_extensions': expected_extensions})
self.make_dangerous()
else:
# there are no known extensions associated to this mimetype.
pass
def has_metadata(self):
if self.mimetype in mimes_metadata:
@ -144,18 +145,14 @@ class File(FileBase):
class KittenGroomerFileCheck(KittenGroomerBase):
def __init__(self, root_src=None, root_dst=None, max_recursive=2, debug=False):
'''
Initialize the basics of the conversion process
'''
def __init__(self, root_src=None, root_dst=None, max_recursive_depth=2, debug=False):
if root_src is None:
root_src = os.path.join(os.sep, 'media', 'src')
if root_dst is None:
root_dst = os.path.join(os.sep, 'media', 'dst')
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
self.recursive = 0
self.max_recursive = max_recursive
self.recursive_archive_depth = 0
self.max_recursive_depth = max_recursive_depth
subtypes_apps = [
(mimes_office, self._winoffice),
@ -189,21 +186,17 @@ class KittenGroomerFileCheck(KittenGroomerBase):
'inode': self.inode,
}
# ##### Helpers #####
# ##### Helper functions #####
def _init_subtypes_application(self, subtypes_application):
'''
Create the Dict to pick the right function based on the sub mime type
'''
to_return = {}
for list_subtypes, fct in subtypes_application:
"""Creates a dictionary with the right method based on the sub mime type."""
subtype_dict = {}
for list_subtypes, func in subtypes_application:
for st in list_subtypes:
to_return[st] = fct
return to_return
subtype_dict[st] = func
return subtype_dict
def _print_log(self):
'''
Print the logs related to the current file being processed
'''
"""Print the logs related to the current file being processed."""
tmp_log = self.log_name.fields(**self.cur_file.log_details)
if self.cur_file.is_dangerous():
tmp_log.warning(self.cur_file.log_string)
@ -212,13 +205,13 @@ class KittenGroomerFileCheck(KittenGroomerBase):
else:
tmp_log.debug(self.cur_file.log_string)
def _run_process(self, command_line, timeout=0, background=False):
'''Run subprocess, wait until it finishes'''
def _run_process(self, command_string, timeout=0, background=False):
"""Run command_string in a subprocess, wait until it finishes."""
if timeout != 0:
deadline = time.time() + timeout
else:
deadline = None
args = shlex.split(command_line)
args = shlex.split(command_string)
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
if background:
@ -236,42 +229,42 @@ class KittenGroomerFileCheck(KittenGroomerBase):
return True
#######################
# ##### Discarded mime types, reason in the comments ######
# ##### Discarded mimetypes, reason in the docstring ######
def inode(self):
''' Usually empty file. No reason (?) to copy it on the dest key'''
"""Empty file or symlink."""
if self.cur_file.is_symlink():
self.cur_file.log_string += 'Symlink to {}'.format(self.cur_file.log_details['symlink'])
else:
self.cur_file.log_string += 'Inode file'
def unknown(self):
''' This main type is unknown, that should not happen '''
"""Main type should never be unknown."""
self.cur_file.log_string += 'Unknown file'
def example(self):
'''Used in examples, should never be returned by libmagic'''
"""Used in examples, should never be returned by libmagic."""
self.cur_file.log_string += 'Example file'
def multipart(self):
'''Used in web apps, should never be returned by libmagic'''
"""Used in web apps, should never be returned by libmagic"""
self.cur_file.log_string += 'Multipart file'
# ##### Threated as malicious, no reason to have it on a USB key ######
# ##### Treated as malicious, no reason to have it on a USB key ######
def message(self):
'''Way to process message file'''
"""Process a message file."""
self.cur_file.log_string += 'Message file'
self.cur_file.make_dangerous()
self._safe_copy()
def model(self):
'''Way to process model file'''
"""Process a model file."""
self.cur_file.log_string += 'Model file'
self.cur_file.make_dangerous()
self._safe_copy()
# ##### Converted ######
# ##### Files that will be converted ######
def text(self):
"""Process an rtf, ooxml, or plaintext file."""
for r in mimes_rtf:
if r in self.cur_file.sub_type:
self.cur_file.log_string += 'Rich Text file'
@ -289,7 +282,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def application(self):
''' Everything can be there, using the subtype to decide '''
"""Processes an application specific file according to its subtype."""
for subtype, fct in self.subtypes_application.items():
if subtype in self.cur_file.sub_type:
fct()
@ -299,12 +292,13 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._unknown_app()
def _executables(self):
'''Way to process executable file'''
"""Processes an executable file."""
self.cur_file.add_log_details('processing_type', 'executable')
self.cur_file.make_dangerous()
self._safe_copy()
def _winoffice(self):
"""Processes a winoffice file using olefile/oletools."""
self.cur_file.add_log_details('processing_type', 'WinOffice')
# Try as if it is a valid document
oid = oletools.oleid.OleID(self.cur_file.src_path)
@ -343,6 +337,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def _ooxml(self):
"""Processes an ooxml file."""
self.cur_file.add_log_details('processing_type', 'ooxml')
try:
doc = officedissector.doc.Document(self.cur_file.src_path)
@ -369,6 +364,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def _libreoffice(self):
"""Processes a libreoffice file."""
self.cur_file.add_log_details('processing_type', 'libreoffice')
# As long as there ar no way to do a sanity check on the files => dangerous
try:
@ -385,7 +381,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def _pdf(self):
'''Way to process PDF file'''
"""Processes a PDF file."""
self.cur_file.add_log_details('processing_type', 'pdf')
xmlDoc = PDFiD(self.cur_file.src_path)
oPDFiD = cPDFiD(xmlDoc, True)
@ -407,33 +403,47 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self.cur_file.make_dangerous()
def _archive(self):
'''Way to process Archive'''
"""Processes an archive using 7zip. The archive is extracted to a
temporary directory and self.processdir is called on that directory.
The recursive archive depth is increased to protect against archive
bombs."""
self.cur_file.add_log_details('processing_type', 'archive')
self.cur_file.is_recursive = True
self.cur_file.log_string += 'Archive extracted, processing content.'
tmpdir = self.cur_file.dst_path + '_temp'
self._safe_mkdir(tmpdir)
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ, self.cur_file.src_path, tmpdir)
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir)
self._run_process(extract_command)
self.recursive += 1
self.recursive_archive_depth += 1
self.tree(tmpdir)
self.processdir(tmpdir, self.cur_file.dst_path)
self.recursive -= 1
self.recursive_archive_depth -= 1
self._safe_rmtree(tmpdir)
def _handle_archivebomb(self, src_dir):
self.cur_file.make_dangerous()
self.cur_file.add_log_details('Archive Bomb', True)
self.log_name.warning('ARCHIVE BOMB.')
self.log_name.warning('The content of the archive contains recursively other archives.')
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
self._safe_rmtree(src_dir)
if src_dir.endswith('_temp'):
bomb_path = src_dir[:-len('_temp')]
self._safe_remove(bomb_path)
def _unknown_app(self):
'''Way to process an unknown file'''
"""Processes an unknown file."""
self.cur_file.make_unknown()
self._safe_copy()
def _binary_app(self):
'''Way to process an unknown binary file'''
"""Processses an unknown binary file."""
self.cur_file.make_binary()
self._safe_copy()
#######################
# Metadata extractors
def _metadata_exif(self, metadataFile):
def _metadata_exif(self, metadata_file):
img = open(self.cur_file.src_path, 'rb')
tags = None
@ -463,7 +473,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
printable = value
else:
printable = str(value)
metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable))
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
self.cur_file.add_log_details('metadata', 'exif')
img.close()
return True
@ -487,22 +497,36 @@ class KittenGroomerFileCheck(KittenGroomerBase):
return False
def extract_metadata(self):
metadataFile = self._safe_metadata_split(".metadata.txt")
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadataFile)
metadataFile.close()
metadata_file = self._safe_metadata_split(".metadata.txt")
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadata_file)
metadata_file.close()
if not success:
# FIXME Delete empty metadata file
pass
#######################
# ##### Not converted, checking the mime type ######
# ##### Media - audio and video aren't converted ######
def audio(self):
'''Way to process an audio file'''
"""Processes an audio file."""
self.cur_file.log_string += 'Audio file'
self._media_processing()
def video(self):
"""Processes a video."""
self.cur_file.log_string += 'Video file'
self._media_processing()
def _media_processing(self):
"""Generic way to process all media files."""
self.cur_file.add_log_details('processing_type', 'media')
self._safe_copy()
def image(self):
'''Way to process an image'''
"""Processes an image.
Extracts metadata if metadata is present. Creates a temporary
directory, opens the using PIL.Image, saves it to the temporary
directory, and copies it to the destination."""
if self.cur_file.has_metadata():
self.extract_metadata()
@ -534,40 +558,20 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self.cur_file.log_string += 'Image file'
self.cur_file.add_log_details('processing_type', 'image')
def video(self):
'''Way to process a video'''
self.cur_file.log_string += 'Video file'
self._media_processing()
def _media_processing(self):
'''Generic way to process all the media files'''
self.cur_file.add_log_details('processing_type', 'media')
self._safe_copy()
#######################
def processdir(self, src_dir=None, dst_dir=None):
'''
Main function doing the processing
'''
"""Main function coordinating file processing."""
if src_dir is None:
src_dir = self.src_root_dir
if dst_dir is None:
dst_dir = self.dst_root_dir
if self.recursive > 0:
if self.recursive_archive_depth > 0:
self._print_log()
if self.recursive >= self.max_recursive:
self.cur_file.make_dangerous()
self.cur_file.add_log_details('Archive Bomb', True)
self.log_name.warning('ARCHIVE BOMB.')
self.log_name.warning('The content of the archive contains recursively other archives.')
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
self._safe_rmtree(src_dir)
if src_dir.endswith('_temp'):
archbomb_path = src_dir[:-len('_temp')]
self._safe_remove(archbomb_path)
if self.recursive_archive_depth >= self.max_recursive_depth:
self._handle_archivebomb(src_dir)
for srcpath in self._list_all_files(src_dir):
self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir))
@ -581,5 +585,6 @@ class KittenGroomerFileCheck(KittenGroomerBase):
if not self.cur_file.is_recursive:
self._print_log()
if __name__ == '__main__':
main(KittenGroomerFileCheck, 'Generic version of the KittenGroomer. Convert and rename files.')
main(KittenGroomerFileCheck, 'File sanitizer used in CIRCLean. Renames potentially dangerous files.')