PyCIRCLean/kittengroomer/helpers.py

346 lines
12 KiB
Python
Raw Normal View History

2015-05-11 14:32:59 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
2016-12-06 03:02:46 +01:00
"""
Contains the base objects for use when creating a sanitizer using
PyCIRCLean. Subclass or import from FileBase/KittenGroomerBase and implement
your desired behavior.
"""
2015-05-11 14:32:59 +02:00
import os
2015-11-23 19:54:29 +01:00
import hashlib
2015-05-11 14:32:59 +02:00
import shutil
import argparse
import magic
2015-05-11 14:32:59 +02:00
class FileBase(object):
    """
    Base object for individual files in the source directory.

    Contains file attributes and various helper methods. Per-file state is
    kept in the ``_file_props`` dict and is read/written through
    ``get_property``/``set_property``.
    """

    def __init__(self, src_path, dst_path):
        """
        Initialized with the source path and expected destination path.

        Create various properties and determine the file's mimetype.
        """
        self.src_path = src_path
        self.dst_path = dst_path
        self.filename = os.path.basename(self.src_path)
        self._file_props = {
            'filepath': self.src_path,
            'filename': self.filename,
            'file_size': self.size,
            'maintype': None,
            'subtype': None,
            'extension': None,
            'safety_category': None,
            'symlink': False,
            'copied': False,
            'description_string': [],  # array of descriptions to be joined
            'errors': {},
            'user_defined': {}
        }
        self.extension = self._determine_extension()
        self.set_property('extension', self.extension)
        # _determine_mimetype may record errors, so it has to run after
        # _file_props has been created.
        self.mimetype = self._determine_mimetype()
        self.should_copy = True
        self.main_type = None
        self.sub_type = None
        if self.mimetype:
            self.main_type, self.sub_type = self._split_subtypes(self.mimetype)
            if self.main_type:
                self.set_property('maintype', self.main_type)
            if self.sub_type:
                self.set_property('subtype', self.sub_type)

    def _determine_extension(self):
        """Return the lowercased extension of src_path, or None if absent."""
        _, ext = os.path.splitext(self.src_path)
        ext = ext.lower()
        if ext == '':
            ext = None
        return ext

    def _determine_mimetype(self):
        """
        Return the mimetype of src_path as a string, or None if unavailable.

        Symlinks are reported as 'inode/symlink' and their target is stored
        in the 'symlink' property; everything else is handed to libmagic.
        """
        if os.path.islink(self.src_path):
            # libmagic will throw an IOError on a broken symlink
            mimetype = 'inode/symlink'
            self.set_property('symlink', os.readlink(self.src_path))
        else:
            try:
                mt = magic.from_file(self.src_path, mime=True)
                # libmagic will always return something, even if it's just 'data'
            except UnicodeEncodeError as e:
                # FIXME: The encoding of the file that triggers this is broken (possibly it's UTF-16 and Python expects utf8)
                # Note: one of the Travis files will trigger this exception
                self.add_error(e, '')
                mt = None
            # NOTE(review): presumably some python-magic versions return
            # bytes, which is why the result is decoded; str and None are
            # passed through untouched.
            if isinstance(mt, bytes):
                try:
                    mimetype = mt.decode("utf-8")
                except UnicodeDecodeError as e:
                    self.add_error(e, '')
                    mimetype = None
            else:
                mimetype = mt
        return mimetype

    def _split_subtypes(self, mimetype):
        """
        Split 'main/sub' into a (main_type, sub_type) tuple.

        Returns (None, None) when `mimetype` contains no '/'. Only the first
        '/' is used as the separator.
        """
        if '/' in mimetype:
            main_type, _, sub_type = mimetype.partition('/')
        else:
            main_type, sub_type = None, None
        return main_type, sub_type

    @property
    def size(self):
        """Filesize in bytes as an int, 0 if file does not exist."""
        try:
            size = os.path.getsize(self.src_path)
        except FileNotFoundError:
            size = 0
        return size

    @property
    def has_mimetype(self):
        """True if file has a main and sub mimetype, else False."""
        return bool(self.main_type and self.sub_type)

    @property
    def has_extension(self):
        """True if self.extension is set, else False."""
        return self.extension is not None

    @property
    def is_dangerous(self):
        """True if file has been marked 'dangerous', else False."""
        # Compare by value: identity (`is`) on strings depends on interning.
        return self._file_props['safety_category'] == 'dangerous'

    @property
    def is_unknown(self):
        """True if file has been marked 'unknown', else False."""
        return self._file_props['safety_category'] == 'unknown'

    @property
    def is_binary(self):
        """True if file has been marked 'binary', else False."""
        return self._file_props['safety_category'] == 'binary'

    @property
    def is_symlink(self):
        """True if file is a symlink, else False."""
        # 'symlink' holds either False or the link target.
        return self._file_props['symlink'] is not False

    def set_property(self, prop_string, value):
        """
        Take a property and a value and add them to the file's property dict.

        If `prop_string` is part of the file property API, set it to `value`.
        Otherwise, add `prop_string`: `value` to `user_defined` properties.
        'description_string' is special: `value` is appended to the list of
        descriptions unless it is already present.
        """
        if prop_string == 'description_string':
            if value not in self._file_props['description_string']:
                self._file_props['description_string'].append(value)
        elif prop_string in self._file_props:
            self._file_props[prop_string] = value
        else:
            self._file_props['user_defined'][prop_string] = value

    def get_property(self, prop_string):
        """
        Get the value for a property stored on the file.

        Returns `None` if `prop_string` cannot be found on the file.
        """
        if prop_string in self._file_props:
            return self._file_props[prop_string]
        elif prop_string in self._file_props['user_defined']:
            return self._file_props['user_defined'][prop_string]
        else:
            return None

    def get_all_props(self):
        """Return a dict containing all stored properties of this file."""
        return self._file_props

    def add_error(self, error, info_string):
        """Add an `error`: `info_string` pair to the file."""
        self._file_props['errors'][error] = info_string

    def add_description(self, description_string):
        """
        Add a description string to the file.

        If `description_string` is already present, will prevent duplicates.
        """
        self.set_property('description_string', description_string)

    def make_dangerous(self, reason_string=None):
        """
        Mark file as dangerous.

        Prepend and append DANGEROUS to the destination file name
        to help prevent double-click of death. If the file is already
        marked dangerous, only the reason is recorded.
        """
        # The description list is meant to be joined, so a missing reason
        # must not put None into it.
        if reason_string is not None:
            self.set_property('description_string', reason_string)
        if self.is_dangerous:
            return
        self.set_property('safety_category', 'dangerous')
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))

    def make_unknown(self):
        """Mark file as an unknown type and prepend UNKNOWN to filename."""
        if self.is_dangerous or self.is_binary:
            return
        self.set_property('safety_category', 'unknown')
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))

    def make_binary(self):
        """Mark file as a binary and append .bin to filename."""
        if self.is_dangerous:
            return
        self.set_property('safety_category', 'binary')
        path, filename = os.path.split(self.dst_path)
        self.dst_path = os.path.join(path, '{}.bin'.format(filename))

    def safe_copy(self, src=None, dst=None):
        """
        Copy file and create destination directories if needed.

        `src` and `dst` default to `src_path` and `dst_path`. Any exception
        raised while copying is recorded with `add_error` instead of being
        propagated.
        """
        if src is None:
            src = self.src_path
        if dst is None:
            dst = self.dst_path
        try:
            dst_dir = os.path.dirname(dst)
            # A bare filename has no directory part; os.makedirs('') would
            # raise and the copy would never be attempted.
            if dst_dir and not os.path.exists(dst_dir):
                os.makedirs(dst_dir)
            shutil.copy(src, dst)
        except Exception as e:
            self.add_error(e, '')

    def force_ext(self, ext):
        """If dst_path does not end in ext, append .ext to it."""
        ext = self._check_leading_dot(ext)
        if not self.dst_path.endswith(ext):
            # LOG: do we want to log that the extension was changed as below?
            # self.set_property('force_ext', True)
            self.dst_path += ext
        if not self._file_props['extension'] == ext:
            self.set_property('extension', ext)

    def create_metadata_file(self, ext):
        """
        Create a separate file to hold extracted metadata.

        The string `ext` will be used as the extension for the metadata file.
        Returns the metadata file path (`dst_path` + ext) after making sure
        its directory exists, or False if a file already exists at
        `src_path` + ext (the error is recorded with `add_error`).
        """
        ext = self._check_leading_dot(ext)
        try:
            # NOTE(review): the collision check looks at the *source* tree,
            # presumably because that file would be copied to the same
            # destination path - confirm.
            if os.path.exists(self.src_path + ext):
                err_str = ("Could not create metadata file for \"" +
                           self.filename +
                           "\": a file with that path already exists.")
                raise KittenGroomerError(err_str)
            else:
                dst_dir_path, filename = os.path.split(self.dst_path)
                if not os.path.exists(dst_dir_path):
                    os.makedirs(dst_dir_path)
                self.metadata_file_path = self.dst_path + ext
                return self.metadata_file_path
        except KittenGroomerError as e:
            self.add_error(e, '')
            return False

    def _check_leading_dot(self, ext):
        """Return `ext` with a leading '.'; an empty string is returned as is."""
        if ext and not ext.startswith('.'):
            return '.' + ext
        return ext
2015-05-11 14:32:59 +02:00
class Logging(object):
    """Namespace for logging helpers."""

    @staticmethod
    def computehash(path):
        """Return the sha256 hash of a file at a given path."""
        s = hashlib.sha256()
        with open(path, 'rb') as f:
            # Read in 1 MiB chunks so the whole file is never held in memory;
            # iteration stops at the first empty read (end of file).
            for buf in iter(lambda: f.read(0x100000), b''):
                s.update(buf)
        return s.hexdigest()
class KittenGroomerBase(object):
    """Base object responsible for copy/sanitization process."""

    def __init__(self, src_root_path, dst_root_path):
        """Initialized with path to source and dest directories."""
        # Both roots are stored as absolute paths.
        self.src_root_path = os.path.abspath(src_root_path)
        self.dst_root_path = os.path.abspath(dst_root_path)

    def safe_rmtree(self, directory_path):
        """Remove a directory tree if it exists."""
        if os.path.exists(directory_path):
            shutil.rmtree(directory_path)

    def safe_remove(self, file_path):
        """Remove file at file_path if it exists."""
        if os.path.exists(file_path):
            os.remove(file_path)

    def safe_mkdir(self, directory_path):
        """Make a directory if it does not exist."""
        if not os.path.exists(directory_path):
            os.makedirs(directory_path)

    def list_all_files(self, directory_path):
        """Generator yielding path to all of the files in a directory tree."""
        for root, dirs, files in os.walk(directory_path):
            # files is a list anyway so we don't get much from using a generator here
            for filename in files:
                filepath = os.path.join(root, filename)
                yield filepath

    #######################

    def processdir(self, src_dir=None, dst_dir=None):
        """
        Implement this function to define file processing behavior.

        Both arguments default to None so that the no-argument call made by
        `main()` reaches this method instead of failing on its signature.
        Raises ImplementationRequired unless overridden.
        """
        raise ImplementationRequired('Please implement processdir.')
2015-05-11 14:32:59 +02:00
2017-07-11 20:45:12 +02:00
class KittenGroomerError(Exception):
    """Base KittenGroomer exception handler."""

    def __init__(self, message):
        """Store `message` on the exception and pass it to Exception."""
        super(KittenGroomerError, self).__init__(message)
        # Kept as an attribute so callers can read `.message` directly.
        self.message = message
class ImplementationRequired(KittenGroomerError):
    """Implementation required error."""
def main(kg_implementation, description='Call a KittenGroomer implementation to process files present in the source directory and copy them to the destination directory.'):
    """
    Parse the command line and run a KittenGroomer implementation.

    `kg_implementation` is called with the source and destination
    directories given via -s/--source and -d/--destination, and
    `processdir()` is then called on the result. `description` is used as
    the help text of the argument parser.
    """
    parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
    # Both directories are required: without them None would be handed to the
    # implementation, which fails on os.path.abspath(None) in the base class.
    parser.add_argument('-s', '--source', type=str, required=True, help='Source directory')
    parser.add_argument('-d', '--destination', type=str, required=True, help='Destination directory')
    args = parser.parse_args()
    kg = kg_implementation(args.source, args.destination)
    kg.processdir()