mirror of https://github.com/CIRCL/PyCIRCLean
Improve debug, add list of malicious ext
parent
975ed03e38
commit
531ab43dae
|
@ -44,6 +44,24 @@ aliases = {
|
|||
# It works as expected if you do mimetypes.guess_type('application/gzip', strict=False)
|
||||
propertype = {'.gz': 'application/gzip'}
|
||||
|
||||
# Commonly used malicious extensions
|
||||
# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/
|
||||
mal_ext = (
|
||||
# Applications
|
||||
".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr",
|
||||
".hta", ".cpl", ".msc", ".jar",
|
||||
# Scripts
|
||||
".bat", ".cmd", ".vb", ".vbs", ".vbe", ".js", ".jse", ".ws", ".wsf",
|
||||
".wsc", ".wsh", ".ps1", ".ps1xml", ".ps2", ".ps2xml", ".psc1", ".psc2",
|
||||
".msh", ".msh1", ".msh2", ".mshxml", ".msh1xml", ".msh2xml",
|
||||
# Shortcuts
|
||||
".scf", ".lnk", ".inf",
|
||||
# Other
|
||||
".reg", "dll",
|
||||
# Office macro (OOXML with macro enabled)
|
||||
".docm", ".dotm", ".xlsm", ".xltm", ".xlam", ".pptm", ".potm", ".ppam",
|
||||
".ppsm", ".sldm",)
|
||||
|
||||
|
||||
class File(FileBase):
|
||||
|
||||
|
@ -51,15 +69,26 @@ class File(FileBase):
|
|||
''' Init file object, set the mimetype '''
|
||||
super(File, self).__init__(src_path, dst_path)
|
||||
|
||||
mimetype = magic.from_file(src_path, mime=True).decode("utf-8")
|
||||
try:
|
||||
self.main_type, self.sub_type = mimetype.split('/')
|
||||
except Exception as e:
|
||||
# FIXME/TEMP: checking what happen, probably bad.
|
||||
print(e, src_path, mimetype)
|
||||
return
|
||||
a, self.extension = os.path.splitext(src_path)
|
||||
self.is_recursive = False
|
||||
try:
|
||||
mimetype = magic.from_file(src_path, mime=True).decode("utf-8")
|
||||
self.main_type, self.sub_type = mimetype.split('/')
|
||||
except:
|
||||
# FIXME/TEMP: checking what happen, probably bad.
|
||||
print(src_path, mimetype)
|
||||
self.log_details.update({'broken_mime': self.extension})
|
||||
self.make_dangerous()
|
||||
return
|
||||
|
||||
a, self.extension = os.path.splitext(src_path)
|
||||
if self.extension in mal_ext:
|
||||
self.log_details.update({'malicious_extension': self.extension})
|
||||
self.make_dangerous()
|
||||
return
|
||||
elif self.extension == '':
|
||||
self.log_details.update({'no_extension': self.extension})
|
||||
self.make_dangerous()
|
||||
return
|
||||
|
||||
self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension})
|
||||
|
||||
|
@ -91,7 +120,7 @@ class File(FileBase):
|
|||
|
||||
class KittenGroomerFileCheck(KittenGroomerBase):
|
||||
|
||||
def __init__(self, root_src=None, root_dst=None, max_recursive=5):
|
||||
def __init__(self, root_src=None, root_dst=None, max_recursive=5, debug=False):
|
||||
'''
|
||||
Initialize the basics of the conversion process
|
||||
'''
|
||||
|
@ -99,7 +128,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
root_src = os.path.join(os.sep, 'media', 'src')
|
||||
if root_dst is None:
|
||||
root_dst = os.path.join(os.sep, 'media', 'dst')
|
||||
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst)
|
||||
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
|
||||
|
||||
self.recursive = 0
|
||||
self.max_recursive = max_recursive
|
||||
|
@ -160,7 +189,8 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
|||
else:
|
||||
deadline = None
|
||||
args = shlex.split(command_line)
|
||||
p = subprocess.Popen(args)
|
||||
with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout:
|
||||
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
|
||||
if background:
|
||||
# This timer is here to make sure the unoconv listener is properly started.
|
||||
time.sleep(10)
|
||||
|
|
|
@ -46,10 +46,19 @@ class File(FileBase):
|
|||
''' Init file object, set the mimetype '''
|
||||
super(File, self).__init__(src_path, dst_path)
|
||||
|
||||
mimetype = magic.from_file(src_path, mime=True).decode("utf-8")
|
||||
self.is_recursive = False
|
||||
self.main_type = ''
|
||||
self.main_type = ''
|
||||
try:
|
||||
mimetype = magic.from_file(src_path, mime=True).decode("utf-8")
|
||||
except Exception as e:
|
||||
print('************************** BROKEN', self.src_path)
|
||||
print('************************** BROKEN', self.src_path, e)
|
||||
self.make_dangerous()
|
||||
return
|
||||
|
||||
self.main_type, self.sub_type = mimetype.split('/')
|
||||
a, self.extension = os.path.splitext(src_path)
|
||||
self.is_recursive = False
|
||||
|
||||
self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension})
|
||||
# If the mimetype matches as text/*, it will be sent to LibreOffice, no need to cross check the mime/ext
|
||||
|
@ -84,7 +93,7 @@ class File(FileBase):
|
|||
|
||||
class KittenGroomer(KittenGroomerBase):
|
||||
|
||||
def __init__(self, root_src=None, root_dst=None, max_recursive=5):
|
||||
def __init__(self, root_src=None, root_dst=None, max_recursive=5, debug=False):
|
||||
'''
|
||||
Initialize the basics of the conversion process
|
||||
'''
|
||||
|
@ -92,7 +101,7 @@ class KittenGroomer(KittenGroomerBase):
|
|||
root_src = os.path.join(os.sep, 'media', 'src')
|
||||
if root_dst is None:
|
||||
root_dst = os.path.join(os.sep, 'media', 'dst')
|
||||
super(KittenGroomer, self).__init__(root_src, root_dst)
|
||||
super(KittenGroomer, self).__init__(root_src, root_dst, debug)
|
||||
|
||||
self.recursive = 0
|
||||
self.max_recursive = max_recursive
|
||||
|
@ -153,7 +162,8 @@ class KittenGroomer(KittenGroomerBase):
|
|||
else:
|
||||
deadline = None
|
||||
args = shlex.split(command_line)
|
||||
p = subprocess.Popen(args)
|
||||
with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout:
|
||||
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
|
||||
if background:
|
||||
# FIXME: This timer is here to make sure the unoconv listener is properly started.
|
||||
time.sleep(10)
|
||||
|
|
|
@ -25,7 +25,7 @@ class FilePier9(FileBase):
|
|||
|
||||
class KittenGroomerPier9(KittenGroomerBase):
|
||||
|
||||
def __init__(self, root_src=None, root_dst=None):
|
||||
def __init__(self, root_src=None, root_dst=None, debug=False):
|
||||
'''
|
||||
Initialize the basics of the copy
|
||||
'''
|
||||
|
@ -33,7 +33,7 @@ class KittenGroomerPier9(KittenGroomerBase):
|
|||
root_src = os.path.join(os.sep, 'media', 'src')
|
||||
if root_dst is None:
|
||||
root_dst = os.path.join(os.sep, 'media', 'dst')
|
||||
super(KittenGroomerPier9, self).__init__(root_src, root_dst)
|
||||
super(KittenGroomerPier9, self).__init__(root_src, root_dst, debug)
|
||||
|
||||
# The initial version will accept all the file extension for all the machines.
|
||||
self.authorized_extensions = printers + cnc + shopbot + omax + epilog_laser + metabeam + up
|
||||
|
|
|
@ -16,12 +16,15 @@ class FileSpec(FileBase):
|
|||
''' Init file object, set the extension '''
|
||||
super(FileSpec, self).__init__(src_path, dst_path)
|
||||
a, self.extension = os.path.splitext(self.src_path)
|
||||
self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8")
|
||||
try:
|
||||
self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8")
|
||||
except:
|
||||
print('************************** BROKEN', self.src_path)
|
||||
|
||||
|
||||
class KittenGroomerSpec(KittenGroomerBase):
|
||||
|
||||
def __init__(self, root_src=None, root_dst=None):
|
||||
def __init__(self, root_src=None, root_dst=None, debug=False):
|
||||
'''
|
||||
Initialize the basics of the copy
|
||||
'''
|
||||
|
@ -29,7 +32,7 @@ class KittenGroomerSpec(KittenGroomerBase):
|
|||
root_src = os.path.join(os.sep, 'media', 'src')
|
||||
if root_dst is None:
|
||||
root_dst = os.path.join(os.sep, 'media', 'dst')
|
||||
super(KittenGroomerSpec, self).__init__(root_src, root_dst)
|
||||
super(KittenGroomerSpec, self).__init__(root_src, root_dst, debug)
|
||||
self.valid_files = {}
|
||||
|
||||
# The initial version will only accept the file extensions/mimetypes listed here.
|
||||
|
|
|
@ -87,7 +87,7 @@ class FileBase(object):
|
|||
|
||||
class KittenGroomerBase(object):
|
||||
|
||||
def __init__(self, root_src, root_dst):
|
||||
def __init__(self, root_src, root_dst, debug=False):
|
||||
'''
|
||||
Setup the base options of the copy/convert setup
|
||||
'''
|
||||
|
@ -105,6 +105,14 @@ class KittenGroomerBase(object):
|
|||
|
||||
self.cur_file = None
|
||||
|
||||
self.debug = debug
|
||||
if self.debug:
|
||||
self.log_debug_err = os.path.join(self.log_root_dir, 'debug_stderr.log')
|
||||
self.log_debug_out = os.path.join(self.log_root_dir, 'debug_stdout.log')
|
||||
else:
|
||||
self.log_debug_err = os.devnull
|
||||
self.log_debug_out = os.devnull
|
||||
|
||||
# ##### Helpers #####
|
||||
def _safe_rmtree(self, directory):
|
||||
'''Remove a directory tree if it exists'''
|
||||
|
|
|
@ -21,46 +21,50 @@ class TestBasic(unittest.TestCase):
|
|||
self.maxDiff = None
|
||||
self.curpath = os.getcwd()
|
||||
|
||||
def dump_logs(self):
|
||||
logfile = os.path.join(self.curpath, 'tests/dst/logs/processing.log')
|
||||
print(open(logfile, 'rb').read())
|
||||
def dump_logs(self, kg):
|
||||
print(open(kg.log_processing, 'rb').read().decode("utf-8"))
|
||||
if kg.debug:
|
||||
if os.path.exists(kg.log_debug_err):
|
||||
print(open(kg.log_debug_err, 'rb').read().decode("utf-8"))
|
||||
if os.path.exists(kg.log_debug_out):
|
||||
print(open(kg.log_debug_out, 'rb').read().decode("utf-8"))
|
||||
|
||||
def test_specific_valid(self):
|
||||
src = os.path.join(self.curpath, 'tests/src2')
|
||||
dst = os.path.join(self.curpath, 'tests/dst')
|
||||
spec = KittenGroomerSpec(src, dst)
|
||||
spec = KittenGroomerSpec(src, dst, debug=True)
|
||||
spec.processdir()
|
||||
self.dump_logs()
|
||||
self.dump_logs(spec)
|
||||
|
||||
def test_specific_invalid(self):
|
||||
src = os.path.join(self.curpath, 'tests/src')
|
||||
dst = os.path.join(self.curpath, 'tests/dst')
|
||||
spec = KittenGroomerSpec(src, dst)
|
||||
spec = KittenGroomerSpec(src, dst, debug=True)
|
||||
spec.processdir()
|
||||
self.dump_logs()
|
||||
self.dump_logs(spec)
|
||||
|
||||
def test_pier9(self):
|
||||
src = os.path.join(self.curpath, 'tests/src')
|
||||
dst = os.path.join(self.curpath, 'tests/dst')
|
||||
spec = KittenGroomerPier9(src, dst)
|
||||
spec = KittenGroomerPier9(src, dst, debug=True)
|
||||
spec.processdir()
|
||||
self.dump_logs()
|
||||
self.dump_logs(spec)
|
||||
|
||||
def test_generic(self):
|
||||
src = os.path.join(self.curpath, 'tests/src')
|
||||
dst = os.path.join(self.curpath, 'tests/dst')
|
||||
spec = KittenGroomer(src, dst)
|
||||
spec = KittenGroomer(src, dst, debug=True)
|
||||
spec.processdir()
|
||||
self.dump_logs()
|
||||
self.dump_logs(spec)
|
||||
|
||||
def test_filecheck(self):
|
||||
if sys.version_info.major >= 3:
|
||||
return
|
||||
src = os.path.join(self.curpath, 'tests/src')
|
||||
dst = os.path.join(self.curpath, 'tests/dst')
|
||||
spec = KittenGroomerFileCheck(src, dst)
|
||||
spec = KittenGroomerFileCheck(src, dst, debug=True)
|
||||
spec.processdir()
|
||||
self.dump_logs()
|
||||
self.dump_logs(spec)
|
||||
|
||||
def test_help_file(self):
|
||||
f = FileBase('tests/src/blah.conf', 'tests/dst/blah.conf')
|
||||
|
|
Loading…
Reference in New Issue