PyCIRCLean/kittengroomer/helpers.py

211 lines
7.0 KiB
Python
Raw Normal View History

2015-05-11 14:32:59 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
2015-11-05 14:43:54 +01:00
import magic
2015-05-11 14:32:59 +02:00
import shutil
2015-11-05 14:43:54 +01:00
from twiggy import quick_setup, log
2015-05-11 14:32:59 +02:00
import argparse
class KittenGroomerError(Exception):
    '''
    Base KittenGroomer exception handler.

    Every exception defined by this module derives from this class.
    '''

    def __init__(self, message):
        '''
        Initialise the exception and keep the text available as `message`.
        '''
        super(KittenGroomerError, self).__init__(message)
        # Stored explicitly so callers can always read `.message`.
        self.message = message
class ImplementationRequired(KittenGroomerError):
    '''
    Implementation required error.

    Raised by the base class when a method that has to be provided by a
    subclass is called without having been overridden.
    '''
class FileBase(object):
    '''
    Base information about one file found on the source USB key.

    Keeps the source and destination paths, the extension and mimetype
    detected for the source file, and `log_details`, a dictionary of
    flags and details collected while the file is processed.
    '''

    def __init__(self, src_path, dst_path):
        '''
        Contains base information for a file on the source USB key,
        initialised with expected src and dest path
        '''
        self.src_path = src_path
        self.dst_path = dst_path
        self.log_details = {'filepath': self.src_path}
        self.log_string = ''
        # Only the extension is of interest, the root part is discarded.
        self.extension = os.path.splitext(self.src_path)[1]
        raw_mimetype = magic.from_file(self.src_path, mime=True)
        self.mimetype, self.main_type, self.sub_type = \
            self._parse_mimetype(raw_mimetype)

    @staticmethod
    def _parse_mimetype(raw_mimetype):
        '''
        Normalise a detected mimetype and split it into its two parts.

        Returns a (mimetype, main_type, sub_type) tuple. Byte strings are
        decoded as UTF-8 (undecodable bytes are replaced) so the result is
        always text. main_type and sub_type are empty strings when the
        value is empty or contains no '/'.
        '''
        mimetype = raw_mimetype
        if isinstance(mimetype, bytes):
            # An explicit type check replaces the former bare `except:`,
            # which also swallowed unrelated errors and could leave bytes
            # behind for the text comparison below.
            mimetype = mimetype.decode('utf-8', 'replace')
        if mimetype and '/' in mimetype:
            # Split once only: an unexpected extra '/' must not make the
            # unpacking (and therefore the constructor) fail.
            main_type, sub_type = mimetype.split('/', 1)
        else:
            main_type, sub_type = '', ''
        return mimetype, main_type, sub_type

    def has_mimetype(self):
        '''
        Return True if both parts of the mimetype are known.
        Otherwise record the extension as 'broken_mime' in the log
        details and return False.
        '''
        if not self.main_type or not self.sub_type:
            self.log_details.update({'broken_mime': self.extension})
            return False
        return True

    def has_extension(self):
        '''
        Return True if the source file has an extension.
        Otherwise add a 'no_extension' entry to the log details and
        return False.
        '''
        if not self.extension:
            self.log_details.update({'no_extension': self.extension})
            return False
        return True

    def is_dangerous(self):
        '''
        Return True if the file has been marked as dangerous.
        '''
        return bool(self.log_details.get('dangerous'))

    def add_log_details(self, key, value):
        '''
        Add an entry in the log dictionary
        '''
        self.log_details[key] = value

    def make_dangerous(self):
        '''
        This file should be considered as dangerous and never run.
        Prepending and appending DANGEROUS to the destination
        file name avoids the double-click of death.
        '''
        if self.is_dangerous():
            # Already marked as dangerous, do nothing
            return
        self.log_details['dangerous'] = True
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))

    def make_unknown(self):
        '''
        This file has an unknown type and it was not possible to take
        a decision. The user will have to decide what to do.
        Prepending UNKNOWN to the destination file name.
        '''
        if self.is_dangerous() or self.log_details.get('binary'):
            # Already marked as dangerous or binary, do nothing
            return
        self.log_details['unknown'] = True
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))

    def make_binary(self):
        '''
        This file is a binary, and should probably not be run.
        Appending .bin avoids the double-click of death but the user
        will have to decide what to do with it.
        '''
        if self.is_dangerous():
            # Already marked as dangerous, do nothing
            return
        self.log_details['binary'] = True
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, '{}.bin'.format(filename))

    def force_ext(self, ext):
        '''
        Append `ext` to the destination path unless the path already
        ends with it, and flag the change in the log details.
        '''
        if not self.dst_path.endswith(ext):
            self.log_details['force_ext'] = True
            self.dst_path += ext
2015-05-11 14:32:59 +02:00
class KittenGroomerBase(object):
    '''
    Base class of a groomer copying/converting files from a source
    directory tree to a destination directory tree.

    It prepares the log directory, and provides small filesystem helpers.
    Subclasses have to implement `processdir`.
    '''

    def __init__(self, root_src, root_dst, debug=False):
        '''
        Setup the base options of the copy/convert setup
        '''
        self.src_root_dir = root_src
        self.dst_root_dir = root_dst
        self.log_root_dir = os.path.join(self.dst_root_dir, 'logs')
        # Always start from an empty log directory.
        self._safe_rmtree(self.log_root_dir)
        self._safe_mkdir(self.log_root_dir)
        self.log_processing = os.path.join(self.log_root_dir, 'processing.log')

        quick_setup(file=self.log_processing)
        self.log_name = log.name('files')
        # The 'data' directory next to this module is appended to PATH.
        self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        os.environ["PATH"] += os.pathsep + self.ressources_path

        self.cur_file = None

        self.debug = debug
        if self.debug:
            self.log_debug_err = os.path.join(self.log_root_dir, 'debug_stderr.log')
            self.log_debug_out = os.path.join(self.log_root_dir, 'debug_stdout.log')
        else:
            # Without debugging, both debug log paths point to the null device.
            self.log_debug_err = os.devnull
            self.log_debug_out = os.devnull

    # ##### Helpers #####
    def _safe_rmtree(self, directory):
        '''Remove a directory tree if it exists'''
        if os.path.exists(directory):
            shutil.rmtree(directory)

    def _safe_remove(self, filepath):
        '''Remove a file if it exists'''
        if os.path.exists(filepath):
            os.remove(filepath)

    def _safe_mkdir(self, directory):
        '''Create a directory (and its parents) if it does not exist'''
        if not os.path.exists(directory):
            os.makedirs(directory)

    def _safe_copy(self, src=None, dst=None):
        '''
        Copy a file and create the destination directory if needed.

        `src` and `dst` default to the source and destination paths of
        the current file. Returns True on success; on any error the
        exception is printed and False is returned.
        '''
        if src is None:
            src = self.cur_file.src_path
        if dst is None:
            dst = self.cur_file.dst_path
        try:
            dst_dir = os.path.dirname(dst)
            if dst_dir:
                # A bare file name has no directory part: there is nothing
                # to create, and os.makedirs('') would raise.
                self._safe_mkdir(dst_dir)
            shutil.copy(src, dst)
            return True
        except Exception as e:
            # TODO: Logfile
            print(e)
            return False

    def _list_all_files(self, directory):
        ''' Generate an iterator over all the files in a directory tree '''
        for root, dirs, files in os.walk(directory):
            for filename in files:
                filepath = os.path.join(root, filename)
                yield filepath

    def _print_log(self):
        '''
        Print log, should be called after each file.
        You probably want to reimplement it in the subclass
        '''
        tmp_log = self.log_name.fields(**self.cur_file.log_details)
        tmp_log.info('It did a thing.')

    #######################

    def processdir(self, src_dir=None, dst_dir=None):
        '''
        Main function doing the work, you have to implement it yourself.
        '''
        raise ImplementationRequired('You have to implement the result processdir.')
def main(kg_implementation,
         description='Call the KittenGroomer implementation to do things on files '
                     'present in the source directory to the destination directory'):
    '''
    Command line entry point shared by the KittenGroomer implementations.

    Parses the source and destination directories from the command line,
    instantiates `kg_implementation` with them (source first, destination
    second) and calls its `processdir` method.

    Both options are mandatory: when one is missing, argparse prints a
    usage message and exits instead of handing None to the implementation.
    '''
    parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
    parser.add_argument('-s', '--source', type=str, required=True,
                        help='Source directory')
    parser.add_argument('-d', '--destination', type=str, required=True,
                        help='Destination directory')
    args = parser.parse_args()
    kg = kg_implementation(args.source, args.destination)
    kg.processdir()