diff --git a/bin/filecheck.py b/bin/filecheck.py index d9677b7..7ed02ba 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -1,6 +1,5 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import magic import os import mimetypes import shlex @@ -70,27 +69,23 @@ class File(FileBase): super(File, self).__init__(src_path, dst_path) self.is_recursive = False - try: - mimetype = magic.from_file(src_path, mime=True).decode("utf-8") - self.main_type, self.sub_type = mimetype.split('/') - except: - # FIXME/TEMP: checking what happen, probably bad. - print(src_path, mimetype) - self.log_details.update({'broken_mime': self.extension}) + if not self.has_mimetype(): + # No mimetype, should not happen. + self.make_dangerous() + + if not self.has_extension(): self.make_dangerous() - return - a, self.extension = os.path.splitext(src_path) if self.extension in mal_ext: self.log_details.update({'malicious_extension': self.extension}) self.make_dangerous() - return - elif self.extension == '': - self.log_details.update({'no_extension': self.extension}) - self.make_dangerous() + + if self.is_dangerous(): return - self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension}) + self.log_details.update({'maintype': self.main_type, + 'subtype': self.sub_type, + 'extension': self.extension}) # Check correlation known extension => actual mime type if propertype.get(self.extension) is not None: @@ -101,13 +96,16 @@ class File(FileBase): expected_mimetype = aliases.get(expected_mimetype) is_known_extension = self.extension in mimetypes.types_map.keys() - if is_known_extension and expected_mimetype != mimetype: + if is_known_extension and expected_mimetype != self.mimetype: self.log_details.update({'expected_mimetype': expected_mimetype}) self.make_dangerous() # check correlation actual mime type => known extensions - if aliases.get(mimetype) is not None: - mimetype = aliases.get(mimetype) + if aliases.get(self.mimetype) is not None: + mimetype = aliases.get(self.mimetype) + else: + mimetype = self.mimetype + expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False) if expected_extensions: if len(self.extension) > 0 and self.extension not in expected_extensions: @@ -175,7 +173,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): Print the logs related to the current file being processed ''' tmp_log = self.log_name.fields(**self.cur_file.log_details) - if self.cur_file.log_details.get('dangerous'): + if self.cur_file.is_dangerous(): tmp_log.warning(self.cur_file.log_string) elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'): tmp_log.info(self.cur_file.log_string) @@ -189,7 +187,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): else: deadline = None args = shlex.split(command_line) - with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout: + with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout: p = subprocess.Popen(args, stdout=stdout, stderr=stderr) if background: # This timer is here to make sure the unoconv listener is properly started. @@ -444,7 +442,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''), self.cur_file.main_type, self.cur_file.sub_type) - if self.cur_file.log_details.get('dangerous') is None: + if not self.cur_file.is_dangerous(): self.mime_processing_options.get(self.cur_file.main_type, self.unknown)() else: self._safe_copy() diff --git a/bin/generic.py b/bin/generic.py index 120613f..e0c3e0e 100644 --- a/bin/generic.py +++ b/bin/generic.py @@ -1,6 +1,5 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import magic import os import mimetypes import shlex @@ -47,23 +46,17 @@ class File(FileBase): super(File, self).__init__(src_path, dst_path) self.is_recursive = False - self.main_type = '' - self.main_type = '' - try: - mimetype = magic.from_file(src_path, mime=True) - try: - mimetype = mimetype.decode("utf-8") - except: - pass - except Exception as e: - print('************************** BROKEN', self.src_path, e) + if not self.has_mimetype(): + # No mimetype, should not happen. self.make_dangerous() + + if self.is_dangerous(): return - self.main_type, self.sub_type = mimetype.split('/') - a, self.extension = os.path.splitext(src_path) + self.log_details.update({'maintype': self.main_type, + 'subtype': self.sub_type, + 'extension': self.extension}) - self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension}) # If the mimetype matches as text/*, it will be sent to LibreOffice, no need to cross check the mime/ext if self.main_type == 'text': return @@ -77,13 +70,15 @@ class File(FileBase): expected_mimetype = aliases.get(expected_mimetype) is_known_extension = self.extension in mimetypes.types_map.keys() - if is_known_extension and expected_mimetype != mimetype: + if is_known_extension and expected_mimetype != self.mimetype: self.log_details.update({'expected_mimetype': expected_mimetype}) self.make_dangerous() # check correlation actual mime type => known extensions - if aliases.get(mimetype) is not None: - mimetype = aliases.get(mimetype) + if aliases.get(self.mimetype) is not None: + mimetype = aliases.get(self.mimetype) + else: + mimetype = self.mimetype expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False) if expected_extensions: if len(self.extension) > 0 and self.extension not in expected_extensions: @@ -151,7 +146,7 @@ class KittenGroomer(KittenGroomerBase): Print the logs related to the current file being processed ''' tmp_log = self.log_name.fields(**self.cur_file.log_details) - if self.cur_file.log_details.get('dangerous'): + if self.cur_file.is_dangerous(): tmp_log.warning(self.cur_file.log_string) elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'): tmp_log.info(self.cur_file.log_string) @@ -165,7 +160,7 @@ class KittenGroomer(KittenGroomerBase): else: deadline = None args = shlex.split(command_line) - with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout: + with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout: p = subprocess.Popen(args, stdout=stdout, stderr=stderr) if background: # FIXME: This timer is here to make sure the unoconv listener is properly started. @@ -353,7 +348,7 @@ class KittenGroomer(KittenGroomerBase): self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''), self.cur_file.main_type, self.cur_file.sub_type) - if self.cur_file.log_details.get('dangerous') is None: + if not self.cur_file.is_dangerous(): self.mime_processing_options.get(self.cur_file.main_type, self.unknown)() else: self._safe_copy() diff --git a/bin/pier9.py b/bin/pier9.py index dbf5ee0..6ded725 100644 --- a/bin/pier9.py +++ b/bin/pier9.py @@ -20,7 +20,9 @@ class FilePier9(FileBase): def __init__(self, src_path, dst_path): ''' Init file object, set the extension ''' super(FilePier9, self).__init__(src_path, dst_path) - a, self.extension = os.path.splitext(self.src_path) + + if not self.has_extension(): + self.make_dangerous() class KittenGroomerPier9(KittenGroomerBase): @@ -55,12 +57,16 @@ class KittenGroomerPier9(KittenGroomerBase): for srcpath in self._list_all_files(self.src_root_dir): self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '')) self.cur_file = FilePier9(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir)) - if self.cur_file.extension in self.authorized_extensions: + if not self.cur_file.is_dangerous() and self.cur_file.extension in self.authorized_extensions: self.cur_file.add_log_details('valid', True) self.cur_file.log_string = 'Expected extension: ' + self.cur_file.extension self._safe_copy() else: - self.cur_file.log_string = 'Bad extension: ' + self.cur_file.extension + self.cur_file.make_dangerous() + if self.cur_file.extension: + self.cur_file.log_string = 'Bad extension: ' + self.cur_file.extension + else: + self.cur_file.log_string = 'No Extension.' self._print_log() diff --git a/bin/specific.py b/bin/specific.py index 1f570b8..fcca2f4 100644 --- a/bin/specific.py +++ b/bin/specific.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os -import magic from kittengroomer import FileBase, KittenGroomerBase, main @@ -15,15 +14,12 @@ class FileSpec(FileBase): def __init__(self, src_path, dst_path): ''' Init file object, set the extension ''' super(FileSpec, self).__init__(src_path, dst_path) - a, self.extension = os.path.splitext(self.src_path) - try: - self.mimetype = magic.from_file(self.src_path, mime=True) - try: - self.imetype = self.mimetype.decode("utf-8") - except: - pass - except Exception as e: - print('************************** BROKEN', self.src_path, e) + + if not self.has_mimetype(): + self.make_dangerous() + + if not self.has_extension(): + self.make_dangerous() class KittenGroomerSpec(KittenGroomerBase): @@ -62,27 +58,33 @@ class KittenGroomerSpec(KittenGroomerBase): valid = True self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', '')) self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir)) - expected_mime = self.valid_files.get(self.cur_file.extension) - compare_ext = None - compare_mime = None - if expected_mime is None: - # Unexpected extension => disallowed + if self.cur_file.is_dangerous(): valid = False - compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys())) - elif self.cur_file.mimetype != expected_mime: - # Unexpected mimetype => dissalowed - valid = False - compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime) - self.cur_file.add_log_details('valid', valid) - if valid: - to_copy.append(self.cur_file) - self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype) - else: error.append(self.cur_file) - if compare_ext is not None: - self.cur_file.log_string = compare_ext + else: + expected_mime = self.valid_files.get(self.cur_file.extension) + compare_ext = '' + compare_mime = '' + if expected_mime is None: + # Unexpected extension => disallowed + valid = False + compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys())) + elif self.cur_file.mimetype != expected_mime: + # Unexpected mimetype => dissalowed + valid = False + compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime) + + if valid: + to_copy.append(self.cur_file) + self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype) else: - self.cur_file.log_string = compare_mime + error.append(self.cur_file) + if compare_ext: + self.cur_file.log_string = compare_ext + else: + self.cur_file.log_string = compare_mime + self.cur_file.add_log_details('valid', valid) + if len(error) > 0: for f in error + to_copy: self.cur_file = f diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index 8b0a62c..3024cd4 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -1,8 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os +import magic import shutil -from twiggy import quickSetup, log +from twiggy import quick_setup, log import argparse @@ -33,6 +34,36 @@ class FileBase(object): self.dst_path = dst_path self.log_details = {'filepath': self.src_path} self.log_string = '' + a, self.extension = os.path.splitext(self.src_path) + + mt = magic.from_file(self.src_path, mime=True) + try: + self.mimetype = mt.decode("utf-8") + except: + self.mimetype = mt + + if self.mimetype and '/' in self.mimetype: + self.main_type, self.sub_type = self.mimetype.split('/') + else: + self.main_type = '' + self.sub_type = '' + + def has_mimetype(self): + if not self.main_type or not self.sub_type: + self.log_details.update({'broken_mime': self.extension}) + return False + return True + + def has_extension(self): + if not self.extension: + self.log_details.update({'no_extension': self.extension}) + return False + return True + + def is_dangerous(self): + if self.log_details.get('dangerous'): + return True + return False def add_log_details(self, key, value): ''' @@ -46,7 +77,7 @@ class FileBase(object): Prepending and appending DANGEROUS to the destination file name avoid double-click of death ''' - if self.log_details.get('dangerous'): + if self.is_dangerous(): # Already marked as dangerous, do nothing return self.log_details['dangerous'] = True @@ -59,7 +90,7 @@ class FileBase(object): a decision. Theuser will have to decide what to do. Prepending UNKNOWN ''' - if self.log_details.get('dangerous') or self.log_details.get('binary'): + if self.is_dangerous() or self.log_details.get('binary'): # Already marked as dangerous or binary, do nothing return self.log_details['unknown'] = True @@ -72,7 +103,7 @@ class FileBase(object): Appending .bin avoir double click of death but the user will have to decide by itself. ''' - if self.log_details.get('dangerous'): + if self.is_dangerous(): # Already marked as dangerous, do nothing return self.log_details['binary'] = True @@ -98,7 +129,7 @@ class KittenGroomerBase(object): self._safe_mkdir(self.log_root_dir) self.log_processing = os.path.join(self.log_root_dir, 'processing.log') - quickSetup(file=self.log_processing) + quick_setup(file=self.log_processing) self.log_name = log.name('files') self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data') os.environ["PATH"] += os.pathsep + self.ressources_path