diff --git a/bin/filecheck.py b/bin/filecheck.py
index c5ceeaf..4da756e 100644
--- a/bin/filecheck.py
+++ b/bin/filecheck.py
@@ -11,6 +11,11 @@ import oletools.oleid
 import olefile
 import officedissector
 
+import warnings
+import exifread
+from PIL import Image
+from PIL import PngImagePlugin
+
 from pdfid import PDFiD, cPDFiD
 
 from kittengroomer import FileBase, KittenGroomerBase, main
@@ -30,6 +35,13 @@ mimes_compressed = ['zip', 'rar', 'bzip2', 'lzip', 'lzma', 'lzop',
                     'xz', 'compress', 'gzip', 'tar']
 mimes_data = ['octet-stream']
 
+# Prepare image/
+mimes_exif = ['image/jpeg', 'image/tiff']
+mimes_png = ['image/png']
+
+# Mime types we can pull metadata from
+mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png']
+
 # Aliases
 aliases = {
     # Win executables
@@ -120,6 +132,10 @@ class File(FileBase):
             # there are no known extensions associated to this mimetype.
             pass
 
+    def has_metadata(self):
+        '''Check whether metadata can be pulled from this mimetype'''
+        return self.mimetype in mimes_metadata
+
 
 class KittenGroomerFileCheck(KittenGroomerBase):
 
@@ -149,6 +165,12 @@ class KittenGroomerFileCheck(KittenGroomerBase):
         ]
         self.subtypes_application = self._init_subtypes_application(subtypes_apps)
 
+        types_metadata = [
+            (mimes_exif, self._metadata_exif),
+            (mimes_png, self._metadata_png),
+        ]
+        self.metadata_processing_options = self._init_subtypes_application(types_metadata)
+
         self.mime_processing_options = {
             'text': self.text,
             'audio': self.audio,
@@ -401,17 +423,137 @@ class KittenGroomerFileCheck(KittenGroomerBase):
         self._safe_copy()
 
     #######################
+    # Metadata extractors
+    def _metadata_exif(self, metadataFile):
+        '''Write the EXIF tags of the current file to metadataFile.
+
+        Returns True once the tags are written, False when no tag could be
+        read at all.
+        '''
+        tags = None
+        with open(self.cur_file.src_path, 'rb') as img:
+            try:
+                tags = exifread.process_file(img, debug=True)
+            except Exception as e:
+                print("Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.cur_file.src_path))
+                print(e)
+            if tags is None:
+                try:
+                    # Rewind first: the failed attempt already read from the
+                    # stream. Retry with the default (non-debug) parsing
+                    # rather than repeating the identical call.
+                    img.seek(0)
+                    tags = exifread.process_file(img)
+                except Exception as e:
+                    print("Failed to get any metadata for file {}.".format(self.cur_file.src_path))
+                    print(e)
+                    return False
+
+        for tag in sorted(tags.keys()):
+            # These are long and obnoxious/binary
+            if tag not in ('JPEGThumbnail', 'TIFFThumbnail'):
+                printable = str(tags[tag])
+                # Exifreader truncates data.
+                if len(printable) > 25 and printable.endswith(", ... ]"):
+                    value = tags[tag].values
+                    # basestring is undefined on Python 3
+                    printable = value if isinstance(value, str) else str(value)
+                metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable))
+        self.cur_file.add_log_details('metadata', 'exif')
+        return True
+
+    def _metadata_png(self, metadataFile):
+        '''Write the PNG info entries of the current file to metadataFile.
+
+        Returns True on success. On any error the file is marked dangerous,
+        copied as-is and False is returned.
+        '''
+        # Turn Pillow's decompression bomb warning into an exception
+        warnings.simplefilter('error', Image.DecompressionBombWarning)
+        img = None
+        try:
+            img = Image.open(self.cur_file.src_path)
+            for tag in sorted(img.info.keys()):
+                # These are long and obnoxious/binary. Compare the whole key:
+                # ('icc_profile') is a plain string, so "in" was a substring test.
+                if tag != 'icc_profile':
+                    metadataFile.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
+            self.cur_file.add_log_details('metadata', 'png')
+            return True
+        # Catch decompression bombs
+        except Exception as e:
+            print("Caught exception processing metadata for {}".format(self.cur_file.src_path))
+            print(e)
+            self.cur_file.make_dangerous()
+            self._safe_copy()
+            return False
+        finally:
+            if img is not None:
+                img.close()
+
+    def extract_metadata(self):
+        '''Write the metadata of the current file to a ".metadata.txt" file'''
+        extractor = self.metadata_processing_options.get(self.cur_file.mimetype)
+        if extractor is None:
+            # No extractor is registered for this mimetype
+            return
+        metadataFile = self._safe_metadata_split(".metadata.txt")
+        if not metadataFile:
+            # _safe_metadata_split returns False when it cannot create the file
+            return
+        try:
+            success = extractor(metadataFile)
+        finally:
+            metadataFile.close()
+        if not success:
+            # Do not leave an empty or partial metadata file behind
+            os.remove(metadataFile.name)
+
+    #######################
     # ##### Not converted, checking the mime type ######
 
     def audio(self):
         '''Way to process an audio file'''
         self.cur_file.log_string += 'Audio file'
         self._media_processing()
 
     def image(self):
         '''Way to process an image'''
+        if self.cur_file.has_metadata():
+            self.extract_metadata()
+
+        ## FIXME make sure this works for png, gif, tiff
+        # Create a temp directory
+        dst_dir, filename = os.path.split(self.cur_file.dst_path)
+        tmpdir = os.path.join(dst_dir, 'temp')
+        tmppath = os.path.join(tmpdir, filename)
+        self._safe_mkdir(tmpdir)
+
+        # Do our image conversions
+        warnings.simplefilter('error', Image.DecompressionBombWarning)
+        imIn = None
+        try:
+            imIn = Image.open(self.cur_file.src_path)
+            imOut = Image.frombytes(imIn.mode, imIn.size, imIn.tobytes())
+            imOut.save(tmppath)
+
+            # Copy the file back out
+            self._safe_copy(tmppath)
+
+        # Catch decompression bombs
+        except Exception as e:
+            print("Caught exception (possible decompression bomb?) while translating file {}.".format(self.cur_file.src_path))
+            print(e)
+            self.cur_file.make_dangerous()
+            self._safe_copy()
+        finally:
+            # Release the image and the temp directory even on failure
+            if imIn is not None:
+                imIn.close()
+            self._safe_rmtree(tmpdir)
+
         self.cur_file.log_string += 'Image file'
-        self._media_processing()
+        self.cur_file.add_log_details('processing_type', 'image')
 
     def video(self):
         '''Way to process a video'''
diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py
index 5c578a7..90c7919 100644
--- a/kittengroomer/helpers.py
+++ b/kittengroomer/helpers.py
@@ -194,12 +194,12 @@ class KittenGroomerBase(object):
             os.remove(filepath)
 
     def _safe_mkdir(self, directory):
-        '''Remove a directory if it exists'''
+        '''Make a directory if it does not exist'''
         if not os.path.exists(directory):
             os.makedirs(directory)
 
     def _safe_copy(self, src=None, dst=None):
-        ''' Copy a file and create directory if needed '''
+        ''' Copy a file and create directory if needed'''
         if src is None:
             src = self.cur_file.src_path
         if dst is None:
@@ -214,8 +214,24 @@ class KittenGroomerBase(object):
             print(e)
             return False
 
+    def _safe_metadata_split(self, ext):
+        '''Create a separate file to hold this file's metadata'''
+        dst = self.cur_file.dst_path
+        try:
+            if os.path.exists(self.cur_file.src_path+ext):
+                raise KittenGroomerError("Cannot create split metadata file for \"" +
+                                         self.cur_file.dst_path + "\", type '" +
+                                         ext + "': File exists.")
+            dst_path, filename = os.path.split(dst)
+            self._safe_mkdir(dst_path)
+            return open(dst+ext, 'w+')
+        except Exception as e:
+            # TODO: Logfile
+            print(e)
+            return False
+
     def _list_all_files(self, directory):
-        ''' Generate an iterator over all the files in a directory tree '''
+        ''' Generate an iterator over all the files in a directory tree'''
         for root, dirs, files in os.walk(directory):
             for filename in files:
                 filepath = os.path.join(root, filename)