mirror of https://github.com/CIRCL/PyCIRCLean
				
				
				
			
		
			
				
	
	
		
			102 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
import os
 | 
						|
 | 
						|
from kittengroomer import FileBase, KittenGroomerBase, main
 | 
						|
 | 
						|
 | 
						|
# Extension
 | 
						|
configfiles = {'.conf': 'text/plain'}
 | 
						|
 | 
						|
 | 
						|
class FileSpec(FileBase):
 | 
						|
 | 
						|
    def __init__(self, src_path, dst_path):
 | 
						|
        ''' Init file object, set the extension '''
 | 
						|
        super(FileSpec, self).__init__(src_path, dst_path)
 | 
						|
 | 
						|
        if not self.has_mimetype():
 | 
						|
            self.make_dangerous()
 | 
						|
 | 
						|
        if not self.has_extension():
 | 
						|
            self.make_dangerous()
 | 
						|
 | 
						|
 | 
						|
class KittenGroomerSpec(KittenGroomerBase):
 | 
						|
 | 
						|
    def __init__(self, root_src=None, root_dst=None, debug=False):
 | 
						|
        '''
 | 
						|
            Initialize the basics of the copy
 | 
						|
        '''
 | 
						|
        if root_src is None:
 | 
						|
            root_src = os.path.join(os.sep, 'media', 'src')
 | 
						|
        if root_dst is None:
 | 
						|
            root_dst = os.path.join(os.sep, 'media', 'dst')
 | 
						|
        super(KittenGroomerSpec, self).__init__(root_src, root_dst, debug)
 | 
						|
        self.valid_files = {}
 | 
						|
 | 
						|
        # The initial version will only accept the file extensions/mimetypes listed here.
 | 
						|
        self.valid_files.update(configfiles)
 | 
						|
 | 
						|
    def _print_log(self):
 | 
						|
        '''
 | 
						|
            Print the logs related to the current file being processed
 | 
						|
        '''
 | 
						|
        tmp_log = self.log_name.fields(**self.cur_file.log_details)
 | 
						|
        if not self.cur_file.log_details.get('valid'):
 | 
						|
            tmp_log.warning(self.cur_file.log_string)
 | 
						|
        else:
 | 
						|
            tmp_log.debug(self.cur_file.log_string)
 | 
						|
 | 
						|
    def processdir(self):
 | 
						|
        '''
 | 
						|
            Main function doing the processing
 | 
						|
        '''
 | 
						|
        to_copy = []
 | 
						|
        error = []
 | 
						|
        for srcpath in self.list_all_files(self.src_root_dir):
 | 
						|
            valid = True
 | 
						|
            self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
 | 
						|
            self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))
 | 
						|
            if self.cur_file.is_dangerous():
 | 
						|
                valid = False
 | 
						|
                error.append(self.cur_file)
 | 
						|
            else:
 | 
						|
                expected_mime = self.valid_files.get(self.cur_file.extension)
 | 
						|
                compare_ext = ''
 | 
						|
                compare_mime = ''
 | 
						|
                if expected_mime is None:
 | 
						|
                    # Unexpected extension => disallowed
 | 
						|
                    valid = False
 | 
						|
                    compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys()))
 | 
						|
                elif self.cur_file.mimetype != expected_mime:
 | 
						|
                    # Unexpected mimetype => dissalowed
 | 
						|
                    valid = False
 | 
						|
                    compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime)
 | 
						|
 | 
						|
                if valid:
 | 
						|
                    to_copy.append(self.cur_file)
 | 
						|
                    self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype)
 | 
						|
                else:
 | 
						|
                    error.append(self.cur_file)
 | 
						|
                    if compare_ext:
 | 
						|
                        self.cur_file.log_string = compare_ext
 | 
						|
                    else:
 | 
						|
                        self.cur_file.log_string = compare_mime
 | 
						|
            self.cur_file.add_log_details('valid', valid)
 | 
						|
 | 
						|
        if len(error) > 0:
 | 
						|
            for f in error + to_copy:
 | 
						|
                self.cur_file = f
 | 
						|
                self._print_log()
 | 
						|
        else:
 | 
						|
            for f in to_copy:
 | 
						|
                self.cur_file = f
 | 
						|
                self._safe_copy()
 | 
						|
                self._print_log()
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    main(KittenGroomerSpec, ' Only copy some files, returns an error is anything else is found')
 | 
						|
    exit(0)
 |