From 173a844b692aedab09473a1f2713fd99a3377ce9 Mon Sep 17 00:00:00 2001 From: Dan Puttick Date: Fri, 16 Dec 2016 17:18:53 -0500 Subject: [PATCH] Some reorganization of filecheck.py, adding docstrings --- bin/filecheck.py | 201 ++++++++++++++++++++++++----------------------- 1 file changed, 103 insertions(+), 98 deletions(-) diff --git a/bin/filecheck.py b/bin/filecheck.py index 14fe1a4..e9798b3 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os -import sys import mimetypes import shlex import subprocess @@ -21,8 +20,7 @@ from pdfid import PDFiD, cPDFiD from kittengroomer import FileBase, KittenGroomerBase, main -SEVENZ = '/usr/bin/7z' -PY3 = sys.version_info.major == 3 +SEVENZ_PATH = '/usr/bin/7z' # Prepare application/ @@ -41,7 +39,7 @@ mimes_data = ['octet-stream'] mimes_exif = ['image/jpeg', 'image/tiff'] mimes_png = ['image/png'] -# Mime types we can pull metadata from +# Mimetypes we can pull metadata from mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png'] # Aliases @@ -62,7 +60,7 @@ propertype = {'.gz': 'application/gzip'} # Commonly used malicious extensions # Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/ # https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java -mal_ext = ( +MAL_EXTS = ( # Applications ".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr", ".hta", ".cpl", ".msc", ".jar", @@ -86,55 +84,58 @@ mal_ext = ( class File(FileBase): def __init__(self, src_path, dst_path): - ''' Init file object, set the mimetype ''' super(File, self).__init__(src_path, dst_path) - self.is_recursive = False - if not self.has_mimetype(): - # No mimetype, should not happen. 
- self.make_dangerous() - - if not self.has_extension(): - self.make_dangerous() - - if self.extension in mal_ext: - self.log_details.update({'malicious_extension': self.extension}) - self.make_dangerous() - + self._check_dangerous() if self.is_dangerous(): return self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension}) + self._check_extension() + self._check_mime() - # Check correlation known extension => actual mime type + def _check_dangerous(self): + if not self.has_mimetype(): + # No mimetype, should not happen. + self.make_dangerous() + if not self.has_extension(): + self.make_dangerous() + if self.extension in MAL_EXTS: + self.log_details.update({'malicious_extension': self.extension}) + self.make_dangerous() + + def _check_extension(self): + """Guesses the file's mimetype based on its extension. If the file's + extension is contained in the mimetypes module's map of known + extensions and the expected mimetype based on its + extension differs from the mimetype determined by libmagic, then it + marks the file as dangerous.""" if propertype.get(self.extension) is not None: expected_mimetype = propertype.get(self.extension) else: expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False) if aliases.get(expected_mimetype) is not None: expected_mimetype = aliases.get(expected_mimetype) - is_known_extension = self.extension in mimetypes.types_map.keys() if is_known_extension and expected_mimetype != self.mimetype: self.log_details.update({'expected_mimetype': expected_mimetype}) self.make_dangerous() - # check correlation actual mime type => known extensions + def _check_mime(self): + """Takes the mimetype (as determined by libmagic) and determines + whether the list of extensions that are normally associated with + that mimetype contains the file's actual extension.""" if aliases.get(self.mimetype) is not None: mimetype = aliases.get(self.mimetype) else: mimetype 
= self.mimetype - expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False) if expected_extensions: if len(self.extension) > 0 and self.extension not in expected_extensions: self.log_details.update({'expected_extensions': expected_extensions}) self.make_dangerous() - else: - # there are no known extensions associated to this mimetype. - pass def has_metadata(self): if self.mimetype in mimes_metadata: @@ -144,18 +145,14 @@ class File(FileBase): class KittenGroomerFileCheck(KittenGroomerBase): - def __init__(self, root_src=None, root_dst=None, max_recursive=2, debug=False): - ''' - Initialize the basics of the conversion process - ''' + def __init__(self, root_src=None, root_dst=None, max_recursive_depth=2, debug=False): if root_src is None: root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug) - - self.recursive = 0 - self.max_recursive = max_recursive + self.recursive_archive_depth = 0 + self.max_recursive_depth = max_recursive_depth subtypes_apps = [ (mimes_office, self._winoffice), @@ -189,21 +186,17 @@ class KittenGroomerFileCheck(KittenGroomerBase): 'inode': self.inode, } - # ##### Helpers ##### + # ##### Helper functions ##### def _init_subtypes_application(self, subtypes_application): - ''' - Create the Dict to pick the right function based on the sub mime type - ''' - to_return = {} - for list_subtypes, fct in subtypes_application: + """Creates a dictionary with the right method based on the sub mime type.""" + subtype_dict = {} + for list_subtypes, func in subtypes_application: for st in list_subtypes: - to_return[st] = fct - return to_return + subtype_dict[st] = func + return subtype_dict def _print_log(self): - ''' - Print the logs related to the current file being processed - ''' + """Print the logs related to the current file being processed.""" tmp_log = 
self.log_name.fields(**self.cur_file.log_details) if self.cur_file.is_dangerous(): tmp_log.warning(self.cur_file.log_string) @@ -212,13 +205,13 @@ class KittenGroomerFileCheck(KittenGroomerBase): else: tmp_log.debug(self.cur_file.log_string) - def _run_process(self, command_line, timeout=0, background=False): - '''Run subprocess, wait until it finishes''' + def _run_process(self, command_string, timeout=0, background=False): + """Run command_string in a subprocess, wait until it finishes.""" if timeout != 0: deadline = time.time() + timeout else: deadline = None - args = shlex.split(command_line) + args = shlex.split(command_string) with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout: p = subprocess.Popen(args, stdout=stdout, stderr=stderr) if background: @@ -236,42 +229,42 @@ class KittenGroomerFileCheck(KittenGroomerBase): return True ####################### - - # ##### Discarded mime types, reason in the comments ###### + # ##### Discarded mimetypes, reason in the docstring ###### def inode(self): - ''' Usually empty file. No reason (?) 
to copy it on the dest key''' + """Empty file or symlink.""" if self.cur_file.is_symlink(): self.cur_file.log_string += 'Symlink to {}'.format(self.cur_file.log_details['symlink']) else: self.cur_file.log_string += 'Inode file' def unknown(self): - ''' This main type is unknown, that should not happen ''' + """Main type should never be unknown.""" self.cur_file.log_string += 'Unknown file' def example(self): - '''Used in examples, should never be returned by libmagic''' + """Used in examples, should never be returned by libmagic.""" self.cur_file.log_string += 'Example file' def multipart(self): - '''Used in web apps, should never be returned by libmagic''' + """Used in web apps, should never be returned by libmagic""" self.cur_file.log_string += 'Multipart file' - # ##### Threated as malicious, no reason to have it on a USB key ###### + # ##### Treated as malicious, no reason to have it on a USB key ###### def message(self): - '''Way to process message file''' + """Process a message file.""" self.cur_file.log_string += 'Message file' self.cur_file.make_dangerous() self._safe_copy() def model(self): - '''Way to process model file''' + """Process a model file.""" self.cur_file.log_string += 'Model file' self.cur_file.make_dangerous() self._safe_copy() - # ##### Converted ###### + # ##### Files that will be converted ###### def text(self): + """Process an rtf, ooxml, or plaintext file.""" for r in mimes_rtf: if r in self.cur_file.sub_type: self.cur_file.log_string += 'Rich Text file' @@ -289,7 +282,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): self._safe_copy() def application(self): - ''' Everything can be there, using the subtype to decide ''' + """Processes an application specific file according to its subtype.""" for subtype, fct in self.subtypes_application.items(): if subtype in self.cur_file.sub_type: fct() @@ -299,12 +292,13 @@ class KittenGroomerFileCheck(KittenGroomerBase): self._unknown_app() def _executables(self): - '''Way to process executable 
file''' + """Processes an executable file.""" self.cur_file.add_log_details('processing_type', 'executable') self.cur_file.make_dangerous() self._safe_copy() def _winoffice(self): + """Processes a winoffice file using olefile/oletools.""" self.cur_file.add_log_details('processing_type', 'WinOffice') # Try as if it is a valid document oid = oletools.oleid.OleID(self.cur_file.src_path) @@ -343,6 +337,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): self._safe_copy() def _ooxml(self): + """Processes an ooxml file.""" self.cur_file.add_log_details('processing_type', 'ooxml') try: doc = officedissector.doc.Document(self.cur_file.src_path) @@ -369,6 +364,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): self._safe_copy() def _libreoffice(self): + """Processes a libreoffice file.""" self.cur_file.add_log_details('processing_type', 'libreoffice') # As long as there ar no way to do a sanity check on the files => dangerous try: @@ -385,7 +381,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): self._safe_copy() def _pdf(self): - '''Way to process PDF file''' + """Processes a PDF file.""" self.cur_file.add_log_details('processing_type', 'pdf') xmlDoc = PDFiD(self.cur_file.src_path) oPDFiD = cPDFiD(xmlDoc, True) @@ -407,33 +403,47 @@ class KittenGroomerFileCheck(KittenGroomerBase): self.cur_file.make_dangerous() def _archive(self): - '''Way to process Archive''' + """Processes an archive using 7zip. The archive is extracted to a + temporary directory and self.processdir is called on that directory. + The recursive archive depth is increased to protect against archive + bombs.""" self.cur_file.add_log_details('processing_type', 'archive') self.cur_file.is_recursive = True self.cur_file.log_string += 'Archive extracted, processing content.' 
tmpdir = self.cur_file.dst_path + '_temp' self._safe_mkdir(tmpdir) - extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ, self.cur_file.src_path, tmpdir) + extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir) self._run_process(extract_command) - self.recursive += 1 + self.recursive_archive_depth += 1 self.tree(tmpdir) self.processdir(tmpdir, self.cur_file.dst_path) - self.recursive -= 1 + self.recursive_archive_depth -= 1 self._safe_rmtree(tmpdir) + def _handle_archivebomb(self, src_dir): + self.cur_file.make_dangerous() + self.cur_file.add_log_details('Archive Bomb', True) + self.log_name.warning('ARCHIVE BOMB.') + self.log_name.warning('The content of the archive contains recursively other archives.') + self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.') + self._safe_rmtree(src_dir) + if src_dir.endswith('_temp'): + bomb_path = src_dir[:-len('_temp')] + self._safe_remove(bomb_path) + def _unknown_app(self): - '''Way to process an unknown file''' + """Processes an unknown file.""" self.cur_file.make_unknown() self._safe_copy() def _binary_app(self): - '''Way to process an unknown binary file''' + """Processes an unknown binary file.""" self.cur_file.make_binary() self._safe_copy() ####################### # Metadata extractors - def _metadata_exif(self, metadataFile): + def _metadata_exif(self, metadata_file): img = open(self.cur_file.src_path, 'rb') tags = None @@ -463,7 +473,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): printable = value else: printable = str(value) - metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable)) + metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable)) self.cur_file.add_log_details('metadata', 'exif') img.close() return True @@ -487,22 +497,36 @@ class KittenGroomerFileCheck(KittenGroomerBase): return False def extract_metadata(self): - metadataFile = self._safe_metadata_split(".metadata.txt") - 
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadataFile) - metadataFile.close() + metadata_file = self._safe_metadata_split(".metadata.txt") + success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadata_file) + metadata_file.close() if not success: # FIXME Delete empty metadata file pass ####################### - # ##### Not converted, checking the mime type ###### + # ##### Media - audio and video aren't converted ###### def audio(self): - '''Way to process an audio file''' + """Processes an audio file.""" self.cur_file.log_string += 'Audio file' self._media_processing() + def video(self): + """Processes a video.""" + self.cur_file.log_string += 'Video file' + self._media_processing() + + def _media_processing(self): + """Generic way to process all media files.""" + self.cur_file.add_log_details('processing_type', 'media') + self._safe_copy() + def image(self): - '''Way to process an image''' + """Processes an image. + + Extracts metadata if metadata is present. 
Creates a temporary + directory, opens the using PIL.Image, saves it to the temporary + directory, and copies it to the destination.""" if self.cur_file.has_metadata(): self.extract_metadata() @@ -534,40 +558,20 @@ class KittenGroomerFileCheck(KittenGroomerBase): self.cur_file.log_string += 'Image file' self.cur_file.add_log_details('processing_type', 'image') - def video(self): - '''Way to process a video''' - self.cur_file.log_string += 'Video file' - self._media_processing() - - def _media_processing(self): - '''Generic way to process all the media files''' - self.cur_file.add_log_details('processing_type', 'media') - self._safe_copy() - ####################### def processdir(self, src_dir=None, dst_dir=None): - ''' - Main function doing the processing - ''' + """Main function coordinating file processing.""" if src_dir is None: src_dir = self.src_root_dir if dst_dir is None: dst_dir = self.dst_root_dir - if self.recursive > 0: + if self.recursive_archive_depth > 0: self._print_log() - if self.recursive >= self.max_recursive: - self.cur_file.make_dangerous() - self.cur_file.add_log_details('Archive Bomb', True) - self.log_name.warning('ARCHIVE BOMB.') - self.log_name.warning('The content of the archive contains recursively other archives.') - self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.') - self._safe_rmtree(src_dir) - if src_dir.endswith('_temp'): - archbomb_path = src_dir[:-len('_temp')] - self._safe_remove(archbomb_path) + if self.recursive_archive_depth >= self.max_recursive_depth: + self._handle_archivebomb(src_dir) for srcpath in self._list_all_files(src_dir): self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir)) @@ -581,5 +585,6 @@ class KittenGroomerFileCheck(KittenGroomerBase): if not self.cur_file.is_recursive: self._print_log() + if __name__ == '__main__': - main(KittenGroomerFileCheck, 'Generic version of the KittenGroomer. 
Convert and rename files.') + main(KittenGroomerFileCheck, 'File sanitizer used in CIRCLean. Renames potentially dangerous files.')