Add script for specific purposes, add testcase

pull/2/head
Raphaël Vinot 2015-10-26 17:11:36 +01:00
parent dc098dd9a8
commit 5d848f4787
8 changed files with 137 additions and 1 deletions

21
.travis.yml Normal file
View File

@ -0,0 +1,21 @@
language: python
python:
- "2.7"
- "3.3"
- "3.4"
- "3.5"
- "nightly"
install:
- pip install --user python-magic
- pip install --user coveralls
- pip install --user codecov
- python setup.py -q install
script:
- coverage run setup.py test
after_success:
- coveralls
- codecov

90
bin/specific.py Normal file
View File

@ -0,0 +1,90 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import magic
from kittengroomer import FileBase, KittenGroomerBase, main
# Extension
configfiles = {'.conf': 'text/plain'}
class FileSpec(FileBase):
def __init__(self, src_path, dst_path):
''' Init file object, set the extension '''
super(FileSpec, self).__init__(src_path, dst_path)
a, self.extension = os.path.splitext(self.src_path)
self.mimetype = magic.from_file(self.src_path, mime=True).decode("utf-8")
class KittenGroomerSpec(KittenGroomerBase):
def __init__(self, root_src=None, root_dst=None):
'''
Initialize the basics of the copy
'''
if root_src is None:
root_src = os.path.join(os.sep, 'media', 'src')
if root_dst is None:
root_dst = os.path.join(os.sep, 'media', 'dst')
super(KittenGroomerSpec, self).__init__(root_src, root_dst)
self.valid_files = {}
# The initial version will only accept the file extensions/mimetypes listed here.
self.valid_files.update(configfiles)
def _print_log(self):
'''
Print the logs related to the current file being processed
'''
tmp_log = self.log_name.fields(**self.cur_file.log_details)
if not self.cur_file.log_details.get('valid'):
tmp_log.warning(self.cur_file.log_string)
else:
tmp_log.debug(self.cur_file.log_string)
def processdir(self):
'''
Main function doing the processing
'''
to_copy = []
error = []
for srcpath in self._list_all_files(self.src_root_dir):
valid = True
self.log_name.info('Processing {}', srcpath.replace(self.src_root_dir + '/', ''))
self.cur_file = FileSpec(srcpath, srcpath.replace(self.src_root_dir, self.dst_root_dir))
expected_mime = self.valid_files.get(self.cur_file.extension)
if expected_mime is None:
# Unexpected extension => disallowed
valid = False
compare_ext = 'Extension: {} - Expected: {}'.format(self.cur_file.extension, ', '.join(self.valid_files.keys()))
elif self.cur_file.mimetype != expected_mime:
# Unexpected mimetype => dissalowed
valid = False
compare_mime = 'Mime: {} - Expected: {}'.format(self.cur_file.mimetype, expected_mime)
self.cur_file.add_log_details('valid', valid)
if valid:
to_copy.append(self.cur_file)
self.cur_file.log_string = 'Extension: {} - MimeType: {}'.format(self.cur_file.extension, self.cur_file.mimetype)
else:
error.append(self.cur_file)
if compare_ext is not None:
self.cur_file.log_string = compare_ext
else:
self.cur_file.log_string = compare_mime
if len(error) > 0:
for f in error + to_copy:
self.cur_file = f
self._print_log()
else:
for f in to_copy:
self.cur_file = f
self._safe_copy()
self._print_log()
if __name__ == '__main__':
main(KittenGroomerSpec, ' Only copy some files, returns an error is anything else is found')
exit(0)

View File

@ -89,6 +89,7 @@ class KittenGroomerBase(object):
self.src_root_dir = root_src self.src_root_dir = root_src
self.dst_root_dir = root_dst self.dst_root_dir = root_dst
self.log_root_dir = os.path.join(self.dst_root_dir, 'logs') self.log_root_dir = os.path.join(self.dst_root_dir, 'logs')
self._safe_rmtree(self.log_root_dir)
self._safe_mkdir(self.log_root_dir) self._safe_mkdir(self.log_root_dir)
self.log_processing = os.path.join(self.log_root_dir, 'processing.log') self.log_processing = os.path.join(self.log_root_dir, 'processing.log')

View File

@ -11,9 +11,10 @@ setup(
url='https://github.com/CIRCL/CIRCLean', url='https://github.com/CIRCL/CIRCLean',
description='Standalone CIRCLean/KittenGroomer code.', description='Standalone CIRCLean/KittenGroomer code.',
packages=['kittengroomer'], packages=['kittengroomer'],
scripts=['bin/generic.py', 'bin/pier9.py'], scripts=['bin/generic.py', 'bin/pier9.py', 'bin/specific.py'],
include_package_data = True, include_package_data = True,
package_data = {'data': ['PDFA_def.ps','srgb.icc']}, package_data = {'data': ['PDFA_def.ps','srgb.icc']},
test_suite="tests",
classifiers=[ classifiers=[
'License :: OSI Approved :: BSD License', 'License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable', 'Development Status :: 5 - Production/Stable',

0
tests/__init__.py Normal file
View File

0
tests/dst/.keepdir Normal file
View File

1
tests/src/blah.conf Normal file
View File

@ -0,0 +1 @@
This is a test.

22
tests/test.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import unittest
import subprocess
import time
class TestBasic(unittest.TestCase):
def setUp(self):
self.maxDiff = None
def test_specific(self):
p = subprocess.Popen(['specific.py', '-s', 'tests/src', '-d', 'tests/dst'])
while True:
p.poll()
print(p.returncode)
if p.returncode is None:
time.sleep(1)
else:
return