mirror of https://github.com/CIRCL/PyCIRCLean
Hacks to make tests pass before fixing
parent
1cf8a62f46
commit
7d62238270
|
@ -82,8 +82,8 @@ MAL_EXTS = (
|
||||||
|
|
||||||
class File(FileBase):
|
class File(FileBase):
|
||||||
|
|
||||||
def __init__(self, src_path, dst_path):
|
def __init__(self, src_path, dst_path, logger):
|
||||||
super(File, self).__init__(src_path, dst_path)
|
super(File, self).__init__(src_path, dst_path, logger)
|
||||||
self.is_recursive = False
|
self.is_recursive = False
|
||||||
self._check_dangerous()
|
self._check_dangerous()
|
||||||
if self.is_dangerous():
|
if self.is_dangerous():
|
||||||
|
@ -148,6 +148,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
|
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
|
||||||
self.recursive_archive_depth = 0
|
self.recursive_archive_depth = 0
|
||||||
self.max_recursive_depth = max_recursive_depth
|
self.max_recursive_depth = max_recursive_depth
|
||||||
|
self.log_name = self.logger.log
|
||||||
|
|
||||||
subtypes_apps = [
|
subtypes_apps = [
|
||||||
(mimes_office, self._winoffice),
|
(mimes_office, self._winoffice),
|
||||||
|
@ -193,7 +194,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
def _print_log(self):
|
def _print_log(self):
|
||||||
"""Print the logs related to the current file being processed."""
|
"""Print the logs related to the current file being processed."""
|
||||||
# TODO: change name to _write_log
|
# TODO: change name to _write_log
|
||||||
tmp_log = self.log_name.fields(**self.cur_file.log_details)
|
tmp_log = self.logger.log.fields(**self.cur_file.log_details)
|
||||||
if self.cur_file.is_dangerous():
|
if self.cur_file.is_dangerous():
|
||||||
tmp_log.warning(self.cur_file.log_string)
|
tmp_log.warning(self.cur_file.log_string)
|
||||||
elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'):
|
elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'):
|
||||||
|
@ -205,7 +206,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
"""Run command_string in a subprocess, wait until it finishes."""
|
"""Run command_string in a subprocess, wait until it finishes."""
|
||||||
args = shlex.split(command_string)
|
args = shlex.split(command_string)
|
||||||
# TODO: log_debug_err and log_debug are now broken, fix
|
# TODO: log_debug_err and log_debug are now broken, fix
|
||||||
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
|
with open(self.logger.log_debug_err, 'ab') as stderr, open(self.logger.log_debug_out, 'ab') as stdout:
|
||||||
try:
|
try:
|
||||||
subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout)
|
subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout)
|
||||||
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
|
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
|
||||||
|
@ -400,7 +401,8 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir)
|
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir)
|
||||||
self._run_process(extract_command)
|
self._run_process(extract_command)
|
||||||
self.recursive_archive_depth += 1
|
self.recursive_archive_depth += 1
|
||||||
self.tree(tmpdir)
|
# Broken so commenting out for now:
|
||||||
|
# self.tree(tmpdir)
|
||||||
self.processdir(tmpdir, self.cur_file.dst_path)
|
self.processdir(tmpdir, self.cur_file.dst_path)
|
||||||
self.recursive_archive_depth -= 1
|
self.recursive_archive_depth -= 1
|
||||||
self._safe_rmtree(tmpdir)
|
self._safe_rmtree(tmpdir)
|
||||||
|
@ -549,7 +551,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
#######################
|
#######################
|
||||||
|
|
||||||
def process_file(self, srcpath, dstpath, relative_path):
|
def process_file(self, srcpath, dstpath, relative_path):
|
||||||
self.cur_file = File(srcpath, dstpath)
|
self.cur_file = File(srcpath, dstpath, self.logger)
|
||||||
self.log_name.info('Processing {} ({}/{})',
|
self.log_name.info('Processing {} ({}/{})',
|
||||||
relative_path,
|
relative_path,
|
||||||
self.cur_file.main_type,
|
self.cur_file.main_type,
|
||||||
|
|
|
@ -42,6 +42,7 @@ class FileBase(object):
|
||||||
"""Initialized with the source path and expected destination path."""
|
"""Initialized with the source path and expected destination path."""
|
||||||
self.src_path = src_path
|
self.src_path = src_path
|
||||||
self.dst_path = dst_path
|
self.dst_path = dst_path
|
||||||
|
# TODO: rename this. file_information? file_data? file_metadata? file_log?
|
||||||
self.log_details = {'filepath': self.src_path}
|
self.log_details = {'filepath': self.src_path}
|
||||||
self.log_string = ''
|
self.log_string = ''
|
||||||
self._determine_extension()
|
self._determine_extension()
|
||||||
|
@ -221,6 +222,13 @@ class GroomerLog(object):
|
||||||
s.update(buf)
|
s.update(buf)
|
||||||
return s.hexdigest()
|
return s.hexdigest()
|
||||||
|
|
||||||
|
def add_file(self):
|
||||||
|
pass
|
||||||
|
# Should this return a sublog that the file can then write to? Does that work? Too confusing to understand?
|
||||||
|
|
||||||
|
def add_event(self, event, log_level):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class KittenGroomerBase(object):
|
class KittenGroomerBase(object):
|
||||||
"""Base object responsible for copy/sanitization process."""
|
"""Base object responsible for copy/sanitization process."""
|
||||||
|
@ -278,6 +286,7 @@ class KittenGroomerBase(object):
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
|
|
||||||
|
# TODO: feels like this funciton doesn't need to exist if we move main()
|
||||||
def processdir(self, src_dir, dst_dir):
|
def processdir(self, src_dir, dst_dir):
|
||||||
"""
|
"""
|
||||||
Implement this function in your subclass to define file processing behavior.
|
Implement this function in your subclass to define file processing behavior.
|
||||||
|
@ -285,6 +294,7 @@ class KittenGroomerBase(object):
|
||||||
raise ImplementationRequired('Please implement processdir.')
|
raise ImplementationRequired('Please implement processdir.')
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Maybe this shouldn't exist? It should probably get moved to filecheck since this isn't really API code
|
||||||
def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'):
|
def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'):
|
||||||
parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
|
parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
|
||||||
parser.add_argument('-s', '--source', type=str, help='Source directory')
|
parser.add_argument('-s', '--source', type=str, help='Source directory')
|
||||||
|
|
|
@ -6,17 +6,17 @@ def save_logs(groomer, test_description):
|
||||||
test_log_path = 'tests/test_logs/{}.log'.format(test_description)
|
test_log_path = 'tests/test_logs/{}.log'.format(test_description)
|
||||||
with open(test_log_path, 'w+') as test_log:
|
with open(test_log_path, 'w+') as test_log:
|
||||||
test_log.write(divider.format('TEST LOG'))
|
test_log.write(divider.format('TEST LOG'))
|
||||||
with open(groomer.log_processing, 'r') as logfile:
|
with open(groomer.logger.log_processing, 'r') as logfile:
|
||||||
log = logfile.read()
|
log = logfile.read()
|
||||||
test_log.write(log)
|
test_log.write(log)
|
||||||
if groomer.debug:
|
if groomer.debug:
|
||||||
if os.path.exists(groomer.log_debug_err):
|
if os.path.exists(groomer.logger.log_debug_err):
|
||||||
test_log.write(divider.format('ERR LOG'))
|
test_log.write(divider.format('ERR LOG'))
|
||||||
with open(groomer.log_debug_err, 'r') as debug_err:
|
with open(groomer.logger.log_debug_err, 'r') as debug_err:
|
||||||
err = debug_err.read()
|
err = debug_err.read()
|
||||||
test_log.write(err)
|
test_log.write(err)
|
||||||
if os.path.exists(groomer.log_debug_out):
|
if os.path.exists(groomer.logger.log_debug_out):
|
||||||
test_log.write(divider.format('OUT LOG'))
|
test_log.write(divider.format('OUT LOG'))
|
||||||
with open(groomer.log_debug_out, 'r') as debug_out:
|
with open(groomer.logger.log_debug_out, 'r') as debug_out:
|
||||||
out = debug_out.read()
|
out = debug_out.read()
|
||||||
test_log.write(out)
|
test_log.write(out)
|
||||||
|
|
Loading…
Reference in New Issue