mirror of https://github.com/CIRCL/PyCIRCLean
Fix completely buggy mimetype/extension xcheck
parent
e9d76adb42
commit
ac372dc59d
|
@ -26,48 +26,58 @@ mimes_compressed = ['zip', 'x-rar', 'x-bzip2', 'x-lzip', 'x-lzma', 'x-lzop',
|
||||||
'x-xz', 'x-compress', 'x-gzip', 'x-tar', 'compressed']
|
'x-xz', 'x-compress', 'x-gzip', 'x-tar', 'compressed']
|
||||||
mimes_data = ['octet-stream']
|
mimes_data = ['octet-stream']
|
||||||
|
|
||||||
|
# Aliases
|
||||||
|
aliases = {
|
||||||
|
# Win executables
|
||||||
|
'application/x-msdos-program': 'application/x-dosexec',
|
||||||
|
'application/x-dosexec': 'application/x-msdos-program'
|
||||||
|
}
|
||||||
|
|
||||||
|
# Sometimes mimetypes.guess_type gives unexpected results, for example with .tar.gz files:
|
||||||
|
# In [12]: mimetypes.guess_type('toot.tar.gz', strict=False)
|
||||||
|
# Out[12]: ('application/x-tar', 'gzip')
|
||||||
|
# It works as expected if you do mimetypes.guess_type('application/gzip', strict=False)
|
||||||
|
propertype = {'.gz': 'application/gzip'}
|
||||||
|
|
||||||
|
|
||||||
class File(FileBase):
|
class File(FileBase):
|
||||||
|
|
||||||
def __init__(self, src_path, dst_path):
|
def __init__(self, src_path, dst_path):
|
||||||
''' Init file object, set the mimetype '''
|
''' Init file object, set the mimetype '''
|
||||||
super(File, self).__init__(src_path, dst_path)
|
super(File, self).__init__(src_path, dst_path)
|
||||||
|
|
||||||
mimetype = magic.from_file(src_path, mime=True)
|
mimetype = magic.from_file(src_path, mime=True)
|
||||||
self.main_type, self.sub_type = mimetype.split('/')
|
self.main_type, self.sub_type = mimetype.split('/')
|
||||||
self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type})
|
a, self.extension = os.path.splitext(src_path)
|
||||||
self.expected_mimetype, self.expected_extensions = self.crosscheck_mime()
|
|
||||||
self.is_recursive = False
|
|
||||||
|
|
||||||
def crosscheck_mime(self):
|
self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension})
|
||||||
'''
|
|
||||||
Set the expected mime and extension variables based on mime type.
|
|
||||||
'''
|
|
||||||
# /usr/share/mime has interesting stuff
|
|
||||||
|
|
||||||
# guess_type uses the extension to get a mime type
|
# Check correlation known extension => actual mime type
|
||||||
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
|
if propertype.get(self.extension) is not None:
|
||||||
if expected_mimetype is not None:
|
expected_mimetype = propertype.get(self.extension)
|
||||||
expected_extensions = mimetypes.guess_all_extensions(expected_mimetype,
|
|
||||||
strict=False)
|
|
||||||
else:
|
else:
|
||||||
# the extension is unknown...
|
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
|
||||||
expected_extensions = None
|
if aliases.get(expected_mimetype) is not None:
|
||||||
|
expected_mimetype = aliases.get(expected_mimetype)
|
||||||
|
|
||||||
return expected_mimetype, expected_extensions
|
is_known_extension = self.extension in mimetypes.types_map.keys()
|
||||||
|
if is_known_extension and expected_mimetype != mimetype:
|
||||||
|
self.log_details.update({'expected_mimetype': expected_mimetype})
|
||||||
|
self.make_dangerous()
|
||||||
|
|
||||||
def verify_extension(self):
|
# check correlation actual mime type => known extensions
|
||||||
'''Check if the extension is the one we expect'''
|
if aliases.get(mimetype) is not None:
|
||||||
if self.expected_extensions is None:
|
mimetype = aliases.get(mimetype)
|
||||||
return None
|
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
|
||||||
path, actual_extension = os.path.splitext(self.src_path)
|
if expected_extensions is not None:
|
||||||
return actual_extension in self.expected_extensions
|
if len(self.extension) > 0 and self.extension not in expected_extensions:
|
||||||
|
self.log_details.update({'expected_mimetype': expected_extensions})
|
||||||
|
self.make_dangerous()
|
||||||
|
else:
|
||||||
|
# There are no known extensions associated with this mimetype.
|
||||||
|
pass
|
||||||
|
|
||||||
def verify_mime(self):
|
self.is_recursive = False
|
||||||
'''Check if the mime is the one we expect'''
|
|
||||||
if self.expected_mimetype is None:
|
|
||||||
return None
|
|
||||||
actual_mimetype = '{}/{}'.format(self.main_type, self.sub_type)
|
|
||||||
return actual_mimetype == self.expected_mimetype
|
|
||||||
|
|
||||||
|
|
||||||
class KittenGroomer(KittenGroomerBase):
|
class KittenGroomer(KittenGroomerBase):
|
||||||
|
@ -290,10 +300,6 @@ class KittenGroomer(KittenGroomerBase):
|
||||||
def _media_processing(self):
|
def _media_processing(self):
|
||||||
'''Generic way to process all the media files'''
|
'''Generic way to process all the media files'''
|
||||||
self.cur_log.fields(processing_type='media')
|
self.cur_log.fields(processing_type='media')
|
||||||
if not self.cur_file.verify_mime() or not self.cur_file.verify_extension():
|
|
||||||
# The extension is unknown or doesn't match the mime type => suspicious
|
|
||||||
# TODO: write details in the logfile
|
|
||||||
self.cur_file.make_dangerous()
|
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
from helpers import FileBase, KittenGroomerBase, main
|
from .helpers import FileBase, KittenGroomerBase, main
|
||||||
|
|
|
@ -46,6 +46,9 @@ class FileBase(object):
|
||||||
Prepending and appending DANGEROUS to the destination
|
Prepending and appending DANGEROUS to the destination
|
||||||
file name avoids the double-click of death
|
file name avoids the double-click of death
|
||||||
'''
|
'''
|
||||||
|
if self.log_details.get('dangerous'):
|
||||||
|
# Already marked as dangerous, do nothing
|
||||||
|
return
|
||||||
self.log_details['dangerous'] = True
|
self.log_details['dangerous'] = True
|
||||||
path, filename = os.path.split(self.dst_path)
|
path, filename = os.path.split(self.dst_path)
|
||||||
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
|
self.dst_path = os.path.join(path, 'DANGEROUS_{}_DANGEROUS'.format(filename))
|
||||||
|
@ -56,6 +59,9 @@ class FileBase(object):
|
||||||
a decision. The user will have to decide what to do.
|
a decision. The user will have to decide what to do.
|
||||||
Prepending UNKNOWN
|
Prepending UNKNOWN
|
||||||
'''
|
'''
|
||||||
|
if self.log_details.get('dangerous') or self.log_details.get('binary'):
|
||||||
|
# Already marked as dangerous or binary, do nothing
|
||||||
|
return
|
||||||
self.log_details['unknown'] = True
|
self.log_details['unknown'] = True
|
||||||
path, filename = os.path.split(self.dst_path)
|
path, filename = os.path.split(self.dst_path)
|
||||||
self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
|
self.dst_path = os.path.join(path, 'UNKNOWN_{}'.format(filename))
|
||||||
|
@ -66,6 +72,9 @@ class FileBase(object):
|
||||||
Appending .bin avoids the double-click of death, but the user
|
Appending .bin avoids the double-click of death, but the user
|
||||||
will have to decide for themselves.
|
will have to decide for themselves.
|
||||||
'''
|
'''
|
||||||
|
if self.log_details.get('dangerous'):
|
||||||
|
# Already marked as dangerous, do nothing
|
||||||
|
return
|
||||||
self.log_details['binary'] = True
|
self.log_details['binary'] = True
|
||||||
path, filename = os.path.split(self.dst_path)
|
path, filename = os.path.split(self.dst_path)
|
||||||
self.dst_path = os.path.join(path, '{}.bin'.format(filename))
|
self.dst_path = os.path.join(path, '{}.bin'.format(filename))
|
||||||
|
|
Loading…
Reference in New Issue