#!/usr/bin/env python # -*- coding: utf-8 -*- import os import mimetypes import shlex import subprocess import time import zipfile import oletools.oleid import olefile import officedissector import warnings import exifread from PIL import Image from PIL import PngImagePlugin from pdfid import PDFiD, cPDFiD from kittengroomer import FileBase, KittenGroomerBase, main SEVENZ = '/usr/bin/7z' # Prepare application/ mimes_ooxml = ['vnd.openxmlformats-officedocument.'] mimes_office = ['msword', 'vnd.ms-'] mimes_libreoffice = ['vnd.oasis.opendocument'] mimes_rtf = ['rtf', 'richtext'] mimes_pdf = ['pdf', 'postscript'] mimes_xml = ['xml'] mimes_ms = ['dosexec'] mimes_compressed = ['zip', 'rar', 'bzip2', 'lzip', 'lzma', 'lzop', 'xz', 'compress', 'gzip', 'tar'] mimes_data = ['octet-stream'] # Prepare image/ mimes_exif = ['image/jpeg', 'image/tiff'] mimes_png = ['image/png'] # Mime types we can pull metadata from mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png'] # Aliases aliases = { # Win executables 'application/x-msdos-program': 'application/x-dosexec', 'application/x-dosexec': 'application/x-msdos-program' } # Sometimes, mimetypes.guess_type is giving unexpected results, such as for the .tar.gz files: # In [12]: mimetypes.guess_type('toot.tar.gz', strict=False) # Out[12]: ('application/x-tar', 'gzip') # It works as expected if you do mimetypes.guess_type('application/gzip', strict=False) propertype = {'.gz': 'application/gzip'} # Commonly used malicious extensions # Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/ # https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java mal_ext = ( # Applications ".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr", ".hta", ".cpl", ".msc", ".jar", # Scripts ".bat", ".cmd", ".vb", ".vbs", ".vbe", ".js", ".jse", ".ws", ".wsf", ".wsc", ".wsh", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2", ".msh", ".msh1", ".msh2", ".mshxml", ".msh1xml", ".msh2xml", # Shortcuts ".scf", ".lnk", ".inf", # Other ".reg", ".dll", # Office macro (OOXML with macro enabled) ".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm", ".potm", ".ppam", ".ppsm", ".sldm", # banned from wirecode ".asf", ".asx", ".au", ".htm", ".html", ".mht", ".vbs", ".wax", ".wm", ".wma", ".wmd", ".wmv", ".wmx", ".wmz", ".wvx", ) class File(FileBase): def __init__(self, src_path, dst_path): ''' Init file object, set the mimetype ''' super(File, self).__init__(src_path, dst_path) self.is_recursive = False if not self.has_mimetype(): # No mimetype, should not happen. self.make_dangerous() if not self.has_extension(): self.make_dangerous() if self.extension in mal_ext: self.log_details.update({'malicious_extension': self.extension}) self.make_dangerous() if self.is_dangerous(): return self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension}) # Check correlation known extension => actual mime type if propertype.get(self.extension) is not None: expected_mimetype = propertype.get(self.extension) else: expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False) if aliases.get(expected_mimetype) is not None: expected_mimetype = aliases.get(expected_mimetype) is_known_extension = self.extension in mimetypes.types_map.keys() if is_known_extension and expected_mimetype != self.mimetype: self.log_details.update({'expected_mimetype': expected_mimetype}) self.make_dangerous() # check correlation actual mime type => known extensions if aliases.get(self.mimetype) is not None: mimetype = aliases.get(self.mimetype) else: mimetype = self.mimetype expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False) if expected_extensions: if len(self.extension) > 0 and self.extension not in expected_extensions: self.log_details.update({'expected_extensions': expected_extensions}) self.make_dangerous() else: # there are no known extensions associated to this mimetype. pass def has_metadata(self): if self.mimetype in mimes_metadata: return True return False class KittenGroomerFileCheck(KittenGroomerBase): def __init__(self, root_src=None, root_dst=None, max_recursive=5, debug=False): ''' Initialize the basics of the conversion process ''' if root_src is None: root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug) self.recursive = 0 self.max_recursive = max_recursive subtypes_apps = [ (mimes_office, self._winoffice), (mimes_ooxml, self._ooxml), (mimes_rtf, self.text), (mimes_libreoffice, self._libreoffice), (mimes_pdf, self._pdf), (mimes_xml, self.text), (mimes_ms, self._executables), (mimes_compressed, self._archive), (mimes_data, self._binary_app), ] self.subtypes_application = self._init_subtypes_application(subtypes_apps) types_metadata = [ (mimes_exif, self._metadata_exif), (mimes_png, self._metadata_png), ] self.metadata_processing_options = self._init_subtypes_application(types_metadata) self.mime_processing_options = { 'text': self.text, 'audio': self.audio, 'image': self.image, 'video': self.video, 'application': self.application, 'example': self.example, 'message': self.message, 'model': self.model, 'multipart': self.multipart, 'inode': self.inode, } # ##### Helpers ##### def _init_subtypes_application(self, subtypes_application): ''' Create the Dict to pick the right function based on the sub mime type ''' to_return = {} for list_subtypes, fct in subtypes_application: for st in list_subtypes: to_return[st] = fct return to_return def _print_log(self): ''' Print the logs related to the current file being processed ''' tmp_log = self.log_name.fields(**self.cur_file.log_details) if self.cur_file.is_dangerous(): tmp_log.warning(self.cur_file.log_string) elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'): tmp_log.info(self.cur_file.log_string) else: tmp_log.debug(self.cur_file.log_string) def _run_process(self, command_line, timeout=0, background=False): '''Run subprocess, wait until it finishes''' if timeout != 0: deadline = time.time() + timeout else: deadline = None args = shlex.split(command_line) with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout: p = subprocess.Popen(args, stdout=stdout, stderr=stderr) if background: # This timer is here to make sure the unoconv listener is properly started. time.sleep(10) return True while True: code = p.poll() if code is not None: break if deadline is not None and time.time() > deadline: p.kill() break time.sleep(1) return True ####################### # ##### Discarded mime types, reason in the comments ###### def inode(self): ''' Usually empty file. No reason (?) to copy it on the dest key''' if self.cur_file.is_symlink(): self.cur_file.log_string += 'Symlink to {}'.format(self.log_details['symlink']) else: self.cur_file.log_string += 'Inode file' def unknown(self): ''' This main type is unknown, that should not happen ''' self.cur_file.log_string += 'Unknown file' # ##### Threated as malicious, no reason to have it on a USB key ###### def example(self): '''Way to process example file''' self.cur_file.log_string += 'Example file' self.cur_file.make_dangerous() self._safe_copy() def message(self): '''Way to process message file''' self.cur_file.log_string += 'Message file' self.cur_file.make_dangerous() self._safe_copy() def model(self): '''Way to process model file''' self.cur_file.log_string += 'Model file' self.cur_file.make_dangerous() self._safe_copy() def multipart(self): '''Way to process multipart file''' self.cur_file.log_string += 'Multipart file' self.cur_file.make_dangerous() self._safe_copy() ####################### # ##### Converted ###### def text(self): for r in mimes_rtf: if r in self.cur_file.sub_type: self.cur_file.log_string += 'Rich Text file' # TODO: need a way to convert it to plain text self.cur_file.force_ext('.txt') self._safe_copy() return for o in mimes_ooxml: if o in self.cur_file.sub_type: self.cur_file.log_string += 'OOXML File' self._ooxml() return self.cur_file.log_string += 'Text file' self.cur_file.force_ext('.txt') self._safe_copy() def application(self): ''' Everything can be there, using the subtype to decide ''' for subtype, fct in self.subtypes_application.items(): if subtype in self.cur_file.sub_type: fct() self.cur_file.log_string += 'Application file' return self.cur_file.log_string += 'Unknown Application file' self._unknown_app() def _executables(self): '''Way to process executable file''' self.cur_file.add_log_details('processing_type', 'executable') self.cur_file.make_dangerous() self._safe_copy() def _winoffice(self): self.cur_file.add_log_details('processing_type', 'WinOffice') # Try as if it is a valid document oid = oletools.oleid.OleID(self.cur_file.src_path) if not olefile.isOleFile(self.cur_file.src_path): # Manual processing, may already count as suspicious try: ole = olefile.OleFileIO(self.cur_file.src_path, raise_defects=olefile.DEFECT_INCORRECT) except: self.cur_file.add_log_details('not_parsable', True) self.cur_file.make_dangerous() if ole.parsing_issues: self.cur_file.add_log_details('parsing_issues', True) self.cur_file.make_dangerous() else: if ole.exists('macros/vba') or ole.exists('Macros') \ or ole.exists('_VBA_PROJECT_CUR') or ole.exists('VBA'): self.cur_file.add_log_details('macro', True) self.cur_file.make_dangerous() else: indicators = oid.check() # Encrypted ban be set by multiple checks on the script if oid.encrypted.value: self.cur_file.add_log_details('encrypted', True) self.cur_file.make_dangerous() if oid.macros.value or oid.ole.exists('macros/vba') or oid.ole.exists('Macros') \ or oid.ole.exists('_VBA_PROJECT_CUR') or oid.ole.exists('VBA'): self.cur_file.add_log_details('macro', True) self.cur_file.make_dangerous() for i in indicators: if i._id == 'ObjectPool' and i.value: # FIXME: Is it suspicious? self.cur_file.add_log_details('objpool', True) elif i._id == 'flash' and i.value: self.cur_file.add_log_details('flash', True) self.cur_file.make_dangerous() self._safe_copy() def _ooxml(self): self.cur_file.add_log_details('processing_type', 'ooxml') try: doc = officedissector.doc.Document(self.cur_file.src_path) except Exception: # Invalid file self.cur_file.make_dangerous() self._safe_copy() return # There are probably other potentially malicious features: # fonts, custom props, custom XML if doc.is_macro_enabled or len(doc.features.macros) > 0: self.cur_file.add_log_details('macro', True) self.cur_file.make_dangerous() if len(doc.features.embedded_controls) > 0: self.cur_file.add_log_details('activex', True) self.cur_file.make_dangerous() if len(doc.features.embedded_objects) > 0: # Exploited by CVE-2014-4114 (OLE) self.cur_file.add_log_details('embedded_obj', True) self.cur_file.make_dangerous() if len(doc.features.embedded_packages) > 0: self.cur_file.add_log_details('embedded_pack', True) self.cur_file.make_dangerous() self._safe_copy() def _libreoffice(self): self.cur_file.add_log_details('processing_type', 'libreoffice') # As long as there ar no way to do a sanity check on the files => dangerous try: lodoc = zipfile.ZipFile(self.cur_file.src_path, 'r') except: self.cur_file.add_log_details('invalid', True) self.cur_file.make_dangerous() for f in lodoc.infolist(): fname = f.filename.lower() if fname.startswith('script') or fname.startswith('basic') or \ fname.startswith('object') or fname.endswith('.bin'): self.cur_file.add_log_details('macro', True) self.cur_file.make_dangerous() self._safe_copy() def _pdf(self): '''Way to process PDF file''' self.cur_file.add_log_details('processing_type', 'pdf') xmlDoc = PDFiD(self.cur_file.src_path) oPDFiD = cPDFiD(xmlDoc, True) # TODO: other keywords? if oPDFiD.encrypt > 0: self.cur_file.add_log_details('encrypted', True) self.cur_file.make_dangerous() if oPDFiD.js > 0 or oPDFiD.javascript > 0: self.cur_file.add_log_details('javascript', True) self.cur_file.make_dangerous() if oPDFiD.aa > 0 or oPDFiD.openaction > 0: self.cur_file.add_log_details('openaction', True) self.cur_file.make_dangerous() if oPDFiD.richmedia > 0: self.cur_file.add_log_details('flash', True) self.cur_file.make_dangerous() if oPDFiD.launch > 0: self.cur_file.add_log_details('launch', True) self.cur_file.make_dangerous() def _archive(self): '''Way to process Archive''' self.cur_file.add_log_details('processing_type', 'archive') self.cur_file.is_recursive = True self.cur_file.log_string += 'Archive extracted, processing content.' tmpdir = self.cur_file.dst_path + '_temp' self._safe_mkdir(tmpdir) extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ, self.cur_file.src_path, tmpdir) self._run_process(extract_command) self.recursive += 1 self.tree(tmpdir) self.processdir(tmpdir, self.cur_file.dst_path) self.recursive -= 1 self._safe_rmtree(tmpdir) def _unknown_app(self): '''Way to process an unknown file''' self.cur_file.make_unknown() self._safe_copy() def _binary_app(self): '''Way to process an unknown binary file''' self.cur_file.make_binary() self._safe_copy() ####################### # Metadata extractors def _metadata_exif(self, metadataFile): img = open(self.cur_file.src_path, 'rb') tags = None try: tags = exifread.process_file(img, debug=True) except Exception as e: print("Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.cur_file.src_path)) print(e) if tags == None: try: tags = exifread.process_file(img, debug=True) except Exception as e: print("Failed to get any metadata for file {}.".format(self.cur_file.src_path)) print(e) img.close() return False for tag in sorted(tags.keys()): # These are long and obnoxious/binary if tag not in ('JPEGThumbnail', 'TIFFThumbnail'): printable = str(tags[tag]) #Exifreader truncates data. if len(printable) > 25 and printable.endswith(", ... ]"): value = tags[tag].values if isinstance(value, basestring): printable = value else: printable = str(value) metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable)) self.cur_file.add_log_details('metadata', 'exif') img.close() return True def _metadata_png(self, metadataFile): warnings.simplefilter('error', Image.DecompressionBombWarning) try: img = Image.open(self.cur_file.src_path) for tag in sorted(img.info.keys()): # These are long and obnoxious/binary if tag not in ('icc_profile'): metadataFile.write("Key: {}\tValue: {}\n".format(tag, img.info[tag])) self.cur_file.add_log_details('metadata', 'png') img.close() # Catch decompression bombs except Exception as e: print("Caught exception processing metadata for {}".format(self.cur_file.src_path)) print(e) self.cur_file.make_dangerous() self._safe_copy() return False def extract_metadata(self): metadataFile = self._safe_metadata_split(".metadata.txt") success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadataFile) metadataFile.close() if not success: pass #FIXME Delete empty metadata file ####################### # ##### Not converted, checking the mime type ###### def audio(self): '''Way to process an audio file''' self.cur_file.log_string += 'Audio file' self._media_processing() def image(self): '''Way to process an image''' if self.cur_file.has_metadata(): self.extract_metadata() ## FIXME make sure this works for png, gif, tiff # Create a temp directory dst_dir, filename = os.path.split(self.cur_file.dst_path) tmpdir = os.path.join(dst_dir, 'temp') tmppath = os.path.join(tmpdir, filename) self._safe_mkdir(tmpdir) # Do our image conversions warnings.simplefilter('error', Image.DecompressionBombWarning) try: imIn = Image.open(self.cur_file.src_path) imOut = Image.frombytes(imIn.mode, imIn.size, imIn.tobytes()) imOut.save(tmppath) #Copy the file back out and cleanup self._safe_copy(tmppath) self._safe_rmtree(tmpdir) # Catch decompression bombs except Exception as e: print("Caught exception (possible decompression bomb?) while translating file {}.".format(self.cur_file.src_path)) print(e) self.cur_file.make_dangerous() self._safe_copy() self.cur_file.log_string += 'Image file' self.cur_file.add_log_details('processing_type', 'image') def video(self): '''Way to process a video''' self.cur_file.log_string += 'Video file' self._media_processing() def _media_processing(self): '''Generic way to process all the media files''' self.cur_file.add_log_details('processing_type', 'media') self._safe_copy() ####################### def processdir(self, src_dir=None, dst_dir=None): ''' Main function doing the processing ''' if src_dir is None: src_dir = self.src_root_dir if dst_dir is None: dst_dir = self.dst_root_dir if self.recursive > 0: self._print_log() if self.recursive >= self.max_recursive: self.cur_log.warning('ARCHIVE BOMB.') self.cur_log.warning('The content of the archive contains recursively other archives.') self.cur_log.warning('This is a bad sign so the archive is not extracted to the destination key.') self._safe_rmtree(src_dir) if src_dir.endswith('_temp'): archbomb_path = src_dir[:-len('_temp')] self._safe_remove(archbomb_path) for srcpath in self._list_all_files(src_dir): self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir)) self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''), self.cur_file.main_type, self.cur_file.sub_type) if not self.cur_file.is_dangerous(): self.mime_processing_options.get(self.cur_file.main_type, self.unknown)() else: self._safe_copy() if not self.cur_file.is_recursive: self._print_log() if __name__ == '__main__': main(KittenGroomerFileCheck, 'Generic version of the KittenGroomer. Convert and rename files.')