mirror of https://github.com/CIRCL/PyCIRCLean
First working version with methods in File object
- All tests now pass with the file-handling methods on the File object instead of the Groomer object.
- Logging functionality still isn't finished.
pull/12/head
parent
9aafe6e518
commit
781d0a76af
193
bin/filecheck.py
193
bin/filecheck.py
|
@ -86,15 +86,9 @@ class File(FileBase):
|
|||
def __init__(self, src_path, dst_path, logger):
|
||||
super(File, self).__init__(src_path, dst_path, logger)
|
||||
self.is_recursive = False
|
||||
self._check_dangerous()
|
||||
if self.is_dangerous():
|
||||
return
|
||||
|
||||
self.log_details.update({'maintype': self.main_type,
|
||||
'subtype': self.sub_type,
|
||||
'extension': self.extension})
|
||||
self._check_extension()
|
||||
self._check_mime()
|
||||
|
||||
subtypes_apps = [
|
||||
(Config.mimes_office, self._winoffice),
|
||||
|
@ -129,8 +123,7 @@ class File(FileBase):
|
|||
}
|
||||
|
||||
def _check_dangerous(self):
|
||||
if not self.has_mimetype():
|
||||
# No mimetype, should not happen.
|
||||
if not self.has_mimetype(): # No mimetype, should not happen.
|
||||
self.make_dangerous()
|
||||
if not self.has_extension():
|
||||
self.make_dangerous()
|
||||
|
@ -147,7 +140,8 @@ class File(FileBase):
|
|||
if self.extension in Config.override_ext:
|
||||
expected_mimetype = Config.override_ext[self.extension]
|
||||
else:
|
||||
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
|
||||
expected_mimetype, encoding = mimetypes.guess_type(self.src_path,
|
||||
strict=False)
|
||||
if expected_mimetype in Config.aliases:
|
||||
expected_mimetype = Config.aliases[expected_mimetype]
|
||||
is_known_extension = self.extension in mimetypes.types_map.keys()
|
||||
|
@ -155,7 +149,7 @@ class File(FileBase):
|
|||
self.log_details.update({'expected_mimetype': expected_mimetype})
|
||||
self.make_dangerous()
|
||||
|
||||
def _check_mime(self):
|
||||
def _check_mimetype(self):
|
||||
"""Takes the mimetype (as determined by libmagic) and determines
|
||||
whether the list of extensions that are normally associated with
|
||||
that mimetype contains the file's actual extension."""
|
||||
|
@ -163,13 +157,17 @@ class File(FileBase):
|
|||
mimetype = Config.aliases[self.mimetype]
|
||||
else:
|
||||
mimetype = self.mimetype
|
||||
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
|
||||
expected_extensions = mimetypes.guess_all_extensions(mimetype,
|
||||
strict=False)
|
||||
if expected_extensions:
|
||||
if len(self.extension) > 0 and self.extension not in expected_extensions:
|
||||
if self.has_extension() and self.extension not in expected_extensions:
|
||||
self.log_details.update({'expected_extensions': expected_extensions})
|
||||
self.make_dangerous()
|
||||
|
||||
def check(self):
    """Run the safety checks, then dispatch to the main-type handler.

    ``_check_dangerous`` always runs first. If the file is dangerous
    after that call, nothing else is done. Otherwise the extension and
    mimetype checks run, and if the file is still not dangerous the
    handler registered for ``self.main_type`` in
    ``self.mime_processing_options`` is called (``self.unknown`` when no
    handler is registered for that main type).
    """
    self._check_dangerous()
    if self.is_dangerous():
        # A file flagged at this point (e.g. no mimetype or no extension)
        # gives the comparisons below nothing to compare against, so stop
        # here instead of feeding them missing values.
        return
    self._check_extension()
    self._check_mimetype()
    if self.is_dangerous():
        return
    handler = self.mime_processing_options.get(self.main_type, self.unknown)
    handler()
|
||||
|
||||
|
@ -182,7 +180,7 @@ class File(FileBase):
|
|||
dict_to_return[subtype] = method
|
||||
return dict_to_return
|
||||
|
||||
def _write_log(self):
|
||||
def write_log(self):
|
||||
"""Print the logs related to the current file being processed."""
|
||||
# TODO: move to helpers?
|
||||
tmp_log = self.logger.log.fields(**self.log_details)
|
||||
|
@ -209,6 +207,13 @@ class File(FileBase):
|
|||
return
|
||||
return True
|
||||
|
||||
def _make_tempdir(self):
    """Create (if needed) and return this file's temporary directory.

    The directory path is ``self.dst_path`` with ``'_temp'`` appended; it
    is stored on ``self.tempdir_path`` and returned. Calling this again
    when the directory already exists is a no-op that returns the same
    path.

    Raises:
        OSError: if the directory cannot be created, or if the path
            already exists but is not a directory.
    """
    self.tempdir_path = self.dst_path + '_temp'
    try:
        # Create first and inspect the failure afterwards: testing for
        # existence before creating leaves a window in which the path can
        # appear and make makedirs() raise.
        os.makedirs(self.tempdir_path)
    except OSError:
        if not os.path.isdir(self.tempdir_path):
            raise
    return self.tempdir_path
|
||||
|
||||
#######################
|
||||
# ##### Discarded mimetypes, reason in the docstring ######
|
||||
def inode(self):
|
||||
|
@ -235,13 +240,11 @@ class File(FileBase):
|
|||
"""Process a message file."""
|
||||
self.log_string += 'Message file'
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
def model(self):
|
||||
"""Process a model file."""
|
||||
self.log_string += 'Model file'
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
# ##### Files that will be converted ######
|
||||
def text(self):
|
||||
|
@ -251,7 +254,6 @@ class File(FileBase):
|
|||
self.log_string += 'Rich Text file'
|
||||
# TODO: need a way to convert it to plain text
|
||||
self.force_ext('.txt')
|
||||
self._safe_copy()
|
||||
return
|
||||
for mt in Config.mimes_ooxml:
|
||||
if mt in self.sub_type:
|
||||
|
@ -260,7 +262,6 @@ class File(FileBase):
|
|||
return
|
||||
self.log_string += 'Text file'
|
||||
self.force_ext('.txt')
|
||||
self._safe_copy()
|
||||
|
||||
def application(self):
|
||||
"""Processes an application specific file according to its subtype."""
|
||||
|
@ -276,7 +277,6 @@ class File(FileBase):
|
|||
"""Processes an executable file."""
|
||||
self.add_log_details('processing_type', 'executable')
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
def _winoffice(self):
|
||||
"""Processes a winoffice file using olefile/oletools."""
|
||||
|
@ -315,7 +315,6 @@ class File(FileBase):
|
|||
elif i.id == 'flash' and i.value:
|
||||
self.add_log_details('flash', True)
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
def _ooxml(self):
|
||||
"""Processes an ooxml file."""
|
||||
|
@ -325,7 +324,6 @@ class File(FileBase):
|
|||
except Exception:
|
||||
# Invalid file
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
return
|
||||
# There are probably other potentially malicious features:
|
||||
# fonts, custom props, custom XML
|
||||
|
@ -342,7 +340,6 @@ class File(FileBase):
|
|||
if len(doc.features.embedded_packages) > 0:
|
||||
self.add_log_details('embedded_pack', True)
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
def _libreoffice(self):
|
||||
"""Processes a libreoffice file."""
|
||||
|
@ -359,7 +356,6 @@ class File(FileBase):
|
|||
fname.startswith('object') or fname.endswith('.bin'):
|
||||
self.add_log_details('macro', True)
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
def _pdf(self):
|
||||
"""Processes a PDF file."""
|
||||
|
@ -390,37 +386,15 @@ class File(FileBase):
|
|||
bombs."""
|
||||
self.add_log_details('processing_type', 'archive')
|
||||
self.is_recursive = True
|
||||
self.log_string += 'Archive extracted, processing content.'
|
||||
tmpdir = self.dst_path + '_temp'
|
||||
self._safe_mkdir(tmpdir)
|
||||
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.src_path, tmpdir)
|
||||
self._run_process(extract_command)
|
||||
self.recursive_archive_depth += 1
|
||||
self.logger.tree(tmpdir)
|
||||
self.process_dir(tmpdir, self.dst_path)
|
||||
self.recursive_archive_depth -= 1
|
||||
self._safe_rmtree(tmpdir)
|
||||
|
||||
def _handle_archivebomb(self, src_dir):
|
||||
self.make_dangerous()
|
||||
self.add_log_details('Archive Bomb', True)
|
||||
self.log_name.warning('ARCHIVE BOMB.')
|
||||
self.log_name.warning('The content of the archive contains recursively other archives.')
|
||||
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
|
||||
self._safe_rmtree(src_dir)
|
||||
if src_dir.endswith('_temp'):
|
||||
bomb_path = src_dir[:-len('_temp')]
|
||||
self._safe_remove(bomb_path)
|
||||
# self.log_string += 'Archive extracted, processing content.'
|
||||
|
||||
def _unknown_app(self):
|
||||
"""Processes an unknown file."""
|
||||
self.make_unknown()
|
||||
self._safe_copy()
|
||||
|
||||
def _binary_app(self):
|
||||
"""Processses an unknown binary file."""
|
||||
self.make_binary()
|
||||
self._safe_copy()
|
||||
|
||||
#######################
|
||||
# Metadata extractors
|
||||
|
@ -431,12 +405,14 @@ class File(FileBase):
|
|||
try:
|
||||
tags = exifread.process_file(img, debug=True)
|
||||
except Exception as e:
|
||||
# TODO: log instead of print
|
||||
print("Error while trying to grab full metadata for file {}; retrying for partial data.".format(self.src_path))
|
||||
print(e)
|
||||
if tags is None:
|
||||
try:
|
||||
tags = exifread.process_file(img, debug=True)
|
||||
except Exception as e:
|
||||
# TODO: log instead of print
|
||||
print("Failed to get any metadata for file {}.".format(self.src_path))
|
||||
print(e)
|
||||
img.close()
|
||||
|
@ -450,10 +426,7 @@ class File(FileBase):
|
|||
# Exifreader truncates data.
|
||||
if len(printable) > 25 and printable.endswith(", ... ]"):
|
||||
value = tags[tag].values
|
||||
if isinstance(value, str):
|
||||
printable = value
|
||||
else:
|
||||
printable = str(value)
|
||||
printable = str(value)
|
||||
|
||||
with open(metadata_file_path, 'w+') as metadata_file:
|
||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
|
||||
|
@ -474,10 +447,10 @@ class File(FileBase):
|
|||
img.close()
|
||||
# Catch decompression bombs
|
||||
except Exception as e:
|
||||
# TODO: log instead of print
|
||||
print("Caught exception processing metadata for {}".format(self.src_path))
|
||||
print(e)
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
return False
|
||||
|
||||
def extract_metadata(self):
|
||||
|
@ -503,42 +476,29 @@ class File(FileBase):
|
|||
def _media_processing(self):
|
||||
"""Generic way to process all media files."""
|
||||
self.add_log_details('processing_type', 'media')
|
||||
self._safe_copy()
|
||||
|
||||
def image(self):
|
||||
"""Processes an image.
|
||||
|
||||
Extracts metadata if metadata is present. Creates a temporary
|
||||
directory, opens the using PIL.Image, saves it to the temporary
|
||||
directory, and copies it to the destination."""
|
||||
Extracts metadata to dest key if metadata is present. Creates a
|
||||
temporary directory on dest key, opens the image using PIL.Image, saves it to
|
||||
the temporary directory, and copies it to the destination."""
|
||||
# TODO: make sure this method works for png, gif, tiff
|
||||
if self.has_metadata():
|
||||
self.extract_metadata()
|
||||
|
||||
# FIXME make sure this works for png, gif, tiff
|
||||
# Create a temp directory
|
||||
dst_dir, filename = os.path.split(self.dst_path)
|
||||
tmpdir = os.path.join(dst_dir, 'temp')
|
||||
tmppath = os.path.join(tmpdir, filename)
|
||||
self._safe_mkdir(tmpdir)
|
||||
|
||||
# Do our image conversions
|
||||
tempdir_path = self._make_tempdir()
|
||||
tempfile_path = os.path.join(tempdir_path, self.filename)
|
||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||
try:
|
||||
imIn = Image.open(self.src_path)
|
||||
imOut = Image.frombytes(imIn.mode, imIn.size, imIn.tobytes())
|
||||
imOut.save(tmppath)
|
||||
|
||||
# Copy the file back out and cleanup
|
||||
self._safe_copy(tmppath)
|
||||
self._safe_rmtree(tmpdir)
|
||||
|
||||
# Catch decompression bombs
|
||||
except Exception as e:
|
||||
try: # Do image conversions
|
||||
img_in = Image.open(self.src_path)
|
||||
img_out = Image.frombytes(img_in.mode, img_in.size, img_in.tobytes())
|
||||
img_out.save(tempfile_path)
|
||||
self.src_path = tempfile_path
|
||||
except Exception as e: # Catch decompression bombs
|
||||
# TODO: change this from printing to logging
|
||||
print("Caught exception (possible decompression bomb?) while translating file {}.".format(self.src_path))
|
||||
print(e)
|
||||
self.make_dangerous()
|
||||
self._safe_copy()
|
||||
|
||||
self.log_string += 'Image file'
|
||||
self.add_log_details('processing_type', 'image')
|
||||
|
||||
|
@ -549,33 +509,72 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
|
||||
self.recursive_archive_depth = 0
|
||||
self.max_recursive_depth = max_recursive_depth
|
||||
self.log_name = self.logger.log
|
||||
|
||||
def process_file(self, srcpath, dstpath, relative_path):
|
||||
self.cur_file = File(srcpath, dstpath, self.logger)
|
||||
self.log_name.info('Processing {} ({}/{})',
|
||||
relative_path,
|
||||
self.cur_file.main_type,
|
||||
self.cur_file.sub_type)
|
||||
self.cur_file.check()
|
||||
if self.cur_file.is_archive:
|
||||
# Handle archive
|
||||
pass
|
||||
else:
|
||||
# TODO: Check if should be copied, maybe have an attribute for this?
|
||||
self._safe_copy()
|
||||
self._write_log()
|
||||
|
||||
def process_dir(self, src_dir, dst_dir):
|
||||
"""Main function coordinating file processing."""
|
||||
if self.recursive_archive_depth > 0:
|
||||
self._write_log()
|
||||
if self.recursive_archive_depth >= self.max_recursive_depth:
|
||||
self._handle_archivebomb(src_dir)
|
||||
# TODO: Think we want to move write_log elsewhere:
|
||||
# if self.recursive_archive_depth > 0:
|
||||
# self.write_log()
|
||||
# TODO: Can we clean up the way we handle relative_path?
|
||||
for srcpath in self.list_all_files(src_dir):
|
||||
dstpath = srcpath.replace(src_dir, dst_dir)
|
||||
relative_path = srcpath.replace(src_dir + '/', '')
|
||||
self.process_file(srcpath, dstpath, relative_path)
|
||||
self.cur_file = File(srcpath, dstpath, self.logger)
|
||||
# TODO: move this logging code elsewhere
|
||||
self.logger.log.info('Processing {} ({}/{})',
|
||||
relative_path,
|
||||
self.cur_file.main_type,
|
||||
self.cur_file.sub_type)
|
||||
self.process_file(self.cur_file)
|
||||
|
||||
def process_file(self, file):
|
||||
file.check()
|
||||
if file.is_recursive:
|
||||
self.process_archive(file)
|
||||
else:
|
||||
# TODO: Check if should be copied, make an attribute for should be copied True/False
|
||||
self._safe_copy()
|
||||
file.write_log()
|
||||
if hasattr(file, "tempdir_path"):
|
||||
self._safe_rmtree(file.tempdir_path)
|
||||
|
||||
def process_archive(self, file):
    """Unpack an archive using 7zip and process its contents.

    Should be given a Kittengroomer file object whose src_path points
    to an archive.

    ``self.recursive_archive_depth`` is incremented for the duration of
    the call. When it reaches ``self.max_recursive_depth`` the file is
    handed to ``_handle_archivebomb`` and nothing is extracted. Otherwise
    the archive is extracted into the file's temporary directory, that
    directory is added to the logger tree and processed with
    ``process_dir``, and the directory is removed afterwards.

    The depth counter is restored and the temporary directory is removed
    even if extraction or processing raises; the exception itself is
    propagated unchanged.
    """
    self.recursive_archive_depth += 1
    try:
        # Check for archivebomb
        if self.recursive_archive_depth >= self.max_recursive_depth:
            self._handle_archivebomb(file)
            return
        tempdir_path = file._make_tempdir()
        try:
            # Unpack the archive
            # NOTE(review): src_path is interpolated into a command string
            # and is presumably attacker-influenced (it is a file name on
            # the source media) - confirm _run_process never passes this
            # string to a shell.
            base_command = '{} -p1 x "{}" -o"{}" -bd -aoa'
            extract_command = base_command.format(SEVENZ_PATH, file.src_path, tempdir_path)
            file._run_process(extract_command)
            # Add it to the tree
            self.logger.tree(tempdir_path)
            # List all files, process them
            self.process_dir(tempdir_path, file.dst_path)
        finally:
            # Clean up, whether or not extraction/processing succeeded
            self._safe_rmtree(tempdir_path)
    finally:
        # Without this a single failing archive would leave the counter
        # raised and later archives would be misreported as bombs.
        self.recursive_archive_depth -= 1
|
||||
|
||||
|
||||
def _handle_archivebomb(self, file):
    """Flag ``file`` as an archive bomb and warn about it in the log.

    The file is made dangerous, an ``'Archive Bomb'`` log detail is
    recorded on it, and three warnings are written to
    ``self.logger.log``. Nothing is deleted here.
    """
    file.make_dangerous()
    file.add_log_details('Archive Bomb', True)
    warn = self.logger.log.warning
    for message in (
        'ARCHIVE BOMB.',
        'The content of the archive contains recursively other archives.',
        'This is a bad sign so the archive is not extracted to the destination key.',
    ):
        warn(message)
    # TODO: are we sure we want to delete the archive on the source key?
    # Disabled for now:
    # self._safe_rmtree(file.src_dir)
    # The purpose of the following (also disabled) code is unclear:
    # if file.src_dir.endswith('_temp'):
    #     # TODO: change the way bomb_path is constructed and the way we
    #     # check for tempdir
    #     bomb_path = file.src_dir[:-len('_temp')]
    #     self._safe_remove(bomb_path)
|
||||
|
||||
def run(self):
|
||||
self.process_dir(self.src_root_dir, self.dst_root_dir)
|
||||
|
|
|
@ -42,12 +42,13 @@ class FileBase(object):
|
|||
"""Initialized with the source path and expected destination path."""
|
||||
self.src_path = src_path
|
||||
self.dst_path = dst_path
|
||||
# TODO: rename this to file_properties
|
||||
# TODO: rename this to file_properties (and change in other groomers)
|
||||
self.log_details = {'filepath': self.src_path}
|
||||
self.log_string = ''
|
||||
self.extension = self._determine_extension()
|
||||
self._determine_mimetype()
|
||||
self.logger = logger
|
||||
self.filename = os.path.basename(self.src_path)
|
||||
|
||||
def _determine_extension(self):
|
||||
_, ext = os.path.splitext(self.src_path)
|
||||
|
|
Loading…
Reference in New Issue