mirror of https://github.com/CIRCL/PyCIRCLean
Add TODOs and clarify various logging messages
parent
963a2feef4
commit
0175ee48e5
|
@ -143,7 +143,7 @@ class File(FileBase):
|
|||
is_known_extension = self.extension in mimetypes.types_map.keys()
|
||||
if is_known_extension and expected_mimetype != self.mimetype:
|
||||
# LOG: improve this string
|
||||
self.make_dangerous('expected_mimetyped')
|
||||
self.make_dangerous('expected_mimetype')
|
||||
|
||||
def _check_mimetype(self):
|
||||
"""Takes the mimetype (as determined by libmagic) and determines
|
||||
|
@ -249,7 +249,8 @@ class File(FileBase):
|
|||
"""Processes an application specific file according to its subtype."""
|
||||
for subtype, method in self.app_subtype_methods.items():
|
||||
if subtype in self.sub_type:
|
||||
# TODO: should this return a value?
|
||||
# TODO: should we change the logic so we don't iterate through all of the subtype methods?
|
||||
# TODO: should these methods return a value?
|
||||
method()
|
||||
self.add_file_string('Application file')
|
||||
return
|
||||
|
@ -258,16 +259,15 @@ class File(FileBase):
|
|||
|
||||
def _executables(self):
|
||||
"""Processes an executable file."""
|
||||
# LOG: change this property
|
||||
# LOG: change the processing_type property to some other name or include in file_string
|
||||
self.set_property('processing_type', 'executable')
|
||||
self.make_dangerous('executable')
|
||||
|
||||
def _winoffice(self):
|
||||
"""Processes a winoffice file using olefile/oletools."""
|
||||
# LOG: change this property
|
||||
# LOG: processing_type property
|
||||
self.set_property('processing_type', 'WinOffice')
|
||||
# Try as if it is a valid document
|
||||
oid = oletools.oleid.OleID(self.src_path)
|
||||
oid = oletools.oleid.OleID(self.src_path) # First assume a valid file
|
||||
if not olefile.isOleFile(self.src_path):
|
||||
# Manual processing, may already count as suspicious
|
||||
try:
|
||||
|
@ -291,12 +291,14 @@ class File(FileBase):
|
|||
for i in indicators:
|
||||
if i.id == 'ObjectPool' and i.value:
|
||||
# TODO: Is it suspicious?
|
||||
# LOG: user defined property
|
||||
self.set_property('objpool', True)
|
||||
elif i.id == 'flash' and i.value:
|
||||
self.make_dangerous('flash')
|
||||
|
||||
def _ooxml(self):
|
||||
"""Processes an ooxml file."""
|
||||
# LOG: processing_type property
|
||||
self.set_property('processing_type', 'ooxml')
|
||||
try:
|
||||
doc = officedissector.doc.Document(self.src_path)
|
||||
|
@ -318,7 +320,7 @@ class File(FileBase):
|
|||
def _libreoffice(self):
|
||||
"""Processes a libreoffice file."""
|
||||
self.set_property('processing_type', 'libreoffice')
|
||||
# As long as there ar no way to do a sanity check on the files => dangerous
|
||||
# As long as there is no way to do a sanity check on the files => dangerous
|
||||
try:
|
||||
lodoc = zipfile.ZipFile(self.src_path, 'r')
|
||||
except:
|
||||
|
@ -332,6 +334,7 @@ class File(FileBase):
|
|||
|
||||
def _pdf(self):
|
||||
"""Processes a PDF file."""
|
||||
# LOG: processing_type property
|
||||
self.set_property('processing_type', 'pdf')
|
||||
xmlDoc = PDFiD(self.src_path)
|
||||
oPDFiD = cPDFiD(xmlDoc, True)
|
||||
|
@ -352,9 +355,10 @@ class File(FileBase):
|
|||
temporary directory and self.process_dir is called on that directory.
|
||||
The recursive archive depth is increased to protect against archive
|
||||
bombs."""
|
||||
# LOG: change this to something archive specific
|
||||
self.set_property('processing_type', 'archive')
|
||||
self.should_copy = False
|
||||
self.is_recursive = True
|
||||
# self.log_string += 'Archive extracted, processing content.'
|
||||
|
||||
def _unknown_app(self):
|
||||
"""Processes an unknown file."""
|
||||
|
@ -367,10 +371,9 @@ class File(FileBase):
|
|||
#######################
|
||||
# Metadata extractors
|
||||
def _metadata_exif(self, metadata_file_path):
|
||||
# TODO: this method is kind of long, can we shorten it?
|
||||
# TODO: this method is kind of long, can we shorten it somehow?
|
||||
img = open(self.src_path, 'rb')
|
||||
tags = None
|
||||
|
||||
try:
|
||||
tags = exifread.process_file(img, debug=True)
|
||||
except Exception as e:
|
||||
|
@ -382,19 +385,17 @@ class File(FileBase):
|
|||
self.add_error(e, "Failed to get any metadata for file {}.".format(self.src_path))
|
||||
img.close()
|
||||
return False
|
||||
|
||||
for tag in sorted(tags.keys()):
|
||||
# These are long and obnoxious/binary
|
||||
# These tags are long and obnoxious/binary so we don't add them
|
||||
if tag not in ('JPEGThumbnail', 'TIFFThumbnail'):
|
||||
printable = str(tags[tag])
|
||||
|
||||
tag_string = str(tags[tag])
|
||||
# Exifreader truncates data.
|
||||
if len(printable) > 25 and printable.endswith(", ... ]"):
|
||||
value = tags[tag].values
|
||||
printable = str(value)
|
||||
|
||||
if len(tag_string) > 25 and tag_string.endswith(", ... ]"):
|
||||
tag_value = tags[tag].values
|
||||
tag_string = str(tag_value)
|
||||
with open(metadata_file_path, 'w+') as metadata_file:
|
||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
|
||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, tag_string))
|
||||
# LOG: how do we want to log metadata?
|
||||
self.set_property('metadata', 'exif')
|
||||
img.close()
|
||||
return True
|
||||
|
@ -408,10 +409,12 @@ class File(FileBase):
|
|||
if tag not in ('icc_profile'):
|
||||
with open(metadata_file_path, 'w+') as metadata_file:
|
||||
metadata_file.write("Key: {}\tValue: {}\n".format(tag, img.info[tag]))
|
||||
# LOG: handle metadata
|
||||
self.set_property('metadata', 'png')
|
||||
img.close()
|
||||
# Catch decompression bombs
|
||||
except Exception as e:
|
||||
# TODO: only catch DecompressionBombWarnings here?
|
||||
self.add_error(e, "Caught exception processing metadata for {}".format(self.src_path))
|
||||
self.make_dangerous('exception processing metadata')
|
||||
return False
|
||||
|
@ -474,11 +477,6 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
|
||||
def process_dir(self, src_dir, dst_dir):
|
||||
"""Main function coordinating file processing."""
|
||||
# LOG: what's the purpose of this write log?:
|
||||
# if self.recursive_archive_depth > 0:
|
||||
# self.write_log()
|
||||
# We're writing the log here because...
|
||||
# How exactly does the workflow work with an archive?
|
||||
for srcpath in self.list_all_files(src_dir):
|
||||
dstpath = srcpath.replace(src_dir, dst_dir)
|
||||
# TODO: Can we clean up the way we handle relative_path?
|
||||
|
@ -506,9 +504,9 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
Should be given a Kittengroomer file object whose src_path points
|
||||
to an archive."""
|
||||
self.recursive_archive_depth += 1
|
||||
# Check for archivebomb
|
||||
# LOG: write_log or somehow log the archive file here
|
||||
if self.recursive_archive_depth >= self.max_recursive_depth:
|
||||
self._handle_archivebomb(file)
|
||||
file.make_dangerous('Archive bomb')
|
||||
else:
|
||||
tempdir_path = file.make_tempdir()
|
||||
command_str = '{} -p1 x "{}" -o"{}" -bd -aoa'
|
||||
|
@ -521,13 +519,6 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
self.safe_rmtree(tempdir_path)
|
||||
self.recursive_archive_depth -= 1
|
||||
|
||||
def _handle_archivebomb(self, file):
|
||||
file.make_dangerous('Archive bomb')
|
||||
self.logger.log.warning('ARCHIVE BOMB.')
|
||||
self.logger.log.warning('The content of the archive contains recursively other archives.')
|
||||
self.logger.log.warning('This is a bad sign so the archive is not extracted to the destination key.')
|
||||
# TODO: delete whatever we want to delete that's already been copied to dest dir
|
||||
|
||||
def _run_process(self, command_string, timeout=None):
|
||||
"""Run command_string in a subprocess, wait until it finishes."""
|
||||
args = shlex.split(command_string)
|
||||
|
|
|
@ -88,7 +88,7 @@ class FileBase(object):
|
|||
# Note: magic will always return something, even if it's just 'data'
|
||||
except UnicodeEncodeError as e:
|
||||
# FIXME: The encoding of the file is broken (possibly UTF-16)
|
||||
# One of the Travis files will trigger this.
|
||||
# Note: one of the Travis files will trigger this exception
|
||||
self.add_error(e, '')
|
||||
mt = None
|
||||
try:
|
||||
|
@ -115,9 +115,9 @@ class FileBase(object):
|
|||
@property
|
||||
def has_mimetype(self):
|
||||
"""Returns True if file has a full mimetype, else False."""
|
||||
# TODO: broken mimetype checks should be done somewhere else.
|
||||
# Should the check be by default or should we let the API consumer write it?
|
||||
if not self.main_type or not self.sub_type:
|
||||
# LOG: log this as an error, move somewhere else?
|
||||
# self._file_props.update({'broken_mime': True})
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
@ -126,8 +126,6 @@ class FileBase(object):
|
|||
def has_extension(self):
|
||||
"""Returns True if self.extension is set, else False."""
|
||||
if self.extension is None:
|
||||
# TODO: do we actually want to have a seperate prop for no extension?
|
||||
# self.set_property('no_extension', True)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
@ -163,7 +161,7 @@ class FileBase(object):
|
|||
self._file_props['user_defined'][prop_string] = value
|
||||
|
||||
def get_property(self, file_prop):
|
||||
# FIXME: needs to be refactored
|
||||
# TODO: could probably be refactored
|
||||
if file_prop in self._file_props:
|
||||
return self._file_props[file_prop]
|
||||
elif file_prop in self._file_props['user_defined']:
|
||||
|
@ -187,7 +185,7 @@ class FileBase(object):
|
|||
if self.is_dangerous:
|
||||
return
|
||||
self.set_property('safety_category', 'dangerous')
|
||||
# LOG: store reason string
|
||||
# LOG: store reason string somewhere and do something with it
|
||||
path, filename = os.path.split(self.dst_path)
|
||||
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
|
||||
|
||||
|
@ -250,11 +248,12 @@ class FileBase(object):
|
|||
|
||||
def write_log(self):
|
||||
"""Print the logs related to the current file being processed."""
|
||||
tmp_log = self.logger.log.fields(**self._file_props)
|
||||
file_log = self.logger.add_file(self)
|
||||
file_log.fields(**self._file_props)
|
||||
|
||||
|
||||
class GroomerLogger(object):
|
||||
"""Groomer logging interface"""
|
||||
"""Groomer logging interface."""
|
||||
|
||||
def __init__(self, root_dir_path, debug=False):
|
||||
self.root_dir = root_dir_path
|
||||
|
@ -301,8 +300,7 @@ class GroomerLogger(object):
|
|||
return s.hexdigest()
|
||||
|
||||
def add_file(self, file):
|
||||
# return a sublog for the file
|
||||
pass
|
||||
return self.log.name('file.src_path')
|
||||
|
||||
|
||||
class KittenGroomerBase(object):
|
||||
|
|
Loading…
Reference in New Issue