mirror of https://github.com/CIRCL/PyCIRCLean
Merge pull request #2 from Dymaxion00/master
Initial working version of EXIF splitting and image format validation…pull/9/head
commit
34e7075609
117
bin/filecheck.py
117
bin/filecheck.py
|
@ -11,6 +11,11 @@ import oletools.oleid
|
|||
import olefile
|
||||
import officedissector
|
||||
|
||||
import warnings
|
||||
import exifread
|
||||
from PIL import Image
|
||||
from PIL import PngImagePlugin
|
||||
|
||||
from pdfid import PDFiD, cPDFiD
|
||||
|
||||
from kittengroomer import FileBase, KittenGroomerBase, main
|
||||
|
@ -30,6 +35,13 @@ mimes_compressed = ['zip', 'rar', 'bzip2', 'lzip', 'lzma', 'lzop',
|
|||
'xz', 'compress', 'gzip', 'tar']
|
||||
mimes_data = ['octet-stream']
|
||||
|
||||
# Prepare image/<subtype>
|
||||
mimes_exif = ['image/jpeg', 'image/tiff']
|
||||
mimes_png = ['image/png']
|
||||
|
||||
# Mime types we can pull metadata from
|
||||
mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png']
|
||||
|
||||
# Aliases
|
||||
aliases = {
|
||||
# Win executables
|
||||
|
@ -120,6 +132,11 @@ class File(FileBase):
|
|||
# there are no known extensions associated to this mimetype.
|
||||
pass
|
||||
|
||||
def has_metadata(self):
|
||||
if self.mimetype in mimes_metadata:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class KittenGroomerFileCheck(KittenGroomerBase):
|
||||
|
||||
|
@ -149,6 +166,12 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
]
|
||||
self.subtypes_application = self._init_subtypes_application(subtypes_apps)
|
||||
|
||||
types_metadata = [
|
||||
(mimes_exif, self._metadata_exif),
|
||||
(mimes_png, self._metadata_png),
|
||||
]
|
||||
self.metadata_processing_options = self._init_subtypes_application(types_metadata)
|
||||
|
||||
self.mime_processing_options = {
|
||||
'text': self.text,
|
||||
'audio': self.audio,
|
||||
|
@ -401,17 +424,109 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
self._safe_copy()
|
||||
|
||||
#######################
|
||||
# Metadata extractors
|
||||
def _metadata_exif(self, metadataFile):
|
||||
img = open(self.cur_file.src_path, 'rb')
|
||||
tags = None
|
||||
|
||||
try:
|
||||
tags = exifread.process_file(img, debug=True)
|
||||
except Exception as e:
|
||||
print("Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.cur_file.src_path))
|
||||
print(e)
|
||||
if tags == None:
|
||||
try:
|
||||
tags = exifread.process_file(img, debug=True)
|
||||
except Exception as e:
|
||||
print("Failed to get any metadata for file {}.".format(self.cur_file.src_path))
|
||||
print(e)
|
||||
img.close()
|
||||
return False
|
||||
|
||||
for tag in sorted(tags.keys()):
|
||||
# These are long and obnoxious/binary
|
||||
if tag not in ('JPEGThumbnail', 'TIFFThumbnail'):
|
||||
printable = str(tags[tag])
|
||||
|
||||
#Exifreader truncates data.
|
||||
if len(printable) > 25 and printable.endswith(", ... ]"):
|
||||
value = tags[tag].values
|
||||
if isinstance(value, basestring):
|
||||
printable = value
|
||||
else:
|
||||
printable = str(value)
|
||||
metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable))
|
||||
self.cur_file.add_log_details('metadata', 'exif')
|
||||
img.close()
|
||||
return True
|
||||
|
||||
def _metadata_png(self, metadataFile):
|
||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||
try:
|
||||
img = Image.open(self.cur_file.src_path)
|
||||
for tag in sorted(img.info.keys()):
|
||||
# These are long and obnoxious/binary
|
||||
if tag not in ('icc_profile'):
|
||||
metadataFile.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
|
||||
self.cur_file.add_log_details('metadata', 'png')
|
||||
img.close()
|
||||
# Catch decompression bombs
|
||||
except Exception as e:
|
||||
print("Caught exception processing metadata for {}".format(self.cur_file.src_path))
|
||||
print(e)
|
||||
self.cur_file.make_dangerous()
|
||||
self._safe_copy()
|
||||
return False
|
||||
|
||||
|
||||
def extract_metadata(self):
|
||||
metadataFile = self._safe_metadata_split(".metadata.txt")
|
||||
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadataFile)
|
||||
metadataFile.close()
|
||||
if not success:
|
||||
pass #FIXME Delete empty metadata file
|
||||
|
||||
#######################
|
||||
# ##### Not converted, checking the mime type ######
|
||||
def audio(self):
|
||||
'''Way to process an audio file'''
|
||||
self.cur_file.log_string += 'Audio file'
|
||||
self._media_processing()
|
||||
|
||||
|
||||
def image(self):
|
||||
'''Way to process an image'''
|
||||
if self.cur_file.has_metadata():
|
||||
self.extract_metadata()
|
||||
|
||||
## FIXME make sure this works for png, gif, tiff
|
||||
# Create a temp directory
|
||||
dst_dir, filename = os.path.split(self.cur_file.dst_path)
|
||||
tmpdir = os.path.join(dst_dir, 'temp')
|
||||
tmppath = os.path.join(tmpdir, filename)
|
||||
self._safe_mkdir(tmpdir)
|
||||
|
||||
# Do our image conversions
|
||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||
try:
|
||||
imIn = Image.open(self.cur_file.src_path)
|
||||
imOut = Image.frombytes(imIn.mode, imIn.size, imIn.tobytes())
|
||||
imOut.save(tmppath)
|
||||
|
||||
#Copy the file back out and cleanup
|
||||
self._safe_copy(tmppath)
|
||||
self._safe_rmtree(tmpdir)
|
||||
|
||||
# Catch decompression bombs
|
||||
except Exception as e:
|
||||
print("Caught exception (possible decompression bomb?) while translating file {}.".format(self.cur_file.src_path))
|
||||
print(e)
|
||||
self.cur_file.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
self.cur_file.log_string += 'Image file'
|
||||
self._media_processing()
|
||||
self.cur_file.add_log_details('processing_type', 'image')
|
||||
|
||||
|
||||
def video(self):
|
||||
'''Way to process a video'''
|
||||
|
|
|
@ -194,12 +194,12 @@ class KittenGroomerBase(object):
|
|||
os.remove(filepath)
|
||||
|
||||
def _safe_mkdir(self, directory):
|
||||
'''Remove a directory if it exists'''
|
||||
'''Make a directory if it does not exist'''
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
|
||||
def _safe_copy(self, src=None, dst=None):
|
||||
''' Copy a file and create directory if needed '''
|
||||
''' Copy a file and create directory if needed'''
|
||||
if src is None:
|
||||
src = self.cur_file.src_path
|
||||
if dst is None:
|
||||
|
@ -214,8 +214,24 @@ class KittenGroomerBase(object):
|
|||
print(e)
|
||||
return False
|
||||
|
||||
def _safe_metadata_split(self, ext):
|
||||
'''Create a separate file to hold this file's metadata'''
|
||||
dst = self.cur_file.dst_path
|
||||
try:
|
||||
if os.path.exists(self.cur_file.src_path+ext):
|
||||
raise KittenGroomerError("Cannot create split metadata file for \"" +
|
||||
self.cur_file.dst_path + "\", type '"
|
||||
+ ext + "': File exists.")
|
||||
dst_path, filename = os.path.split(dst)
|
||||
self._safe_mkdir(dst_path)
|
||||
return open(dst+ext, 'w+')
|
||||
except Exception as e:
|
||||
# TODO: Logfile
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def _list_all_files(self, directory):
|
||||
''' Generate an iterator over all the files in a directory tree '''
|
||||
''' Generate an iterator over all the files in a directory tree'''
|
||||
for root, dirs, files in os.walk(directory):
|
||||
for filename in files:
|
||||
filepath = os.path.join(root, filename)
|
||||
|
|
Loading…
Reference in New Issue