Code de-dupication

pull/2/head
Raphaël Vinot 2015-11-05 14:43:54 +01:00
parent b0d0912ff9
commit 03f1d90f33
5 changed files with 109 additions and 77 deletions

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import magic
import os
import mimetypes
import shlex
@ -70,27 +69,23 @@ class File(FileBase):
super(File, self).__init__(src_path, dst_path)
self.is_recursive = False
try:
mimetype = magic.from_file(src_path, mime=True).decode("utf-8")
self.main_type, self.sub_type = mimetype.split('/')
except:
# FIXME/TEMP: checking what happen, probably bad.
print(src_path, mimetype)
self.log_details.update({'broken_mime': self.extension})
if not self.has_mimetype():
# No mimetype, should not happen.
self.make_dangerous()
if not self.has_extension():
self.make_dangerous()
return
a, self.extension = os.path.splitext(src_path)
if self.extension in mal_ext:
self.log_details.update({'malicious_extension': self.extension})
self.make_dangerous()
return
elif self.extension == '':
self.log_details.update({'no_extension': self.extension})
self.make_dangerous()
if self.is_dangerous():
return
self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension})
self.log_details.update({'maintype': self.main_type,
'subtype': self.sub_type,
'extension': self.extension})
# Check correlation known extension => actual mime type
if propertype.get(self.extension) is not None:
@ -101,13 +96,16 @@ class File(FileBase):
expected_mimetype = aliases.get(expected_mimetype)
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and expected_mimetype != mimetype:
if is_known_extension and expected_mimetype != self.mimetype:
self.log_details.update({'expected_mimetype': expected_mimetype})
self.make_dangerous()
# check correlation actual mime type => known extensions
if aliases.get(mimetype) is not None:
mimetype = aliases.get(mimetype)
if aliases.get(self.mimetype) is not None:
mimetype = aliases.get(self.mimetype)
else:
mimetype = self.mimetype
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
if expected_extensions:
if len(self.extension) > 0 and self.extension not in expected_extensions:
@ -175,7 +173,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
Print the logs related to the current file being processed
'''
tmp_log = self.log_name.fields(**self.cur_file.log_details)
if self.cur_file.log_details.get('dangerous'):
if self.cur_file.is_dangerous():
tmp_log.warning(self.cur_file.log_string)
elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'):
tmp_log.info(self.cur_file.log_string)
@ -189,7 +187,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
else:
deadline = None
args = shlex.split(command_line)
with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout:
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
if background:
# This timer is here to make sure the unoconv listener is properly started.
@ -444,7 +442,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''),
self.cur_file.main_type, self.cur_file.sub_type)
if self.cur_file.log_details.get('dangerous') is None:
if not self.cur_file.is_dangerous():
self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
else:
self._safe_copy()

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import magic
import os
import mimetypes
import shlex
@ -47,23 +46,17 @@ class File(FileBase):
super(File, self).__init__(src_path, dst_path)
self.is_recursive = False
self.main_type = ''
self.main_type = ''
try:
mimetype = magic.from_file(src_path, mime=True)
try:
mimetype = mimetype.decode("utf-8")
except:
pass
except Exception as e:
print('************************** BROKEN', self.src_path, e)
if not self.has_mimetype():
# No mimetype, should not happen.
self.make_dangerous()
if self.is_dangerous():
return
self.main_type, self.sub_type = mimetype.split('/')
a, self.extension = os.path.splitext(src_path)
self.log_details.update({'maintype': self.main_type,
'subtype': self.sub_type,
'extension': self.extension})
self.log_details.update({'maintype': self.main_type, 'subtype': self.sub_type, 'extension': self.extension})
# If the mimetype matches as text/*, it will be sent to LibreOffice, no need to cross check the mime/ext
if self.main_type == 'text':
return
@ -77,13 +70,15 @@ class File(FileBase):
expected_mimetype = aliases.get(expected_mimetype)
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and expected_mimetype != mimetype:
if is_known_extension and expected_mimetype != self.mimetype:
self.log_details.update({'expected_mimetype': expected_mimetype})
self.make_dangerous()
# check correlation actual mime type => known extensions
if aliases.get(mimetype) is not None:
mimetype = aliases.get(mimetype)
if aliases.get(self.mimetype) is not None:
mimetype = aliases.get(self.mimetype)
else:
mimetype = self.mimetype
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
if expected_extensions:
if len(self.extension) > 0 and self.extension not in expected_extensions:
@ -151,7 +146,7 @@ class KittenGroomer(KittenGroomerBase):
Print the logs related to the current file being processed
'''
tmp_log = self.log_name.fields(**self.cur_file.log_details)
if self.cur_file.log_details.get('dangerous'):
if self.cur_file.is_dangerous():
tmp_log.warning(self.cur_file.log_string)
elif self.cur_file.log_details.get('unknown') or self.cur_file.log_details.get('binary'):
tmp_log.info(self.cur_file.log_string)
@ -165,7 +160,7 @@ class KittenGroomer(KittenGroomerBase):
else:
deadline = None
args = shlex.split(command_line)
with open(self.log_debug_err, 'wb') as stderr, open(self.log_debug_out, 'wb') as stdout:
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
if background:
# FIXME: This timer is here to make sure the unoconv listener is properly started.
@ -353,7 +348,7 @@ class KittenGroomer(KittenGroomerBase):
self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''),
self.cur_file.main_type, self.cur_file.sub_type)
if self.cur_file.log_details.get('dangerous') is None:
if not self.cur_file.is_dangerous():
self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
else:
self._safe_copy()

View File

@ -20,7 +20,9 @@ class FilePier9(FileBase):
def __init__(self, src_path, dst_path):
''' Init file object, set the extension '''
super(FilePier9, self).__init__(src_path, dst_path)
a, self.extension = os.path.splitext(self.src_path)
if not self.has_extension():
self.make_dangerous()
class KittenGroomerPier9(KittenGroomerBase):
@ -55,12 +57,16 @@ class KittenGroomerPier9(KittenGroomerBase):
for srcpath in self._list_all_files(self.src_root_dir):
self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
self.cur_file = FilePier9(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))
if self.cur_file.extension in self.authorized_extensions:
if not self.cur_file.is_dangerous() and self.cur_file.extension in self.authorized_extensions:
self.cur_file.add_log_details('valid', True)
self.cur_file.log_string = 'Expected extension: ' + self.cur_file.extension
self._safe_copy()
else:
self.cur_file.log_string = 'Bad extension: ' + self.cur_file.extension
self.cur_file.make_dangerous()
if self.cur_file.extension:
self.cur_file.log_string = 'Bad extension: ' + self.cur_file.extension
else:
self.cur_file.log_string = 'No Extension.'
self._print_log()

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import magic
from kittengroomer import FileBase, KittenGroomerBase, main
@ -15,15 +14,12 @@ class FileSpec(FileBase):
def __init__(self, src_path, dst_path):
''' Init file object, set the extension '''
super(FileSpec, self).__init__(src_path, dst_path)
a, self.extension = os.path.splitext(self.src_path)
try:
self.mimetype = magic.from_file(self.src_path, mime=True)
try:
self.imetype = self.mimetype.decode("utf-8")
except:
pass
except Exception as e:
print('************************** BROKEN', self.src_path, e)
if not self.has_mimetype():
self.make_dangerous()
if not self.has_extension():
self.make_dangerous()
class KittenGroomerSpec(KittenGroomerBase):
@ -62,27 +58,33 @@ class KittenGroomerSpec(KittenGroomerBase):
valid = True
self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))
expected_mime = self.valid_files.get(self.cur_file.extension)
compare_ext = None
compare_mime = None
if expected_mime is None:
# Unexpected extension => disallowed
if self.cur_file.is_dangerous():
valid = False
compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys()))
elif self.cur_file.mimetype != expected_mime:
# Unexpected mimetype => dissalowed
valid = False
compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime)
self.cur_file.add_log_details('valid', valid)
if valid:
to_copy.append(self.cur_file)
self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype)
else:
error.append(self.cur_file)
if compare_ext is not None:
self.cur_file.log_string = compare_ext
else:
expected_mime = self.valid_files.get(self.cur_file.extension)
compare_ext = ''
compare_mime = ''
if expected_mime is None:
# Unexpected extension => disallowed
valid = False
compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys()))
elif self.cur_file.mimetype != expected_mime:
# Unexpected mimetype => dissalowed
valid = False
compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime)
if valid:
to_copy.append(self.cur_file)
self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype)
else:
self.cur_file.log_string = compare_mime
error.append(self.cur_file)
if compare_ext:
self.cur_file.log_string = compare_ext
else:
self.cur_file.log_string = compare_mime
self.cur_file.add_log_details('valid', valid)
if len(error) > 0:
for f in error + to_copy:
self.cur_file = f

View File

@ -1,8 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import magic
import shutil
from twiggy import quickSetup, log
from twiggy import quick_setup, log
import argparse
@ -33,6 +34,36 @@ class FileBase(object):
self.dst_path = dst_path
self.log_details = {'filepath': self.src_path}
self.log_string = ''
a, self.extension = os.path.splitext(self.src_path)
mt = magic.from_file(self.src_path, mime=True)
try:
self.mimetype = mt.decode("utf-8")
except:
self.mimetype = mt
if self.mimetype and '/' in self.mimetype:
self.main_type, self.sub_type = self.mimetype.split('/')
else:
self.main_type = ''
self.sub_type = ''
def has_mimetype(self):
if not self.main_type or not self.sub_type:
self.log_details.update({'broken_mime': self.extension})
return False
return True
def has_extension(self):
if not self.extension:
self.log_details.update({'no_extension': self.extension})
return False
return True
def is_dangerous(self):
if self.log_details.get('dangerous'):
return True
return False
def add_log_details(self, key, value):
'''
@ -46,7 +77,7 @@ class FileBase(object):
Prepending and appending DANGEROUS to the destination
file name avoid double-click of death
'''
if self.log_details.get('dangerous'):
if self.is_dangerous():
# Already marked as dangerous, do nothing
return
self.log_details['dangerous'] = True
@ -59,7 +90,7 @@ class FileBase(object):
a decision. Theuser will have to decide what to do.
Prepending UNKNOWN
'''
if self.log_details.get('dangerous') or self.log_details.get('binary'):
if self.is_dangerous() or self.log_details.get('binary'):
# Already marked as dangerous or binary, do nothing
return
self.log_details['unknown'] = True
@ -72,7 +103,7 @@ class FileBase(object):
Appending .bin avoir double click of death but the user
will have to decide by itself.
'''
if self.log_details.get('dangerous'):
if self.is_dangerous():
# Already marked as dangerous, do nothing
return
self.log_details['binary'] = True
@ -98,7 +129,7 @@ class KittenGroomerBase(object):
self._safe_mkdir(self.log_root_dir)
self.log_processing = os.path.join(self.log_root_dir, 'processing.log')
quickSetup(file=self.log_processing)
quick_setup(file=self.log_processing)
self.log_name = log.name('files')
self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
os.environ["PATH"] += os.pathsep + self.ressources_path