PyCIRCLean/kittengroomer/helpers.py

308 lines
11 KiB
Python
Raw Normal View History

2015-05-11 14:32:59 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
2016-12-06 03:02:46 +01:00
"""
Contains the base objects for use when creating a sanitizer using
PyCIRCLean. Subclass FileBase and KittenGroomerBase to implement your
desired behavior.
"""
2015-05-11 14:32:59 +02:00
import os
2016-05-09 19:21:58 +02:00
import sys
2015-11-23 19:54:29 +01:00
import hashlib
2015-05-11 14:32:59 +02:00
import shutil
import argparse
import magic
from twiggy import quick_setup, log
2015-05-11 14:32:59 +02:00
class KittenGroomerError(Exception):
    """Base KittenGroomer exception handler."""

    def __init__(self, message):
        """
        Build the exception and keep the text on ``self.message``.

        :param message: human readable description of the error
        """
        # Explicit two-argument super() keeps this usable on Python 2,
        # which the rest of the module still supports.
        super(KittenGroomerError, self).__init__(message)
        self.message = message
class ImplementationRequired(KittenGroomerError):
    """Implementation required error."""
class FileBase(object):
    """
    Base object for individual files in the source directory.

    Has information about the file as attributes and various helper methods.
    Initialised with the source path and expected destination path. Subclass
    and add attributes or methods relevant to a given implementation.
    """

    def __init__(self, src_path, dst_path):
        """
        Record both paths, then derive the extension and the mimetype.

        :param src_path: path of the file in the source directory
        :param dst_path: path the file is expected to be written to
        """
        self.src_path = src_path
        self.dst_path = dst_path
        self.log_details = {'filepath': self.src_path}
        self.log_string = ''
        _, self.extension = os.path.splitext(self.src_path)
        self._determine_mimetype()

    def _determine_mimetype(self):
        """
        Set self.mimetype, self.main_type and self.sub_type.

        Symlinks are labelled 'inode/symlink' without calling magic. For
        everything else the mimetype reported by magic is used. main_type
        and sub_type are empty strings when the mimetype has no '/'.
        """
        if os.path.islink(self.src_path):
            # magic will throw an IOError on a broken symlink
            self.mimetype = 'inode/symlink'
        else:
            try:
                mt = magic.from_file(self.src_path, mime=True)
            except UnicodeEncodeError as e:
                # FIXME: The encoding of the file is broken (possibly UTF-16)
                mt = ''
                self.log_details.update({'UnicodeError': e})
            try:
                self.mimetype = mt.decode("utf-8")
            except (AttributeError, UnicodeDecodeError):
                # Text strings have no .decode() on Python 3, and a byte
                # string may not be valid UTF-8: keep the raw value then.
                self.mimetype = mt
        if self.mimetype and '/' in self.mimetype:
            # Split once only, so an unexpected second '/' cannot break the
            # two-way unpacking.
            self.main_type, self.sub_type = self.mimetype.split('/', 1)
        else:
            self.main_type = ''
            self.sub_type = ''

    def has_mimetype(self):
        """
        Returns True if file has a full mimetype, else False.

        Returns False + updates log if self.main_type or self.sub_type
        are not set.
        """
        if not self.main_type or not self.sub_type:
            self.log_details.update({'broken_mime': True})
            return False
        return True

    def has_extension(self):
        """
        Returns True if self.extension is set, else False.

        Returns False + updates self.log_details if self.extension is not set.
        """
        if not self.extension:
            self.log_details.update({'no_extension': True})
            return False
        return True

    def is_dangerous(self):
        """Returns True if self.log_details contains a truthy 'dangerous'."""
        return bool(self.log_details.get('dangerous'))

    def is_symlink(self):
        """Returns True and updates log if file is a symlink."""
        if self.has_mimetype() and self.main_type == 'inode' and self.sub_type == 'symlink':
            self.log_details.update({'symlink': os.readlink(self.src_path)})
            return True
        return False

    def add_log_details(self, key, value):
        """Takes a key + a value and adds them to self.log_details."""
        self.log_details[key] = value

    def make_dangerous(self):
        """
        Marks a file as dangerous.

        Prepends and appends DANGEROUS to the destination file name
        to avoid double-click of death.
        """
        if self.is_dangerous():
            # Already marked as dangerous, do nothing
            return
        self.log_details['dangerous'] = True
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))

    def make_unknown(self):
        """Marks a file as an unknown type and prepends UNKNOWN to filename."""
        if self.is_dangerous() or self.log_details.get('binary'):
            # Already marked as dangerous or binary, do nothing
            return
        self.log_details['unknown'] = True
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))

    def make_binary(self):
        """Marks a file as a binary and appends .bin to filename."""
        if self.is_dangerous():
            # Already marked as dangerous, do nothing
            return
        self.log_details['binary'] = True
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, '{}.bin'.format(filename))

    def force_ext(self, ext):
        """If dst_path does not end in ext, appends the ext and updates log."""
        if not self.dst_path.endswith(ext):
            self.log_details['force_ext'] = True
            self.dst_path += ext
2015-05-11 14:32:59 +02:00
class KittenGroomerBase(object):
    """
    Base object driving a copy/convert run from a source tree to a
    destination tree. Subclass it and implement processdir().
    """

    def __init__(self, root_src, root_dst, debug=False):
        """
        Setup the base options of the copy/convert setup.

        Recreates <root_dst>/logs from scratch, writes a listing of the
        source tree to content.log and points the twiggy logger at
        processing.log.

        :param root_src: root of the directory tree to read from
        :param root_dst: root of the directory tree to write to
        :param debug: when True, debug stdout/stderr log paths point inside
            the log directory; otherwise they point at os.devnull
        """
        self.src_root_dir = root_src
        self.dst_root_dir = root_dst
        self.log_root_dir = os.path.join(self.dst_root_dir, 'logs')
        self._safe_rmtree(self.log_root_dir)
        self._safe_mkdir(self.log_root_dir)
        self.log_processing = os.path.join(self.log_root_dir, 'processing.log')
        self.log_content = os.path.join(self.log_root_dir, 'content.log')
        self.tree(self.src_root_dir)

        quick_setup(file=self.log_processing)
        self.log_name = log.name('files')
        # Make the bundled 'data' directory next to this module reachable
        # through PATH.
        self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        os.environ["PATH"] += os.pathsep + self.ressources_path

        self.cur_file = None

        self.debug = debug
        if self.debug:
            self.log_debug_err = os.path.join(self.log_root_dir, 'debug_stderr.log')
            self.log_debug_out = os.path.join(self.log_root_dir, 'debug_stdout.log')
        else:
            self.log_debug_err = os.devnull
            self.log_debug_out = os.devnull

    def _computehash(self, path):
        """Return the hexadecimal SHA-1 digest of the file at path."""
        s = hashlib.sha1()
        with open(path, 'rb') as f:
            # Read in 1 MiB chunks so large files are never loaded whole.
            for buf in iter(lambda: f.read(0x100000), b''):
                s.update(buf)
        return s.hexdigest()

    def tree(self, base_dir, padding=' '):
        """
        Append a listing of base_dir to the content log.

        Directories are listed recursively, regular files with their SHA-1
        and symbolic links with their target.

        :param base_dir: directory to list
        :param padding: prefix written in front of every line of this level
        """
        if sys.version_info.major == 2:
            self.__tree_py2(base_dir, padding)
        else:
            self.__tree_py3(base_dir, padding)

    def __tree_py2(self, base_dir, padding=' '):
        """Python 2 variant of tree(): writes native (byte) strings."""
        with open(self.log_content, 'ab') as lf:
            lf.write('#' * 80 + '\n')
            lf.write('{}+- {}/\n'.format(padding, os.path.basename(os.path.abspath(base_dir))))
            padding += '| '
            for f in sorted(os.listdir(base_dir)):
                curpath = os.path.join(base_dir, f)
                if os.path.islink(curpath):
                    lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)))
                elif os.path.isdir(curpath):
                    # The recursive call appends through its own handle:
                    # flush ours first so the lines stay in order.
                    lf.flush()
                    self.tree(curpath, padding)
                elif os.path.isfile(curpath):
                    lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)))

    def __tree_py3(self, base_dir, padding=' '):
        """Python 3 variant of tree(): formats text, then encodes it."""
        with open(self.log_content, 'ab') as lf:
            lf.write(bytes('#' * 80 + '\n', 'UTF-8'))
            # Format the name as text and encode the finished line once;
            # formatting an already-encoded name would write "b'name'".
            dirname = os.path.basename(os.path.abspath(base_dir))
            lf.write('{}+- {}/\n'.format(padding, dirname).encode(errors='ignore'))
            padding += '| '
            for f in sorted(os.listdir(base_dir)):
                curpath = os.path.join(base_dir, f)
                if os.path.islink(curpath):
                    lf.write('{}+-- {}\t- Symbolic link to {}\n'.format(padding, f, os.readlink(curpath)).encode(errors='ignore'))
                elif os.path.isdir(curpath):
                    # The recursive call appends through its own handle:
                    # flush ours first so the lines stay in order.
                    lf.flush()
                    self.tree(curpath, padding)
                elif os.path.isfile(curpath):
                    lf.write('{}+-- {}\t- {}\n'.format(padding, f, self._computehash(curpath)).encode(errors='ignore'))

    # ##### Helpers #####
    def _safe_rmtree(self, directory):
        '''Remove a directory tree if it exists'''
        if os.path.exists(directory):
            shutil.rmtree(directory)

    def _safe_remove(self, filepath):
        '''Remove a file if it exists'''
        if os.path.exists(filepath):
            os.remove(filepath)

    def _safe_mkdir(self, directory):
        '''Make a directory if it does not exist'''
        if not os.path.exists(directory):
            os.makedirs(directory)

    def _safe_copy(self, src=None, dst=None):
        '''
        Copy a file and create directory if needed.

        src and dst default to the paths of self.cur_file.
        Returns True on success; on any error the exception is printed
        and False is returned.
        '''
        if src is None:
            src = self.cur_file.src_path
        if dst is None:
            dst = self.cur_file.dst_path
        try:
            self._safe_mkdir(os.path.dirname(dst))
            shutil.copy(src, dst)
            return True
        except Exception as e:
            # TODO: Logfile
            print(e)
            return False

    def _safe_metadata_split(self, ext):
        '''
        Create a separate file to hold this file's metadata.

        Returns a file object opened in 'w+' mode on <dst_path><ext> (the
        caller is responsible for closing it), or False after printing the
        error when something goes wrong.
        '''
        dst = self.cur_file.dst_path
        try:
            # NOTE(review): the check looks at the *source* path while the
            # file is created next to the *destination* - confirm intended.
            if os.path.exists(self.cur_file.src_path + ext):
                raise KittenGroomerError("Cannot create split metadata file for \"" +
                                         self.cur_file.dst_path + "\", type '" +
                                         ext + "': File exists.")
            self._safe_mkdir(os.path.dirname(dst))
            return open(dst + ext, 'w+')
        except Exception as e:
            # TODO: Logfile
            print(e)
            return False

    def _list_all_files(self, directory):
        ''' Generate an iterator over all the files in a directory tree'''
        for root, dirs, files in os.walk(directory):
            for filename in files:
                yield os.path.join(root, filename)

    def _print_log(self):
        '''
        Print log, should be called after each file.

        You probably want to reimplement it in the subclass
        '''
        tmp_log = self.log_name.fields(**self.cur_file.log_details)
        tmp_log.info('It did a thing.')

    #######################

    def processdir(self, src_dir=None, dst_dir=None):
        '''
        Main function doing the work, you have to implement it yourself.
        '''
        raise ImplementationRequired('You have to implement the result processdir.')
def main(kg_implementation,
         description='Call the KittenGroomer implementation to do things on files '
                     'present in the source directory to the destination directory'):
    """
    Command line entry point shared by the KittenGroomer implementations.

    Parses -s/--source and -d/--destination from the command line, builds
    kg_implementation(source, destination) and calls its processdir().

    :param kg_implementation: callable taking (source, destination) and
        returning an object with a processdir() method
    :param description: text shown in the --help output
    """
    parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
    # Both directories are mandatory: without them the implementation would
    # be handed None and fail later with an unrelated-looking error.
    parser.add_argument('-s', '--source', type=str, required=True,
                        help='Source directory')
    parser.add_argument('-d', '--destination', type=str, required=True,
                        help='Destination directory')
    args = parser.parse_args()
    kg = kg_implementation(args.source, args.destination)
    kg.processdir()