From 531ab43daeb7327f00bca89f9c88b3a3261bd429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 4 Nov 2015 11:06:57 +0100 Subject: [PATCH] Improve debug, add list of malicious ext --- bin/filecheck.py | 52 +++++++++++++++++++++++++++++++--------- bin/generic.py | 20 ++++++++++++---- bin/pier9.py | 4 ++-- bin/specific.py | 9 ++++--- kittengroomer/helpers.py | 10 +++++++- tests/test.py | 30 +++++++++++++---------- 6 files changed, 90 insertions(+), 35 deletions(-) diff --git a/bin/filecheck.py b/bin/filecheck.py index 2f53f04..86ec8d7 100644 --- a/bin/filecheck.py +++ b/bin/filecheck.py @@ -44,6 +44,24 @@ aliases = { # It works as expected if you do mimetypes.guess_type('application/gzip', strict=False) propertype = {'.gz': 'application/gzip'} +# Commonly used malicious extensions +# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/ +mal_ext = ( + # Applications + ".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr", + ".hta", ".cpl", ".msc", ".jar", + # Scripts + ".bat", ".cmd", ".vb", ".vbs", ".vbe", ".js", ".jse", ".ws", ".wsf", + ".wsc", ".wsh", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2", + ".msh", ".msh1", ".msh2", ".mshxml", ".msh1xml", ".msh2xml", + # Shortcuts + ".scf", ".lnk", ".inf", + # Other + ".reg", "dll", + # Office macro (OOXML with macro enabled) + ".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm", ".potm", ".ppam", + ".ppsm", ".sldm",) + class File(FileBase): @@ -51,15 +69,26 @@ class File(FileBase): ''' Init file object, set the mimetype ''' super(File, self).__init__(src_path, dst_path) - mimetype = magic.from_file(src_path, mime=True).decode("utf-8") - try: - self.main_type, self.sub_type = mimetype.split('/') - except Exception as e: - # FIXME/TEMP: checking what happen, probably bad. - print(e, src_path, mimetype) - return - a, self.extension = os.path.splitext(src_path) self.is_recursive = False + try: + mimetype = magic.from_file(src_path, mime=True).decode("utf-8") + self.main_type, self.sub_type = mimetype.split('/') + except: + # FIXME/TEMP: checking what happen, probably bad. + print(src_path, mimetype) + self.log_details.update({'broken_mime': self.extension}) + self.make_dangerous() + return + + a, self.extension = os.path.splitext(src_path) + if self.extension in mal_ext: + self.log_details.update({'malicious_extension': self.extension}) + self.make_dangerous() + return + elif self.extension == '': + self.log_details.update({'no_extension': self.extension}) + self.make_dangerous() + return self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension}) @@ -91,7 +120,7 @@ class File(FileBase): class KittenGroomerFileCheck(KittenGroomerBase): - def __init__(self, root_src=None, root_dst=None, max_recursive=5): + def __init__(self, root_src=None, root_dst=None, max_recursive=5, debug=False): ''' Initialize the basics of the conversion process ''' @@ -99,7 +128,7 @@ class KittenGroomerFileCheck(KittenGroomerBase): root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomerFileCheck, self).__init__(root_src, root_dst) + super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug) self.recursive = 0 self.max_recursive = max_recursive @@ -160,7 +189,8 @@ class KittenGroomerFileCheck(KittenGroomerBase): else: deadline = None args = shlex.split(command_line) - p = subprocess.Popen(args) + with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout: + p = subprocess.Popen(args, stdout=stdout, stderr=stderr) if background: # This timer is here to make sure the unoconv listener is properly started. time.sleep(10) diff --git a/bin/generic.py b/bin/generic.py index d3d172a..5f18c13 100644 --- a/bin/generic.py +++ b/bin/generic.py @@ -46,10 +46,19 @@ class File(FileBase): ''' Init file object, set the mimetype ''' super(File, self).__init__(src_path, dst_path) - mimetype = magic.from_file(src_path, mime=True).decode("utf-8") + self.is_recursive = False + self.main_type = '' + self.main_type = '' + try: + mimetype = magic.from_file(src_path, mime=True).decode("utf-8") + except Exception as e: + print('************************** BROKEN', self.src_path) + print('************************** BROKEN', self.src_path, e) + self.make_dangerous() + return + self.main_type, self.sub_type = mimetype.split('/') a, self.extension = os.path.splitext(src_path) - self.is_recursive = False self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension}) # If the mimetype matches as text/*, it will be sent to LibreOffice, no need to cross check the mime/ext @@ -84,7 +93,7 @@ class File(FileBase): class KittenGroomer(KittenGroomerBase): - def __init__(self, root_src=None, root_dst=None, max_recursive=5): + def __init__(self, root_src=None, root_dst=None, max_recursive=5, debug=False): ''' Initialize the basics of the conversion process ''' @@ -92,7 +101,7 @@ class KittenGroomer(KittenGroomerBase): root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomer, self).__init__(root_src, root_dst) + super(KittenGroomer, self).__init__(root_src, root_dst, debug) self.recursive = 0 self.max_recursive = max_recursive @@ -153,7 +162,8 @@ class KittenGroomer(KittenGroomerBase): else: deadline = None args = shlex.split(command_line) - p = subprocess.Popen(args) + with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout: + p = subprocess.Popen(args, stdout=stdout, stderr=stderr) if background: # FIXME: This timer is here to make sure the unoconv listener is properly started. time.sleep(10) diff --git a/bin/pier9.py b/bin/pier9.py index 004494a..dbf5ee0 100644 --- a/bin/pier9.py +++ b/bin/pier9.py @@ -25,7 +25,7 @@ class FilePier9(FileBase): class KittenGroomerPier9(KittenGroomerBase): - def __init__(self, root_src=None, root_dst=None): + def __init__(self, root_src=None, root_dst=None, debug=False): ''' Initialize the basics of the copy ''' @@ -33,7 +33,7 @@ class KittenGroomerPier9(KittenGroomerBase): root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomerPier9, self).__init__(root_src, root_dst) + super(KittenGroomerPier9, self).__init__(root_src, root_dst, debug) # The initial version will accept all the file extension for all the machines. self.authorized_extensions = printers + cnc + shopbot + omax + epilog_laser + metabeam + up diff --git a/bin/specific.py b/bin/specific.py index e0b18a7..9e5dbed 100644 --- a/bin/specific.py +++ b/bin/specific.py @@ -16,12 +16,15 @@ class FileSpec(FileBase): ''' Init file object, set the extension ''' super(FileSpec, self).__init__(src_path, dst_path) a, self.extension = os.path.splitext(self.src_path) - self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8") + try: + self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8") + except: + print('************************** BROKEN', self.src_path) class KittenGroomerSpec(KittenGroomerBase): - def __init__(self, root_src=None, root_dst=None): + def __init__(self, root_src=None, root_dst=None, debug=False): ''' Initialize the basics of the copy ''' @@ -29,7 +32,7 @@ class KittenGroomerSpec(KittenGroomerBase): root_src = os.path.join(os.sep, 'media', 'src') if root_dst is None: root_dst = os.path.join(os.sep, 'media', 'dst') - super(KittenGroomerSpec, self).__init__(root_src, root_dst) + super(KittenGroomerSpec, self).__init__(root_src, root_dst, debug) self.valid_files = {} # The initial version will only accept the file extensions/mimetypes listed here. diff --git a/kittengroomer/helpers.py b/kittengroomer/helpers.py index 5d2997f..8b0a62c 100644 --- a/kittengroomer/helpers.py +++ b/kittengroomer/helpers.py @@ -87,7 +87,7 @@ class FileBase(object): class KittenGroomerBase(object): - def __init__(self, root_src, root_dst): + def __init__(self, root_src, root_dst, debug=False): ''' Setup the base options of the copy/convert setup ''' @@ -105,6 +105,14 @@ class KittenGroomerBase(object): self.cur_file = None + self.debug = debug + if self.debug: + self.log_debug_err = os.path.join(self.log_root_dir, 'debug_stderr.log') + self.log_debug_out = os.path.join(self.log_root_dir, 'debug_stdout.log') + else: + self.log_debug_err = os.devnull + self.log_debug_out = os.devnull + # ##### Helpers ##### def _safe_rmtree(self, directory): '''Remove a directory tree if it exists''' diff --git a/tests/test.py b/tests/test.py index cd0af9e..07b217b 100644 --- a/tests/test.py +++ b/tests/test.py @@ -21,46 +21,50 @@ class TestBasic(unittest.TestCase): self.maxDiff = None self.curpath = os.getcwd() - def dump_logs(self): - logfile = os.path.join(self.curpath, 'tests/dst/logs/processing.log') - print(open(logfile, 'rb').read()) + def dump_logs(self, kg): + print(open(kg.log_processing, 'rb').read().decode("utf-8")) + if kg.debug: + if os.path.exists(kg.log_debug_err): + print(open(kg.log_debug_err, 'rb').read().decode("utf-8")) + if os.path.exists(kg.log_debug_out): + print(open(kg.log_debug_out, 'rb').read().decode("utf-8")) def test_specific_valid(self): src = os.path.join(self.curpath, 'tests/src2') dst = os.path.join(self.curpath, 'tests/dst') - spec = KittenGroomerSpec(src, dst) + spec = KittenGroomerSpec(src, dst, debug=True) spec.processdir() - self.dump_logs() + self.dump_logs(spec) def test_specific_invalid(self): src = os.path.join(self.curpath, 'tests/src') dst = os.path.join(self.curpath, 'tests/dst') - spec = KittenGroomerSpec(src, dst) + spec = KittenGroomerSpec(src, dst, debug=True) spec.processdir() - self.dump_logs() + self.dump_logs(spec) def test_pier9(self): src = os.path.join(self.curpath, 'tests/src') dst = os.path.join(self.curpath, 'tests/dst') - spec = KittenGroomerPier9(src, dst) + spec = KittenGroomerPier9(src, dst, debug=True) spec.processdir() - self.dump_logs() + self.dump_logs(spec) def test_generic(self): src = os.path.join(self.curpath, 'tests/src') dst = os.path.join(self.curpath, 'tests/dst') - spec = KittenGroomer(src, dst) + spec = KittenGroomer(src, dst, debug=True) spec.processdir() - self.dump_logs() + self.dump_logs(spec) def test_filecheck(self): if sys.version_info.major >= 3: return src = os.path.join(self.curpath, 'tests/src') dst = os.path.join(self.curpath, 'tests/dst') - spec = KittenGroomerFileCheck(src, dst) + spec = KittenGroomerFileCheck(src, dst, debug=True) spec.processdir() - self.dump_logs() + self.dump_logs(spec) def test_help_file(self): f = FileBase('tests/src/blah.conf', 'tests/dst/blah.conf')