mirror of https://github.com/CIRCL/PyCIRCLean
chg: Put all open in with statements
parent
9d65f2b79c
commit
56f0ebb442
|
@ -493,47 +493,44 @@ class File(FileBase):
|
||||||
def _metadata_exif(self, metadata_file_path):
|
def _metadata_exif(self, metadata_file_path):
|
||||||
"""Read exif metadata from a jpg or tiff file using exifread."""
|
"""Read exif metadata from a jpg or tiff file using exifread."""
|
||||||
# TODO: can we shorten this method somehow?
|
# TODO: can we shorten this method somehow?
|
||||||
img = open(self.src_path, 'rb')
|
with open(self.src_path, 'rb') as img:
|
||||||
tags = None
|
tags = None
|
||||||
try:
|
|
||||||
tags = exifread.process_file(img, debug=True)
|
|
||||||
except Exception as e:
|
|
||||||
self.add_error(e, "Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path))
|
|
||||||
if tags is None:
|
|
||||||
try:
|
try:
|
||||||
tags = exifread.process_file(img, debug=True)
|
tags = exifread.process_file(img, debug=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.add_error(e, "Failed to get any metadata for file {}.".format(self.src_path))
|
self.add_error(e, "Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path))
|
||||||
img.close()
|
if tags is None:
|
||||||
return False
|
try:
|
||||||
for tag in sorted(tags.keys()):
|
tags = exifread.process_file(img, debug=True)
|
||||||
# These tags are long and obnoxious/binary so we don't add them
|
except Exception as e:
|
||||||
if tag not in ('JPEGThumbnail', 'TIFFThumbnail'):
|
self.add_error(e, "Failed to get any metadata for file {}.".format(self.src_path))
|
||||||
tag_string = str(tags[tag])
|
return False
|
||||||
# Exifreader truncates data.
|
for tag in sorted(tags.keys()):
|
||||||
if len(tag_string) > 25 and tag_string.endswith(", ... ]"):
|
# These tags are long and obnoxious/binary so we don't add them
|
||||||
tag_value = tags[tag].values
|
if tag not in ('JPEGThumbnail', 'TIFFThumbnail'):
|
||||||
tag_string = str(tag_value)
|
tag_string = str(tags[tag])
|
||||||
with open(metadata_file_path, 'w+') as metadata_file:
|
# Exifreader truncates data.
|
||||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string))
|
if len(tag_string) > 25 and tag_string.endswith(", ... ]"):
|
||||||
# TODO: how do we want to log metadata?
|
tag_value = tags[tag].values
|
||||||
self.set_property('metadata', 'exif')
|
tag_string = str(tag_value)
|
||||||
img.close()
|
with open(metadata_file_path, 'w+') as metadata_file:
|
||||||
|
metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string))
|
||||||
|
# TODO: how do we want to log metadata?
|
||||||
|
self.set_property('metadata', 'exif')
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _metadata_png(self, metadata_file_path):
|
def _metadata_png(self, metadata_file_path):
|
||||||
"""Extract metadata from a png file using PIL/Pillow."""
|
"""Extract metadata from a png file using PIL/Pillow."""
|
||||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||||
try:
|
try:
|
||||||
img = Image.open(self.src_path)
|
with Image.open(self.src_path) as img:
|
||||||
for tag in sorted(img.info.keys()):
|
for tag in sorted(img.info.keys()):
|
||||||
# These are long and obnoxious/binary
|
# These are long and obnoxious/binary
|
||||||
if tag not in ('icc_profile'):
|
if tag not in ('icc_profile'):
|
||||||
with open(metadata_file_path, 'w+') as metadata_file:
|
with open(metadata_file_path, 'w+') as metadata_file:
|
||||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
|
metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
|
||||||
# LOG: handle metadata
|
# LOG: handle metadata
|
||||||
self.set_property('metadata', 'png')
|
self.set_property('metadata', 'png')
|
||||||
img.close()
|
|
||||||
except Exception as e: # Catch decompression bombs
|
except Exception as e: # Catch decompression bombs
|
||||||
# TODO: only catch DecompressionBombWarnings here?
|
# TODO: only catch DecompressionBombWarnings here?
|
||||||
self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path))
|
self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path))
|
||||||
|
@ -580,10 +577,10 @@ class File(FileBase):
|
||||||
tempfile_path = os.path.join(tempdir_path, self.filename)
|
tempfile_path = os.path.join(tempdir_path, self.filename)
|
||||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||||
try: # Do image conversions
|
try: # Do image conversions
|
||||||
img_in = Image.open(self.src_path)
|
with Image.open(self.src_path) as img_in:
|
||||||
img_out = Image.frombytes(img_in.mode, img_in.size, img_in.tobytes())
|
with Image.frombytes(img_in.mode, img_in.size, img_in.tobytes()) as img_out:
|
||||||
img_out.save(tempfile_path)
|
img_out.save(tempfile_path)
|
||||||
self.src_path = tempfile_path
|
self.src_path = tempfile_path
|
||||||
except Exception as e: # Catch decompression bombs
|
except Exception as e: # Catch decompression bombs
|
||||||
# TODO: change this from all Exceptions to specific DecompressionBombWarning
|
# TODO: change this from all Exceptions to specific DecompressionBombWarning
|
||||||
self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path))
|
self.add_error(e, "Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path))
|
||||||
|
|
Loading…
Reference in New Issue