mirror of https://github.com/CIRCL/PyCIRCLean
Refactor metadata processing code
parent
e2af701ac9
commit
92d1b1cd93
|
@ -427,7 +427,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
# Metadata extractors
|
# Metadata extractors
|
||||||
def _metadata_exif(self, metadata_file):
|
def _metadata_exif(self, metadata_file_path):
|
||||||
img = open(self.cur_file.src_path, 'rb')
|
img = open(self.cur_file.src_path, 'rb')
|
||||||
tags = None
|
tags = None
|
||||||
|
|
||||||
|
@ -457,19 +457,22 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
printable = value
|
printable = value
|
||||||
else:
|
else:
|
||||||
printable = str(value)
|
printable = str(value)
|
||||||
|
|
||||||
|
with open(metadata_file_path, 'w+') as metadata_file:
|
||||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
|
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
|
||||||
self.cur_file.add_log_details('metadata', 'exif')
|
self.cur_file.add_log_details('metadata', 'exif')
|
||||||
img.close()
|
img.close()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _metadata_png(self, metadataFile):
|
def _metadata_png(self, metadata_file_path):
|
||||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||||
try:
|
try:
|
||||||
img = Image.open(self.cur_file.src_path)
|
img = Image.open(self.cur_file.src_path)
|
||||||
for tag in sorted(img.info.keys()):
|
for tag in sorted(img.info.keys()):
|
||||||
# These are long and obnoxious/binary
|
# These are long and obnoxious/binary
|
||||||
if tag not in ('icc_profile'):
|
if tag not in ('icc_profile'):
|
||||||
metadataFile.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
|
with open(metadata_file_path, 'w+') as metadata_file:
|
||||||
|
metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
|
||||||
self.cur_file.add_log_details('metadata', 'png')
|
self.cur_file.add_log_details('metadata', 'png')
|
||||||
img.close()
|
img.close()
|
||||||
# Catch decompression bombs
|
# Catch decompression bombs
|
||||||
|
@ -481,12 +484,12 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def extract_metadata(self):
|
def extract_metadata(self):
|
||||||
metadata_file = self._safe_metadata_split(".metadata.txt")
|
metadata_file_path = self.cur_file.create_metadata_file(".metadata.txt")
|
||||||
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadata_file)
|
# todo: write metadata to file
|
||||||
metadata_file.close()
|
mime = self.cur_file.mimetype
|
||||||
if not success:
|
metadata_processing_method = self.metadata_processing_options.get(mime)
|
||||||
# FIXME Delete empty metadata file
|
if metadata_processing_method:
|
||||||
pass
|
metadata_processing_method(metadata_file_path)
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
# ##### Media - audio and video aren't converted ######
|
# ##### Media - audio and video aren't converted ######
|
||||||
|
|
|
@ -154,6 +154,25 @@ class FileBase(object):
|
||||||
self.log_details['force_ext'] = True
|
self.log_details['force_ext'] = True
|
||||||
self.dst_path += ext
|
self.dst_path += ext
|
||||||
|
|
||||||
|
def create_metadata_file(self, ext):
    """Reserve the path of a separate file to hold this file's metadata.

    No file is created here: the path ``self.dst_path + ext`` is stored in
    ``self.metadata_file_path`` and returned so the caller can open it.

    :param ext: suffix appended to ``self.dst_path``, e.g. ".metadata.txt".
    :return: the metadata file path, or False when ``ext`` is empty or a
        file named ``self.src_path + ext`` already exists.
    """
    # An empty suffix would make the metadata path identical to dst_path,
    # so writing metadata would clobber the destination file itself.
    if not ext:
        # TODO: Write to log file
        return False
    # make sure we aren't overwriting anything
    if os.path.exists(self.src_path + ext):
        # TODO: Write to log file
        return False
    # TODO: Uncomment these after object relationships are fixed
    # dst_dir_path, filename = os.path.split(self.dst_path)
    # self._safe_mkdir(dst_dir_path)
    # TODO: Check extension for leading "."
    self.metadata_file_path = self.dst_path + ext
    return self.metadata_file_path
|
||||||
|
|
||||||
|
|
||||||
class KittenGroomerBase(object):
|
class KittenGroomerBase(object):
|
||||||
"""Base object responsible for copy/sanitization process."""
|
"""Base object responsible for copy/sanitization process."""
|
||||||
|
@ -243,25 +262,8 @@ class KittenGroomerBase(object):
|
||||||
print(e)
|
print(e)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _safe_metadata_split(self, ext):
|
|
||||||
"""Create a separate file to hold this file's metadata."""
|
|
||||||
# TODO: fix logic in this method
|
|
||||||
dst = self.cur_file.dst_path
|
|
||||||
try:
|
|
||||||
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
|
|
||||||
raise KittenGroomerError("Cannot create split metadata file for \"" +
|
|
||||||
self.cur_file.dst_path + "\", type '" +
|
|
||||||
ext + "': File exists.")
|
|
||||||
dst_path, filename = os.path.split(dst)
|
|
||||||
self._safe_mkdir(dst_path)
|
|
||||||
return open(dst + ext, 'w+')
|
|
||||||
except Exception as e:
|
|
||||||
# TODO: Logfile
|
|
||||||
print(e)
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _list_all_files(self, directory):
|
def _list_all_files(self, directory):
|
||||||
"""Generator yield path to all of the files in a directory tree."""
|
"""Generator yielding path to all of the files in a directory tree."""
|
||||||
for root, dirs, files in os.walk(directory):
|
for root, dirs, files in os.walk(directory):
|
||||||
for filename in files:
|
for filename in files:
|
||||||
filepath = os.path.join(root, filename)
|
filepath = os.path.join(root, filename)
|
||||||
|
|
|
@ -213,6 +213,16 @@ class TestFileBase:
|
||||||
assert generic_conf_file.log_details.get('force_ext') is None
|
assert generic_conf_file.log_details.get('force_ext') is None
|
||||||
# shouldn't change a file's extension if it already is right
|
# shouldn't change a file's extension if it already is right
|
||||||
|
|
||||||
|
def test_create_metadata_file(self, temp_file):
    """create_metadata_file returns a writable path and rejects an empty extension."""
    # Try making a metadata file
    metadata_file_path = temp_file.create_metadata_file('.metadata.txt')
    # Without these checks a False return would reach open(False, 'w+'),
    # which opens file descriptor 0 instead of failing the test.
    assert metadata_file_path.endswith('.metadata.txt')
    assert temp_file.metadata_file_path == metadata_file_path
    with open(metadata_file_path, 'w+') as metadata_file:
        metadata_file.write('Have some metadata!')
    with open(metadata_file_path) as metadata_file:
        assert metadata_file.read() == 'Have some metadata!'
    # Shouldn't be able to make a metadata file with no extension
    assert temp_file.create_metadata_file('') is False
    # TODO: if metadata file already exists
    # TODO: if there is no metadata to write should this work?
|
||||||
|
|
||||||
|
|
||||||
class TestKittenGroomerBase:
|
class TestKittenGroomerBase:
|
||||||
|
|
||||||
|
@ -258,18 +268,6 @@ class TestKittenGroomerBase:
|
||||||
assert simple_groomer._safe_copy() is True
|
assert simple_groomer._safe_copy() is True
|
||||||
#check that it handles weird file path inputs
|
#check that it handles weird file path inputs
|
||||||
|
|
||||||
def test_safe_metadata_split(self, tmpdir):
|
|
||||||
file = tmpdir.join('test.txt')
|
|
||||||
file.write('testing')
|
|
||||||
simple_groomer = KittenGroomerBase(tmpdir.strpath, tmpdir.strpath)
|
|
||||||
simple_groomer.cur_file = FileBase(file.strpath, file.strpath)
|
|
||||||
metadata_file = simple_groomer._safe_metadata_split('metadata.log')
|
|
||||||
metadata_file.write('Have some metadata!')
|
|
||||||
metadata_file.close()
|
|
||||||
assert simple_groomer._safe_metadata_split('') is False
|
|
||||||
# if metadata file already exists
|
|
||||||
# if there is no metadata to write should this work?
|
|
||||||
|
|
||||||
def test_list_all_files(self, tmpdir):
|
def test_list_all_files(self, tmpdir):
|
||||||
file = tmpdir.join('test.txt')
|
file = tmpdir.join('test.txt')
|
||||||
file.write('testing')
|
file.write('testing')
|
||||||
|
|
Loading…
Reference in New Issue