PyCIRCLean/kittengroomer/helpers.py

359 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Contains the base objects for use when creating a sanitizer using
PyCIRCLean. Subclass or import from FileBase/KittenGroomerBase and implement
your desired behavior.
"""
import os
2015-11-23 19:54:29 +01:00
import hashlib
2015-05-11 14:32:59 +02:00
import shutil
import argparse
2017-08-05 00:02:31 +02:00
import stat
import traceback
from pathlib import Path
from typing import Union, Optional, List, Dict, Any, Tuple, Iterator
2015-05-11 14:32:59 +02:00
import magic # type: ignore
2015-05-11 14:32:59 +02:00
class FileBase(object):
    """
    Base object for individual files in the source directory.

    Holds the file's attributes (paths, size, mimetype, danger flag,
    descriptions, errors, user-defined properties) and helper methods for
    copying the file to its destination.
    """

    # Execute permission for user, group and other; cleared on copied files.
    _EXEC_BITS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    # Mimetype used when libmagic gives no usable answer.
    _FALLBACK_MIMETYPE = 'application/octet-stream'

    def __init__(self, src_path: Path, dst_path: Path):
        """
        Initialized with the source path and expected destination path.

        Creates the various properties and determines the file's mimetype.
        Only the parent directory of `dst_path` is stored; the destination
        file name is derived from `self.filename` (see `dst_path`).
        """
        self.src_path: Path = src_path
        self.dst_dir: Path = dst_path.parent
        self.filename: str = src_path.name
        self.size: int = self._get_size(src_path)
        self.is_dangerous: bool = False
        self.copied: bool = False
        self.symlink_path: Optional[str] = None
        # Individual descriptions, joined by the `description_string` getter.
        self._description_string: List[str] = []
        self._errors: Dict[Exception, str] = {}
        self._user_defined: Dict[str, Any] = {}
        self.should_copy: bool = True
        # Must come last: mimetype detection may call set_property() and
        # add_error(), which need the containers above to exist.
        self.mimetype = self._determine_mimetype(str(src_path))

    @property
    def dst_path(self) -> Path:
        """Destination path: `dst_dir` joined with the current `filename`."""
        return self.dst_dir / self.filename

    @property
    def extension(self) -> Optional[str]:
        """Lower-cased suffix of `src_path` (with dot), or None if absent."""
        suffix = self.src_path.suffix
        return suffix.lower() if suffix else None

    @property
    def maintype(self) -> Optional[str]:
        """Part of the mimetype before the '/', or None."""
        main, _ = self._split_mimetype(self.mimetype)
        return main

    @property
    def subtype(self) -> Optional[str]:
        """Part of the mimetype after the first '/', or None."""
        _, sub = self._split_mimetype(self.mimetype)
        return sub

    @property
    def has_mimetype(self) -> bool:
        """True if file has a main and sub mimetype, else False."""
        return bool(self.maintype and self.subtype)

    @property
    def has_extension(self) -> bool:
        """True if self.extension is set, else False."""
        return self.extension is not None

    @property
    def is_symlink(self) -> bool:
        """True if file is a symlink, else False."""
        return self.symlink_path is not None

    @property
    def description_string(self) -> str:
        """All stored descriptions joined with ', ' ('No description' if none)."""
        descriptions = self._description_string
        if not descriptions:
            return 'No description'
        if len(descriptions) == 1:
            return descriptions[0]
        # NOTE: the strip removes leading/trailing commas and spaces, which
        # only matters when the first or last description has them.
        return ', '.join(descriptions).strip(', ')

    @description_string.setter
    def description_string(self, value: str):
        """
        Append `value` to the stored descriptions (assignment does not replace).

        Duplicates are ignored. Raises TypeError if `value` is not a str.
        """
        if not isinstance(value, str):
            raise TypeError(f"value ({value}) must be a 'str' and not a {type(value)}")
        if value not in self._description_string:
            self._description_string.append(value)

    def set_property(self, prop_string: str, value: Any):
        """
        Take a property and a value and add them to the file's stored props.

        If `prop_string` is already an attribute of the file, set it to
        `value`. Otherwise, add `prop_string`: `value` to the user-defined
        properties. Naming a read-only property (e.g. `extension`) raises
        AttributeError, because it exists but has no setter.
        """
        if hasattr(self, prop_string):
            setattr(self, prop_string, value)
        else:
            self._user_defined[prop_string] = value

    def get_property(self, prop_string: str) -> Any:
        """
        Get the value for a property stored on the file.

        Looks at the file's attributes first, then at the user-defined
        properties. Returns `None` if `prop_string` cannot be found.
        """
        try:
            return getattr(self, prop_string)
        except AttributeError:
            return self._user_defined.get(prop_string, None)

    def get_all_props(self) -> dict:
        """Return a dict containing all stored properties of this file."""
        # Maybe move this onto the logger? I think that makes more sense
        return {
            'filepath': self.src_path,
            'filename': self.filename,
            'file_size': self.size,
            'mimetype': self.mimetype,
            'maintype': self.maintype,
            'subtype': self.subtype,
            'extension': self.extension,
            'is_dangerous': self.is_dangerous,
            'is_symlink': self.is_symlink,
            'symlink_path': self.symlink_path,
            'copied': self.copied,
            'description_string': self.description_string,
            'errors': self._errors,
            'user_defined': self._user_defined,
        }

    def add_error(self, error: Exception, info_string: str):
        """Add an `error`: `info_string` pair to the file."""
        self._errors[error] = info_string

    def add_description(self, description_string: str):
        """
        Add a description string to the file.

        A description that is already present is not added a second time.
        Raises TypeError if `description_string` is not a str.
        """
        # The property setter appends instead of replacing.
        self.description_string = description_string

    def make_dangerous(self, reason_string: Optional[str] = None):
        """
        Mark file as dangerous.

        Prepend and append DANGEROUS to the destination file name
        to help prevent double-click of death. The rename happens only the
        first time; `reason_string`, if given, is recorded on every call.
        """
        if not self.is_dangerous:
            self.is_dangerous = True
            self.filename = f'DANGEROUS_{self.filename}_DANGEROUS'
        if reason_string:
            self.add_description(reason_string)

    def safe_copy(self) -> bool:
        """
        Copy file and create destination directories if needed.

        Sets all exec bits of the copy to '0'. Returns True on success.
        On an OSError the error is recorded on the file, the traceback is
        printed and False is returned.
        """
        src = self.src_path
        dst = self.dst_path
        try:
            self.dst_dir.mkdir(exist_ok=True, parents=True)
            shutil.copy(src, dst)
            current_perms = self._get_file_permissions(dst)
            dst.chmod(current_perms & ~self._EXEC_BITS)
            return True
        except OSError as e:
            # Probably means we can't write in the dest dir
            self.add_error(e, '')
            traceback.print_exc()
            return False

    def force_ext(self, extension: str):
        """If the file name does not end in `extension`, append it."""
        new_ext = self._check_leading_dot(extension)
        if not self.filename.endswith(new_ext):
            # TODO: log that the extension was changed
            self.filename += new_ext

    def create_metadata_file(self, extension) -> Union[Path, bool]:
        """
        Create a separate file path to hold extracted metadata.

        The string `extension` is appended (with a leading dot) to the
        destination path. The destination directory is created if needed
        and the resulting path is stored in `self.metadata_file_path` and
        returned. If a file named `src_path` + extension already exists in
        the source, a KittenGroomerError is recorded on the file and False
        is returned instead.
        """
        # TODO: this method name is confusing
        ext = self._check_leading_dot(extension)
        # Prevent using the same path as another file from src_path
        if Path(f'{self.src_path}{ext}').exists():
            error = KittenGroomerError(f'Could not create metadata file for "{self.filename}": a file with that path exists.')
            self.add_error(error, '')
            return False
        self.dst_dir.mkdir(exist_ok=True, parents=True)
        # TODO: shouldn't mutate state and also return something
        self.metadata_file_path = Path(f'{self.dst_path}{ext}')
        return self.metadata_file_path

    def _check_leading_dot(self, ext: str) -> str:
        """Return `ext` with a leading '.' added if it is non-empty and lacks one."""
        # TODO: this method name is confusing
        if ext and not ext.startswith('.'):
            return '.' + ext
        return ext

    def _determine_mimetype(self, file_path: str) -> str:
        """
        Return the mimetype of the file at `file_path`.

        Symlinks are reported as 'inode/symlink' and their target is stored
        in `symlink_path`. Everything else is identified with libmagic; a
        UnicodeEncodeError raised by libmagic is recorded on the file and
        the fallback mimetype is returned.
        """
        if os.path.islink(file_path):
            # libmagic will throw an IOError on a broken symlink
            self.set_property('symlink_path', os.readlink(file_path))
            return 'inode/symlink'
        try:
            # libmagic always returns something, even if it's just 'data'
            raw_mimetype = magic.from_file(file_path, mime=True)
        except UnicodeEncodeError as e:
            self.add_error(e, '')
            raw_mimetype = None
        return self._normalize_mimetype(raw_mimetype)

    @classmethod
    def _normalize_mimetype(cls, raw_mimetype: Any) -> str:
        """
        Turn libmagic's answer into a str mimetype.

        `str` values are returned unchanged and `bytes` values are decoded
        as UTF-8. Missing, empty or undecodable values yield the fallback
        'application/octet-stream'.
        """
        if isinstance(raw_mimetype, bytes):
            try:
                raw_mimetype = raw_mimetype.decode('utf-8')
            except UnicodeDecodeError:
                return cls._FALLBACK_MIMETYPE
        if isinstance(raw_mimetype, str) and raw_mimetype:
            return raw_mimetype
        return cls._FALLBACK_MIMETYPE

    def _split_mimetype(self, mimetype: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Split `mimetype` at its first '/' into (main type, sub type).

        Returns (None, None) if `mimetype` is empty or contains no '/'.
        """
        if mimetype and '/' in mimetype:
            # partition() splits only once, so extra slashes stay in the
            # sub type instead of breaking the two-value unpacking.
            main_type, _, sub_type = mimetype.partition('/')
            return main_type, sub_type
        return None, None

    def _get_size(self, file_path: Path) -> int:
        """Filesize in bytes as an int, 0 if file does not exist."""
        try:
            return os.path.getsize(file_path)
        except FileNotFoundError:
            return 0

    def _remove_exec_bit(self, file_path: Path):
        """Clear the user, group and other execute bits of `file_path`."""
        current_perms = self._get_file_permissions(file_path)
        os.chmod(file_path, current_perms & ~self._EXEC_BITS)

    def _get_file_permissions(self, file_path: Path):
        """Return the permission bits of `file_path` (symlinks not followed)."""
        full_mode = file_path.lstat().st_mode
        return stat.S_IMODE(full_mode)
class Logging(object):
    """Namespace for helpers used when logging; provides file hashing."""

    @staticmethod
    def computehash(path: Path) -> str:
        """Return the sha256 hash of a file at a given path."""
        digest = hashlib.sha256()
        with path.open('rb') as handle:
            # Read 1 MiB at a time until read() returns the empty EOF marker,
            # so large files are never loaded into memory at once.
            for chunk in iter(lambda: handle.read(0x100000), b''):
                digest.update(chunk)
        return digest.hexdigest()
class KittenGroomerBase(object):
    """Base object responsible for copy/sanitization process."""

    def __init__(self, src_root_path: str, dst_root_path: str):
        """Initialized with path to source and dest directories.

        Both paths are stored as absolute `Path` objects.
        """
        self.src_root_path: Path = Path(os.path.abspath(src_root_path))
        self.dst_root_path: Path = Path(os.path.abspath(dst_root_path))

    def safe_rmtree(self, directory_path: Path):
        """Remove a directory tree if it exists."""
        if directory_path.is_dir():
            shutil.rmtree(directory_path)

    def safe_remove(self, file_path: Path):
        """Remove file at file_path if it exists."""
        if file_path.is_file():
            os.remove(file_path)

    def safe_mkdir(self, directory_path: Path):
        """Make a directory (and missing parents) if it does not exist."""
        if not directory_path.exists():
            # exist_ok guards against the directory appearing between the
            # exists() check above and the mkdir() call.
            directory_path.mkdir(parents=True, exist_ok=True)

    def list_all_files(self, directory_path: Path) -> Iterator[Path]:
        """Generator yielding path to all of the files in a directory tree."""
        for root, _dirs, files in os.walk(directory_path):
            for filename in files:
                yield Path(root) / filename

    #######################

    def processdir(self, src_dir: Optional[Path] = None, dst_dir: Optional[Path] = None):
        """
        Implement this function to define file processing behavior.

        Both arguments are optional so that the method can be invoked
        without arguments. The base implementation always raises
        ImplementationRequired.
        """
        raise ImplementationRequired('Please implement processdir.')
class KittenGroomerError(Exception):
    """Base KittenGroomer exception handler."""

    def __init__(self, message: str):
        """Store `message` on the instance and pass it on to Exception."""
        super().__init__(message)
        self.message = message
class ImplementationRequired(KittenGroomerError):
    """Implementation required error.

    Adds no behavior of its own on top of KittenGroomerError.
    """
def main(
        kg_implementation,
        description=("Call a KittenGroomer implementation to process files "
                     "present in the source directory and copy them to the "
                     "destination directory.")):
    """
    Command line entry point for a KittenGroomer implementation.

    Prints `description`, parses the `-s/--source` and `-d/--destination`
    command line options, instantiates `kg_implementation` with the two
    directory strings and calls its `processdir()` method.

    Both options are mandatory: if either is missing, argparse prints a
    usage error and exits, so `None` is never handed to the implementation.
    """
    print(description)
    parser = argparse.ArgumentParser(prog='KittenGroomer', description=description)
    parser.add_argument('-s', '--source', type=str, required=True,
                        help='Source directory')
    parser.add_argument('-d', '--destination', type=str, required=True,
                        help='Destination directory')
    args = parser.parse_args()
    kg = kg_implementation(args.source, args.destination)
    kg.processdir()