mirror of https://github.com/CIRCL/PyCIRCLean
Add/update docstrings for filecheck and helpers
parent
ac94cf5d6d
commit
4d8a1d1daf
|
@ -23,6 +23,8 @@ SEVENZ_PATH = '/usr/bin/7z'
|
|||
|
||||
|
||||
class Config:
|
||||
"""Configuration information for Filecheck."""
|
||||
|
||||
# Application subtypes (mimetype: 'application/<subtype>')
|
||||
mimes_ooxml = ['vnd.openxmlformats-officedocument.']
|
||||
mimes_office = ['msword', 'vnd.ms-']
|
||||
|
@ -180,12 +182,13 @@ class File(FileBase):
|
|||
|
||||
@property
|
||||
def has_metadata(self):
|
||||
"""True if filetype typically contains metadata, else False."""
|
||||
if self.mimetype in Config.mimes_metadata:
|
||||
return True
|
||||
return False
|
||||
|
||||
def make_tempdir(self):
|
||||
"""Make a temporary directory."""
|
||||
"""Make a temporary directory at self.tempdir_path."""
|
||||
self.tempdir_path = self.dst_path + '_temp'
|
||||
if not os.path.exists(self.tempdir_path):
|
||||
os.makedirs(self.tempdir_path)
|
||||
|
@ -246,7 +249,7 @@ class File(FileBase):
|
|||
self.force_ext('.txt')
|
||||
|
||||
def application(self):
|
||||
"""Processes an application specific file according to its subtype."""
|
||||
"""Process an application specific file according to its subtype."""
|
||||
for subtype, method in self.app_subtype_methods.items():
|
||||
if subtype in self.sub_type:
|
||||
# TODO: should we change the logic so we don't iterate through all of the subtype methods?
|
||||
|
@ -258,13 +261,13 @@ class File(FileBase):
|
|||
self._unknown_app()
|
||||
|
||||
def _executables(self):
|
||||
"""Processes an executable file."""
|
||||
"""Process an executable file."""
|
||||
# LOG: change the processing_type property to some other name or include in file_string
|
||||
self.set_property('processing_type', 'executable')
|
||||
self.make_dangerous('executable')
|
||||
|
||||
def _winoffice(self):
|
||||
"""Processes a winoffice file using olefile/oletools."""
|
||||
"""Process a winoffice file using olefile/oletools."""
|
||||
# LOG: processing_type property
|
||||
self.set_property('processing_type', 'WinOffice')
|
||||
oid = oletools.oleid.OleID(self.src_path) # First assume a valid file
|
||||
|
@ -297,7 +300,7 @@ class File(FileBase):
|
|||
self.make_dangerous('flash')
|
||||
|
||||
def _ooxml(self):
|
||||
"""Processes an ooxml file."""
|
||||
"""Process an ooxml file."""
|
||||
# LOG: processing_type property
|
||||
self.set_property('processing_type', 'ooxml')
|
||||
try:
|
||||
|
@ -318,7 +321,7 @@ class File(FileBase):
|
|||
self.make_dangerous('embedded pack')
|
||||
|
||||
def _libreoffice(self):
|
||||
"""Processes a libreoffice file."""
|
||||
"""Process a libreoffice file."""
|
||||
self.set_property('processing_type', 'libreoffice')
|
||||
# As long as there is no way to do a sanity check on the files => dangerous
|
||||
try:
|
||||
|
@ -333,7 +336,7 @@ class File(FileBase):
|
|||
self.make_dangerous('macro')
|
||||
|
||||
def _pdf(self):
|
||||
"""Processes a PDF file."""
|
||||
"""Process a PDF file."""
|
||||
# LOG: processing_type property
|
||||
self.set_property('processing_type', 'pdf')
|
||||
xmlDoc = PDFiD(self.src_path)
|
||||
|
@ -351,26 +354,30 @@ class File(FileBase):
|
|||
self.make_dangerous('launch')
|
||||
|
||||
def _archive(self):
|
||||
"""Processes an archive using 7zip. The archive is extracted to a
|
||||
temporary directory and self.process_dir is called on that directory.
|
||||
The recursive archive depth is increased to protect against archive
|
||||
bombs."""
|
||||
"""
|
||||
Process an archive using 7zip.
|
||||
|
||||
The archive is extracted to a temporary directory and self.process_dir
|
||||
is called on that directory. The recursive archive depth is increased
|
||||
to protect against archive bombs.
|
||||
"""
|
||||
# LOG: change this to something archive specific
|
||||
self.set_property('processing_type', 'archive')
|
||||
self.should_copy = False
|
||||
self.is_recursive = True
|
||||
|
||||
def _unknown_app(self):
|
||||
"""Processes an unknown file."""
|
||||
"""Process an unknown file."""
|
||||
self.make_unknown()
|
||||
|
||||
def _binary_app(self):
|
||||
"""Processses an unknown binary file."""
|
||||
"""Process an unknown binary file."""
|
||||
self.make_binary()
|
||||
|
||||
#######################
|
||||
# Metadata extractors
|
||||
def _metadata_exif(self, metadata_file_path):
|
||||
"""Read exif metadata from a jpg or tiff file using exifread."""
|
||||
# TODO: this method is kind of long, can we shorten it somehow?
|
||||
img = open(self.src_path, 'rb')
|
||||
tags = None
|
||||
|
@ -401,6 +408,7 @@ class File(FileBase):
|
|||
return True
|
||||
|
||||
def _metadata_png(self, metadata_file_path):
|
||||
"""Extract metadata from a png file using PIL/Pillow."""
|
||||
warnings.simplefilter('error', Image.DecompressionBombWarning)
|
||||
try:
|
||||
img = Image.open(self.src_path)
|
||||
|
@ -420,6 +428,7 @@ class File(FileBase):
|
|||
return False
|
||||
|
||||
def extract_metadata(self):
|
||||
"""Create metadata file and call correct metadata extraction method."""
|
||||
metadata_file_path = self.create_metadata_file(".metadata.txt")
|
||||
mt = self.mimetype
|
||||
metadata_processing_method = self.metadata_mimetype_methods.get(mt)
|
||||
|
@ -430,12 +439,12 @@ class File(FileBase):
|
|||
#######################
|
||||
# ##### Media - audio and video aren't converted ######
|
||||
def audio(self):
|
||||
"""Processes an audio file."""
|
||||
"""Process an audio file."""
|
||||
self.log_string += 'Audio file'
|
||||
self._media_processing()
|
||||
|
||||
def video(self):
|
||||
"""Processes a video."""
|
||||
"""Process a video."""
|
||||
self.log_string += 'Video file'
|
||||
self._media_processing()
|
||||
|
||||
|
@ -444,11 +453,14 @@ class File(FileBase):
|
|||
self.set_property('processing_type', 'media')
|
||||
|
||||
def image(self):
|
||||
"""Processes an image.
|
||||
"""
|
||||
Process an image.
|
||||
|
||||
Extracts metadata to dest key if metadata is present. Creates a
|
||||
temporary directory on dest key, opens the using PIL.Image,saves it to
|
||||
the temporary directory, and copies it to the destination."""
|
||||
Extracts metadata to dest key using self.extract_metada() if metadata
|
||||
is present. Creates a temporary directory on dest key, opens the image
|
||||
using PIL.Image, saves it to the temporary directory, and copies it to
|
||||
the destination.
|
||||
"""
|
||||
# TODO: make sure this method works for png, gif, tiff
|
||||
if self.has_metadata:
|
||||
self.extract_metadata()
|
||||
|
@ -476,7 +488,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
self.max_recursive_depth = max_recursive_depth
|
||||
|
||||
def process_dir(self, src_dir, dst_dir):
|
||||
"""Main function coordinating file processing."""
|
||||
"""Process a directory on the source key."""
|
||||
self.logger.tree(src_dir)
|
||||
for srcpath in self.list_all_files(src_dir):
|
||||
dstpath = srcpath.replace(src_dir, dst_dir)
|
||||
|
@ -489,6 +501,12 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
self.process_file(self.cur_file)
|
||||
|
||||
def process_file(self, file):
|
||||
"""
|
||||
Process an individual file.
|
||||
|
||||
Check the file, handle archives using self.process_archive, copy
|
||||
the file to the destionation key, and clean up temporary directory.
|
||||
"""
|
||||
file.check()
|
||||
if file.is_recursive:
|
||||
self.process_archive(file)
|
||||
|
@ -500,10 +518,12 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
self.safe_rmtree(file.tempdir_path)
|
||||
|
||||
def process_archive(self, file):
|
||||
"""Unpacks an archive using 7zip and processes contents.
|
||||
"""
|
||||
Unpack an archive using 7zip and process contents using process_dir.
|
||||
|
||||
Should be given a Kittengroomer file object whose src_path points
|
||||
to an archive."""
|
||||
to an archive.
|
||||
"""
|
||||
self.recursive_archive_depth += 1
|
||||
# LOG: write_log or somehow log the archive file here
|
||||
if self.recursive_archive_depth >= self.max_recursive_depth:
|
||||
|
|
|
@ -32,13 +32,18 @@ class ImplementationRequired(KittenGroomerError):
|
|||
|
||||
class FileBase(object):
|
||||
"""
|
||||
Base object for individual files in the source directory. Contains file
|
||||
attributes and various helper methods. Subclass and add attributes
|
||||
or methods relevant to a given implementation.
|
||||
Base object for individual files in the source directory.
|
||||
|
||||
Contains file attributes and various helper methods.
|
||||
"""
|
||||
|
||||
def __init__(self, src_path, dst_path, logger=None):
|
||||
"""Initialized with the source path and expected destination path."""
|
||||
"""
|
||||
Initialized with the source path and expected destination path.
|
||||
|
||||
self.logger should be a logging object with an add_file method.
|
||||
Create various properties and determine the file's mimetype.
|
||||
"""
|
||||
self.src_path = src_path
|
||||
self.dst_path = dst_path
|
||||
self.filename = os.path.basename(self.src_path)
|
||||
|
@ -106,6 +111,7 @@ class FileBase(object):
|
|||
|
||||
@property
|
||||
def size(self):
|
||||
"""Filesize in bytes as an int, 0 if file does not exist."""
|
||||
try:
|
||||
size = os.path.getsize(self.src_path)
|
||||
except FileNotFoundError:
|
||||
|
@ -114,7 +120,7 @@ class FileBase(object):
|
|||
|
||||
@property
|
||||
def has_mimetype(self):
|
||||
"""Returns True if file has a full mimetype, else False."""
|
||||
"""True if file has a main and sub mimetype, else False."""
|
||||
# TODO: broken mimetype checks should be done somewhere else.
|
||||
# Should the check be by default or should we let the API consumer write it?
|
||||
if not self.main_type or not self.sub_type:
|
||||
|
@ -124,7 +130,7 @@ class FileBase(object):
|
|||
|
||||
@property
|
||||
def has_extension(self):
|
||||
"""Returns True if self.extension is set, else False."""
|
||||
"""True if self.extension is set, else False."""
|
||||
if self.extension is None:
|
||||
return False
|
||||
else:
|
||||
|
@ -132,35 +138,42 @@ class FileBase(object):
|
|||
|
||||
@property
|
||||
def is_dangerous(self):
|
||||
"""True if file has been marked 'dangerous' else False."""
|
||||
"""True if file has been marked 'dangerous', else False."""
|
||||
return self._file_props['safety_category'] is 'dangerous'
|
||||
|
||||
@property
|
||||
def is_unknown(self):
|
||||
"""True if file has been marked 'unknown' else False."""
|
||||
"""True if file has been marked 'unknown', else False."""
|
||||
return self._file_props['safety_category'] is 'unknown'
|
||||
|
||||
@property
|
||||
def is_binary(self):
|
||||
"""True if file has been marked 'binary' else False."""
|
||||
"""True if file has been marked 'binary', else False."""
|
||||
return self._file_props['safety_category'] is 'binary'
|
||||
|
||||
@property
|
||||
def is_symlink(self):
|
||||
"""Returns True and updates log if file is a symlink."""
|
||||
"""True if file is a symlink, else False."""
|
||||
if self._file_props['symlink'] is False:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def set_property(self, prop_string, value):
|
||||
"""Takes a property + a value and adds them to self._file_props."""
|
||||
"""
|
||||
Take a property and a value and add them to self._file_props.
|
||||
|
||||
If prop_string is already in _file_props, set prop_string to value.
|
||||
If prop_string not in _file_props, set prop_string to value in
|
||||
_file_props['user_defined'].
|
||||
"""
|
||||
if prop_string in self._file_props.keys():
|
||||
self._file_props[prop_string] = value
|
||||
else:
|
||||
self._file_props['user_defined'][prop_string] = value
|
||||
|
||||
def get_property(self, file_prop):
|
||||
"""Get the value for a property in _file_props."""
|
||||
# TODO: could probably be refactored
|
||||
if file_prop in self._file_props:
|
||||
return self._file_props[file_prop]
|
||||
|
@ -170,16 +183,18 @@ class FileBase(object):
|
|||
return None
|
||||
|
||||
def add_error(self, error, info):
|
||||
"""Add an error: info pair to _file_props['errors']."""
|
||||
self._file_props['errors'].update({error: info})
|
||||
|
||||
def add_file_string(self, file_string):
|
||||
"""Add a file descriptor string to _file_props."""
|
||||
self._file_props['file_string_set'].add(file_string)
|
||||
|
||||
def make_dangerous(self, reason_string=None):
|
||||
"""
|
||||
Marks a file as dangerous.
|
||||
Mark file as dangerous.
|
||||
|
||||
Prepends and appends DANGEROUS to the destination file name
|
||||
Prepend and append DANGEROUS to the destination file name
|
||||
to help prevent double-click of death.
|
||||
"""
|
||||
if self.is_dangerous:
|
||||
|
@ -190,7 +205,7 @@ class FileBase(object):
|
|||
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
|
||||
|
||||
def make_unknown(self):
|
||||
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
|
||||
"""Mark file as an unknown type and prepend UNKNOWN to filename."""
|
||||
if self.is_dangerous or self.is_binary:
|
||||
return
|
||||
self.set_property('safety_category', 'unknown')
|
||||
|
@ -198,7 +213,7 @@ class FileBase(object):
|
|||
self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
|
||||
|
||||
def make_binary(self):
|
||||
"""Marks a file as a binary and appends .bin to filename."""
|
||||
"""Mark file as a binary and append .bin to filename."""
|
||||
if self.is_dangerous:
|
||||
return
|
||||
self.set_property('safety_category', 'binary')
|
||||
|
@ -206,7 +221,7 @@ class FileBase(object):
|
|||
self.dst_path = os.path.join(path, '{}.bin'.format(filename))
|
||||
|
||||
def safe_copy(self, src=None, dst=None):
|
||||
"""Copy a file and create directory if needed."""
|
||||
"""Copy file and create destination directories if needed."""
|
||||
if src is None:
|
||||
src = self.src_path
|
||||
if dst is None:
|
||||
|
@ -220,7 +235,7 @@ class FileBase(object):
|
|||
self.add_error(e, '')
|
||||
|
||||
def force_ext(self, ext):
|
||||
"""If dst_path does not end in ext, changes it and edits _file_props."""
|
||||
"""If dst_path does not end in ext, change it and edit _file_props."""
|
||||
if not self.dst_path.endswith(ext):
|
||||
self.set_property('force_ext', True)
|
||||
self.dst_path += ext
|
||||
|
@ -228,7 +243,7 @@ class FileBase(object):
|
|||
self.set_property('extension', ext)
|
||||
|
||||
def create_metadata_file(self, ext):
|
||||
"""Create a separate file to hold this file's metadata."""
|
||||
"""Create a separate file to hold metadata from this file."""
|
||||
try:
|
||||
# make sure we aren't overwriting anything
|
||||
if os.path.exists(self.src_path + ext):
|
||||
|
@ -247,7 +262,7 @@ class FileBase(object):
|
|||
return False
|
||||
|
||||
def write_log(self):
|
||||
"""Print the logs related to the current file being processed."""
|
||||
"""Write logs from file to self.logger."""
|
||||
file_log = self.logger.add_file(self)
|
||||
file_log.fields(**self._file_props)
|
||||
|
||||
|
@ -273,7 +288,7 @@ class GroomerLogger(object):
|
|||
self.log_debug_out = os.devnull
|
||||
|
||||
def tree(self, base_dir, padding=' '):
|
||||
"""Writes a graphical tree to the log for a given directory."""
|
||||
"""Write a graphical tree to the log for `base_dir`."""
|
||||
with open(self.log_content, 'ab') as lf:
|
||||
lf.write(bytes('#' * 80 + '\n', 'UTF-8'))
|
||||
lf.write(bytes('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir)).encode()), 'utf8'))
|
||||
|
@ -289,7 +304,7 @@ class GroomerLogger(object):
|
|||
lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)).encode(errors='ignore'))
|
||||
|
||||
def _computehash(self, path):
|
||||
"""Returns a sha256 hash of a file at a given path."""
|
||||
"""Return a sha256 hash of a file at a given path."""
|
||||
s = hashlib.sha256()
|
||||
with open(path, 'rb') as f:
|
||||
while True:
|
||||
|
@ -300,6 +315,7 @@ class GroomerLogger(object):
|
|||
return s.hexdigest()
|
||||
|
||||
def add_file(self, file):
|
||||
"""Add a file to the log."""
|
||||
return self.log.name('file.src_path')
|
||||
|
||||
|
||||
|
@ -340,9 +356,7 @@ class KittenGroomerBase(object):
|
|||
|
||||
# TODO: feels like this function doesn't need to exist if we move main()
|
||||
def processdir(self, src_dir, dst_dir):
|
||||
"""
|
||||
Implement this function in your subclass to define file processing behavior.
|
||||
"""
|
||||
"""Implement this function to define file processing behavior."""
|
||||
raise ImplementationRequired('Please implement processdir.')
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue