mirror of https://github.com/CIRCL/PyCIRCLean
commit
dd35d23162
|
@ -67,8 +67,8 @@ target/
|
||||||
*.vrb
|
*.vrb
|
||||||
|
|
||||||
# Project specific
|
# Project specific
|
||||||
/tests/dst/*
|
tests/dst/*
|
||||||
!/tests/logs/
|
tests/test_logs/*
|
||||||
!/tests/.keepdir
|
!tests/**/.keepdir
|
||||||
|
!tests/src_invalid/*
|
||||||
|
!tests/src_valid/*
|
||||||
|
|
50
.travis.yml
50
.travis.yml
|
@ -1,7 +1,6 @@
|
||||||
language: python
|
language: python
|
||||||
|
|
||||||
python:
|
python:
|
||||||
- 2.7
|
|
||||||
- 3.3
|
- 3.3
|
||||||
- 3.4
|
- 3.4
|
||||||
- 3.5
|
- 3.5
|
||||||
|
@ -17,8 +16,6 @@ addons:
|
||||||
packages:
|
packages:
|
||||||
# General dependencies
|
# General dependencies
|
||||||
- p7zip-full
|
- p7zip-full
|
||||||
# generic.py dependencies
|
|
||||||
- ghostscript
|
|
||||||
# Testing dependencies
|
# Testing dependencies
|
||||||
- mercurial
|
- mercurial
|
||||||
|
|
||||||
|
@ -26,45 +23,28 @@ install:
|
||||||
# General dependencies
|
# General dependencies
|
||||||
- sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ trusty multiverse" && sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ trusty-updates multiverse"
|
- sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ trusty multiverse" && sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ trusty-updates multiverse"
|
||||||
- sudo apt-get update -qq
|
- sudo apt-get update -qq
|
||||||
- sudo apt-get install -y p7zip-rar
|
- sudo apt-get install -y p7zip-rar python-pip
|
||||||
# generic.py: pdf2htmlEX + dependencies
|
|
||||||
- sudo add-apt-repository ppa:fontforge/fontforge --yes
|
|
||||||
# to get a working 0.26 poppler
|
|
||||||
- sudo add-apt-repository ppa:delayargentina/delayx --yes
|
|
||||||
- sudo apt-get update -qq
|
|
||||||
- sudo apt-get install -y libpoppler-dev libpoppler-private-dev libspiro-dev libcairo-dev libpango1.0-dev libfreetype6-dev libltdl-dev libfontforge-dev python-imaging python-pip firefox xvfb
|
|
||||||
- git clone https://github.com/coolwanglu/pdf2htmlEX.git
|
|
||||||
- pushd pdf2htmlEX
|
|
||||||
- cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_SVG=ON .
|
|
||||||
- make
|
|
||||||
- sudo make install
|
|
||||||
- popd
|
|
||||||
# generic.py: Other dependencies
|
|
||||||
- sudo apt-get install -y libreoffice libreoffice-script-provider-python unoconv
|
|
||||||
# filecheck.py dependencies
|
# filecheck.py dependencies
|
||||||
- sudo apt-get install libxml2-dev libxslt1-dev
|
- sudo apt-get install libxml2-dev libxslt1-dev
|
||||||
- wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
|
- wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
|
||||||
- unzip pdfid_v0_2_1.zip
|
- unzip pdfid_v0_2_1.zip
|
||||||
- pip install -U pip
|
- pip install -U pip
|
||||||
- pip install lxml exifread pillow
|
- pip install lxml exifread pillow olefile
|
||||||
- pip install git+https://github.com/Rafiot/officedissector.git
|
- pip install git+https://github.com/decalage2/oletools.git
|
||||||
- |
|
- pip install git+https://github.com/grierforensics/officedissector.git
|
||||||
if [[ "$TRAVIS_PYTHON_VERSION" == 2* ]]; then
|
# PyCIRCLean dependencies
|
||||||
pip install -U oletools olefile
|
|
||||||
fi
|
|
||||||
# Module dependencies
|
|
||||||
- pip install -r dev-requirements.txt
|
- pip install -r dev-requirements.txt
|
||||||
- pip install coveralls codecov
|
- pip install coveralls codecov
|
||||||
# Testing dependencies
|
# Testing dependencies
|
||||||
- sudo apt-get install rar
|
- sudo apt-get install rar
|
||||||
# Prepare tests
|
# Prepare tests
|
||||||
# Zoo
|
# Malware from theZoo
|
||||||
- git clone https://github.com/Rafiot/theZoo.git
|
- git clone https://github.com/Rafiot/theZoo.git
|
||||||
- pushd theZoo/malwares/Binaries
|
- pushd theZoo/malwares/Binaries
|
||||||
- python unpackall.py
|
- python unpackall.py
|
||||||
- popd
|
- popd
|
||||||
- mv theZoo/malwares/Binaries/out tests/src_complex/
|
- mv theZoo/malwares/Binaries/out tests/src_invalid/
|
||||||
# Path traversal
|
# Path traversal attacks
|
||||||
- git clone https://github.com/jwilk/path-traversal-samples
|
- git clone https://github.com/jwilk/path-traversal-samples
|
||||||
- pushd path-traversal-samples
|
- pushd path-traversal-samples
|
||||||
- pushd zip
|
- pushd zip
|
||||||
|
@ -74,25 +54,25 @@ install:
|
||||||
- make
|
- make
|
||||||
- popd
|
- popd
|
||||||
- popd
|
- popd
|
||||||
- mv path-traversal-samples/zip/*.zip tests/src_complex/
|
- mv path-traversal-samples/zip/*.zip tests/src_invalid/
|
||||||
- mv path-traversal-samples/rar/*.rar tests/src_complex/
|
- mv path-traversal-samples/rar/*.rar tests/src_invalid/
|
||||||
# Office docs
|
# Office docs
|
||||||
- git clone https://github.com/eea/odfpy.git
|
- git clone https://github.com/eea/odfpy.git
|
||||||
- mv odfpy/tests/examples/* tests/src_complex/
|
- mv odfpy/tests/examples/* tests/src_invalid/
|
||||||
- pushd tests/src_complex/
|
- pushd tests/src_invalid/
|
||||||
- wget https://bitbucket.org/decalage/olefileio_pl/raw/3073963b640935134ed0da34906fea8e506460be/Tests/images/test-ole-file.doc
|
- wget https://bitbucket.org/decalage/olefileio_pl/raw/3073963b640935134ed0da34906fea8e506460be/Tests/images/test-ole-file.doc
|
||||||
- wget --no-check-certificate https://www.officedissector.com/corpus/fraunhoferlibrary.zip
|
- wget --no-check-certificate https://www.officedissector.com/corpus/fraunhoferlibrary.zip
|
||||||
- unzip -o fraunhoferlibrary.zip
|
- unzip -o fraunhoferlibrary.zip
|
||||||
- rm fraunhoferlibrary.zip
|
- rm fraunhoferlibrary.zip
|
||||||
- 7z x 42.zip -p42
|
- 7z x -p42 42.zip
|
||||||
|
# Some random samples
|
||||||
- wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3
|
- wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3
|
||||||
- wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4
|
- wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4
|
||||||
- wget http://thewalter.net/stef/software/rtfx/sample.rtf
|
- wget http://thewalter.net/stef/software/rtfx/sample.rtf
|
||||||
- echo "blah" > test.obj
|
|
||||||
- popd
|
- popd
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- travis_wait 60 py.test --cov=kittengroomer --cov=bin tests/
|
- travis_wait 30 py.test --cov=kittengroomer --cov=bin tests/
|
||||||
|
|
||||||
notifications:
|
notifications:
|
||||||
email:
|
email:
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
Changelog
|
||||||
|
=========
|
||||||
|
|
||||||
|
2.1.0
|
||||||
|
---
|
||||||
|
|
||||||
|
New features:
|
||||||
|
- Dropped Python 2.7 support: PyCIRCLean is now Python 3.3+ only
|
||||||
|
- Tests are now easier to write and run: we have support for pytest and tox!
|
||||||
|
- More documentation: both docstrings and more detailed readmes
|
||||||
|
- Added more types of examples for testing
|
||||||
|
- The Travis build now runs in ~10 minutes vs. ~30 minutes before
|
||||||
|
|
||||||
|
|
||||||
|
Fixes:
|
||||||
|
- Extension matching now catches lower/upper case errors
|
||||||
|
- Fixed remaining python 3 issues with filecheck.py
|
||||||
|
- Fixed support for .rtf files
|
||||||
|
- Many other small filetype related fixes
|
|
@ -29,5 +29,13 @@ or if you have an example you'd like to contribute.
|
||||||
Running the tests
|
Running the tests
|
||||||
=================
|
=================
|
||||||
|
|
||||||
* Running the tests is easy. First, make sure you've installed the project and testing dependencies.
|
* Running the tests is fairly straightforward.
|
||||||
Then, run `python -m pytest` or just `pytest` in the top level or /tests directory.
|
* First, make sure you've installed the project and testing dependencies.
|
||||||
|
* Then, run `python -m pytest` or just `pytest` in the top level directory of the module.
|
||||||
|
* Each integration test that runs will generate a timestamped copy of the log for that run
|
||||||
|
in the tests/test_logs directory.
|
||||||
|
* If you'd like to get information about code coverage, run the tests using
|
||||||
|
`pytest --cov=kittengroomer`.
|
||||||
|
* You can test with multiple versions of Python if you have them installed
|
||||||
|
by running `pip install tox` and then `tox`. Make sure you modify "envlist"
|
||||||
|
in tox.ini for the Python versions you plan to use.
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
include kittengroomer/data/* README.md CONTRIBUTING.md CHANGELOG dev-requirements.txt
|
include README.md CONTRIBUTING.md CHANGELOG dev-requirements.txt
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
PyCIRCLean is the core Python code used by [CIRCLean](https://github.com/CIRCL/Circlean/), an open-source
|
PyCIRCLean is the core Python code used by [CIRCLean](https://github.com/CIRCL/Circlean/), an open-source
|
||||||
USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the
|
USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the
|
||||||
device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments
|
device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments
|
||||||
to trusted environments.
|
to trusted environments. PyCIRCLean is currently Python 3.3+ only.
|
||||||
|
|
||||||
# Installation
|
# Installation
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ pip install .
|
||||||
PyCIRCLean is a simple Python library to handle file checking and sanitization. PyCIRCLean is designed as a simple library
|
PyCIRCLean is a simple Python library to handle file checking and sanitization. PyCIRCLean is designed as a simple library
|
||||||
that can be overloaded to cover specific checking and sanitization workflows in different organizations like industrial
|
that can be overloaded to cover specific checking and sanitization workflows in different organizations like industrial
|
||||||
environments or restricted/classified ICT environments. A series of practical examples utilizing PyCIRCLean can be found
|
environments or restricted/classified ICT environments. A series of practical examples utilizing PyCIRCLean can be found
|
||||||
in the [./bin](./bin) directory.
|
in the [./examples](./examples) directory.
|
||||||
|
|
||||||
The following simple example using PyCIRCLean will only copy files with a .conf extension matching the 'text/plain' MIME
|
The following simple example using PyCIRCLean will only copy files with a .conf extension matching the 'text/plain' MIME
|
||||||
type. If any other file is found in the source directory, the files won't be copied to the destination directory.
|
type. If any other file is found in the source directory, the files won't be copied to the destination directory.
|
||||||
|
|
|
@ -1,70 +1,33 @@
|
||||||
Example scripts
|
|
||||||
===============
|
|
||||||
|
|
||||||
These are a series of example scripts designed to demonstrate PyCIRCLean's capabilities. Feel free to
|
|
||||||
adapt or modify any of them to suit your requirements. In order to use any of these scripts, you will need to
|
|
||||||
install the PyCIRCLean dependencies (preferably in a virtualenv):
|
|
||||||
|
|
||||||
```
|
|
||||||
pip install git+https://github.com/ahupp/python-magic.git # we cannot use the PyPi package for now due to a bug
|
|
||||||
python setup.py install # from the root of the repository
|
|
||||||
```
|
|
||||||
|
|
||||||
Requirements per script
|
|
||||||
=======================
|
|
||||||
|
|
||||||
filecheck.py
|
filecheck.py
|
||||||
------------
|
============
|
||||||
|
|
||||||
*WARNING*: Only works with Python 2.7 (oletools and olefile aren't ported to Python3 for now)
|
This is the script used by the [CIRCLean](https://github.com/CIRCL/Circlean)
|
||||||
|
USB key sanitizer. It is designed to handle a range of file types, and will
|
||||||
|
mark them as dangerous if they meet certain criteria.
|
||||||
|
|
||||||
Requirements by type of document:
|
Before installing the filecheck.py dependencies, make sure to install the PyCIRCLean
|
||||||
|
dependencies:
|
||||||
|
|
||||||
|
```
|
||||||
|
pip install .
|
||||||
|
```
|
||||||
|
|
||||||
|
Dependencies by type of document:
|
||||||
* Microsoft office: oletools, olefile
|
* Microsoft office: oletools, olefile
|
||||||
* OOXML: officedissector
|
* OOXML: officedissector
|
||||||
* PDF: pdfid
|
* PDF: pdfid
|
||||||
* Archives: p7zip-full, p7zip-rar
|
* Archives: p7zip-full, p7zip-rar
|
||||||
|
* Metadata: exifread
|
||||||
|
* Images: pillow
|
||||||
|
|
||||||
|
Note: pdfid is not installable with pip. It must be downloaded and installed
|
||||||
|
manually in the directory where filecheck will be run.
|
||||||
|
|
||||||
```
|
```
|
||||||
sudo apt-get install p7zip-full p7zip-rar libxml2-dev libxslt1-dev
|
sudo apt-get install p7zip-full p7zip-rar libxml2-dev libxslt1-dev
|
||||||
pip install lxml officedissector git+https://github.com/ahupp/python-magic.git oletools olefile
|
pip install lxml oletools olefile pillow exifread
|
||||||
pip install git+https://github.com/Rafiot/officedissector.git
|
pip install git+https://github.com/Rafiot/officedissector.git
|
||||||
# pdfid is not a package, installing manually
|
# installing pdfid manually
|
||||||
wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
|
wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
|
||||||
unzip pdfid_v0_2_1.zip
|
unzip pdfid_v0_2_1.zip
|
||||||
python setup.py -q install
|
|
||||||
```
|
```
|
||||||
|
|
||||||
generic.py
|
|
||||||
----------
|
|
||||||
|
|
||||||
Requirements by type of document:
|
|
||||||
* Office and all text files: unoconv, libreoffice
|
|
||||||
* PDF: ghostscript, pdf2htmlEX
|
|
||||||
|
|
||||||
```
|
|
||||||
# required for pdf2htmlEX
|
|
||||||
sudo add-apt-repository ppa:fontforge/fontforge --yes
|
|
||||||
sudo add-apt-repository ppa:coolwanglu/pdf2htmlex --yes
|
|
||||||
sudo apt-get update -qq
|
|
||||||
sudo apt-get install -qq libpoppler-dev libpoppler-private-dev libspiro-dev libcairo-dev libpango1.0-dev libfreetype6-dev libltdl-dev libfontforge-dev python-imaging python-pip firefox xvfb
|
|
||||||
# install pdf2htmlEX
|
|
||||||
git clone https://github.com/coolwanglu/pdf2htmlEX.git
|
|
||||||
pushd pdf2htmlEX
|
|
||||||
cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_SVG=ON .
|
|
||||||
make
|
|
||||||
sudo make install
|
|
||||||
popd
|
|
||||||
# Installing the rest
|
|
||||||
sudo apt-get install ghostscript p7zip-full p7zip-rar libreoffice unoconv
|
|
||||||
```
|
|
||||||
|
|
||||||
pier9.py
|
|
||||||
--------
|
|
||||||
|
|
||||||
No external dependencies required.
|
|
||||||
|
|
||||||
specific.py
|
|
||||||
-----------
|
|
||||||
|
|
||||||
No external dependencies required.
|
|
||||||
|
|
273
bin/filecheck.py
273
bin/filecheck.py
|
@ -1,11 +1,9 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import os
|
import os
|
||||||
import sys
|
|
||||||
import mimetypes
|
import mimetypes
|
||||||
import shlex
|
import shlex
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
|
||||||
import zipfile
|
import zipfile
|
||||||
|
|
||||||
import oletools.oleid
|
import oletools.oleid
|
||||||
|
@ -21,8 +19,7 @@ from pdfid import PDFiD, cPDFiD
|
||||||
|
|
||||||
from kittengroomer import FileBase, KittenGroomerBase, main
|
from kittengroomer import FileBase, KittenGroomerBase, main
|
||||||
|
|
||||||
SEVENZ = '/usr/bin/7z'
|
SEVENZ_PATH = '/usr/bin/7z'
|
||||||
PY3 = sys.version_info.major == 3
|
|
||||||
|
|
||||||
|
|
||||||
# Prepare application/<subtype>
|
# Prepare application/<subtype>
|
||||||
|
@ -41,7 +38,7 @@ mimes_data = ['octet-stream']
|
||||||
mimes_exif = ['image/jpeg', 'image/tiff']
|
mimes_exif = ['image/jpeg', 'image/tiff']
|
||||||
mimes_png = ['image/png']
|
mimes_png = ['image/png']
|
||||||
|
|
||||||
# Mime types we can pull metadata from
|
# Mimetypes we can pull metadata from
|
||||||
mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png']
|
mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png']
|
||||||
|
|
||||||
# Aliases
|
# Aliases
|
||||||
|
@ -62,7 +59,7 @@ propertype = {'.gz': 'application/gzip'}
|
||||||
# Commonly used malicious extensions
|
# Commonly used malicious extensions
|
||||||
# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/
|
# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/
|
||||||
# https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java
|
# https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java
|
||||||
mal_ext = (
|
MAL_EXTS = (
|
||||||
# Applications
|
# Applications
|
||||||
".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr",
|
".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr",
|
||||||
".hta", ".cpl", ".msc", ".jar",
|
".hta", ".cpl", ".msc", ".jar",
|
||||||
|
@ -86,55 +83,58 @@ mal_ext = (
|
||||||
class File(FileBase):
|
class File(FileBase):
|
||||||
|
|
||||||
def __init__(self, src_path, dst_path):
|
def __init__(self, src_path, dst_path):
|
||||||
''' Init file object, set the mimetype '''
|
|
||||||
super(File, self).__init__(src_path, dst_path)
|
super(File, self).__init__(src_path, dst_path)
|
||||||
|
|
||||||
self.is_recursive = False
|
self.is_recursive = False
|
||||||
if not self.has_mimetype():
|
self._check_dangerous()
|
||||||
# No mimetype, should not happen.
|
|
||||||
self.make_dangerous()
|
|
||||||
|
|
||||||
if not self.has_extension():
|
|
||||||
self.make_dangerous()
|
|
||||||
|
|
||||||
if self.extension in mal_ext:
|
|
||||||
self.log_details.update({'malicious_extension': self.extension})
|
|
||||||
self.make_dangerous()
|
|
||||||
|
|
||||||
if self.is_dangerous():
|
if self.is_dangerous():
|
||||||
return
|
return
|
||||||
|
|
||||||
self.log_details.update({'maintype': self.main_type,
|
self.log_details.update({'maintype': self.main_type,
|
||||||
'subtype': self.sub_type,
|
'subtype': self.sub_type,
|
||||||
'extension': self.extension})
|
'extension': self.extension})
|
||||||
|
self._check_extension()
|
||||||
|
self._check_mime()
|
||||||
|
|
||||||
# Check correlation known extension => actual mime type
|
def _check_dangerous(self):
|
||||||
|
if not self.has_mimetype():
|
||||||
|
# No mimetype, should not happen.
|
||||||
|
self.make_dangerous()
|
||||||
|
if not self.has_extension():
|
||||||
|
self.make_dangerous()
|
||||||
|
if self.extension in MAL_EXTS:
|
||||||
|
self.log_details.update({'malicious_extension': self.extension})
|
||||||
|
self.make_dangerous()
|
||||||
|
|
||||||
|
def _check_extension(self):
|
||||||
|
"""Guesses the file's mimetype based on its extension. If the file's
|
||||||
|
mimetype (as determined by libmagic) is contained in the mimetype
|
||||||
|
module's list of valid mimetypes and the expected mimetype based on its
|
||||||
|
extension differs from the mimetype determined by libmagic, then it
|
||||||
|
marks the file as dangerous."""
|
||||||
if propertype.get(self.extension) is not None:
|
if propertype.get(self.extension) is not None:
|
||||||
expected_mimetype = propertype.get(self.extension)
|
expected_mimetype = propertype.get(self.extension)
|
||||||
else:
|
else:
|
||||||
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
|
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
|
||||||
if aliases.get(expected_mimetype) is not None:
|
if aliases.get(expected_mimetype) is not None:
|
||||||
expected_mimetype = aliases.get(expected_mimetype)
|
expected_mimetype = aliases.get(expected_mimetype)
|
||||||
|
|
||||||
is_known_extension = self.extension in mimetypes.types_map.keys()
|
is_known_extension = self.extension in mimetypes.types_map.keys()
|
||||||
if is_known_extension and expected_mimetype != self.mimetype:
|
if is_known_extension and expected_mimetype != self.mimetype:
|
||||||
self.log_details.update({'expected_mimetype': expected_mimetype})
|
self.log_details.update({'expected_mimetype': expected_mimetype})
|
||||||
self.make_dangerous()
|
self.make_dangerous()
|
||||||
|
|
||||||
# check correlation actual mime type => known extensions
|
def _check_mime(self):
|
||||||
|
"""Takes the mimetype (as determined by libmagic) and determines
|
||||||
|
whether the list of extensions that are normally associated with
|
||||||
|
that mimetype contains the file's actual extension."""
|
||||||
if aliases.get(self.mimetype) is not None:
|
if aliases.get(self.mimetype) is not None:
|
||||||
mimetype = aliases.get(self.mimetype)
|
mimetype = aliases.get(self.mimetype)
|
||||||
else:
|
else:
|
||||||
mimetype = self.mimetype
|
mimetype = self.mimetype
|
||||||
|
|
||||||
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
|
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
|
||||||
if expected_extensions:
|
if expected_extensions:
|
||||||
if len(self.extension) > 0 and self.extension not in expected_extensions:
|
if len(self.extension) > 0 and self.extension not in expected_extensions:
|
||||||
self.log_details.update({'expected_extensions': expected_extensions})
|
self.log_details.update({'expected_extensions': expected_extensions})
|
||||||
self.make_dangerous()
|
self.make_dangerous()
|
||||||
else:
|
|
||||||
# there are no known extensions associated to this mimetype.
|
|
||||||
pass
|
|
||||||
|
|
||||||
def has_metadata(self):
|
def has_metadata(self):
|
||||||
if self.mimetype in mimes_metadata:
|
if self.mimetype in mimes_metadata:
|
||||||
|
@ -144,18 +144,14 @@ class File(FileBase):
|
||||||
|
|
||||||
class KittenGroomerFileCheck(KittenGroomerBase):
|
class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
|
|
||||||
def __init__(self, root_src=None, root_dst=None, max_recursive=2, debug=False):
|
def __init__(self, root_src=None, root_dst=None, max_recursive_depth=2, debug=False):
|
||||||
'''
|
|
||||||
Initialize the basics of the conversion process
|
|
||||||
'''
|
|
||||||
if root_src is None:
|
if root_src is None:
|
||||||
root_src = os.path.join(os.sep, 'media', 'src')
|
root_src = os.path.join(os.sep, 'media', 'src')
|
||||||
if root_dst is None:
|
if root_dst is None:
|
||||||
root_dst = os.path.join(os.sep, 'media', 'dst')
|
root_dst = os.path.join(os.sep, 'media', 'dst')
|
||||||
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
|
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
|
||||||
|
self.recursive_archive_depth = 0
|
||||||
self.recursive = 0
|
self.max_recursive_depth = max_recursive_depth
|
||||||
self.max_recursive = max_recursive
|
|
||||||
|
|
||||||
subtypes_apps = [
|
subtypes_apps = [
|
||||||
(mimes_office, self._winoffice),
|
(mimes_office, self._winoffice),
|
||||||
|
@ -189,21 +185,18 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
'inode': self.inode,
|
'inode': self.inode,
|
||||||
}
|
}
|
||||||
|
|
||||||
# ##### Helpers #####
|
# ##### Helper functions #####
|
||||||
def _init_subtypes_application(self, subtypes_application):
|
def _init_subtypes_application(self, subtypes_application):
|
||||||
'''
|
"""Creates a dictionary with the right method based on the sub mime type."""
|
||||||
Create the Dict to pick the right function based on the sub mime type
|
subtype_dict = {}
|
||||||
'''
|
for list_subtypes, func in subtypes_application:
|
||||||
to_return = {}
|
|
||||||
for list_subtypes, fct in subtypes_application:
|
|
||||||
for st in list_subtypes:
|
for st in list_subtypes:
|
||||||
to_return[st] = fct
|
subtype_dict[st] = func
|
||||||
return to_return
|
return subtype_dict
|
||||||
|
|
||||||
def _print_log(self):
|
def _print_log(self):
|
||||||
'''
|
"""Print the logs related to the current file being processed."""
|
||||||
Print the logs related to the current file being processed
|
# TODO: change name to _write_log
|
||||||
'''
|
|
||||||
tmp_log = self.log_name.fields(**self.cur_file.log_details)
|
tmp_log = self.log_name.fields(**self.cur_file.log_details)
|
||||||
if self.cur_file.is_dangerous():
|
if self.cur_file.is_dangerous():
|
||||||
tmp_log.warning(self.cur_file.log_string)
|
tmp_log.warning(self.cur_file.log_string)
|
||||||
|
@ -212,66 +205,53 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
else:
|
else:
|
||||||
tmp_log.debug(self.cur_file.log_string)
|
tmp_log.debug(self.cur_file.log_string)
|
||||||
|
|
||||||
def _run_process(self, command_line, timeout=0, background=False):
|
def _run_process(self, command_string, timeout=None):
|
||||||
'''Run subprocess, wait until it finishes'''
|
"""Run command_string in a subprocess, wait until it finishes."""
|
||||||
if timeout != 0:
|
args = shlex.split(command_string)
|
||||||
deadline = time.time() + timeout
|
|
||||||
else:
|
|
||||||
deadline = None
|
|
||||||
args = shlex.split(command_line)
|
|
||||||
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
|
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
|
||||||
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
|
try:
|
||||||
if background:
|
subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout)
|
||||||
# This timer is here to make sure the unoconv listener is properly started.
|
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
|
||||||
time.sleep(10)
|
return
|
||||||
return True
|
|
||||||
while True:
|
|
||||||
code = p.poll()
|
|
||||||
if code is not None:
|
|
||||||
break
|
|
||||||
if deadline is not None and time.time() > deadline:
|
|
||||||
p.kill()
|
|
||||||
break
|
|
||||||
time.sleep(1)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
|
# ##### Discarded mimetypes, reason in the docstring ######
|
||||||
# ##### Discarded mime types, reason in the comments ######
|
|
||||||
def inode(self):
|
def inode(self):
|
||||||
''' Usually empty file. No reason (?) to copy it on the dest key'''
|
"""Empty file or symlink."""
|
||||||
if self.cur_file.is_symlink():
|
if self.cur_file.is_symlink():
|
||||||
self.cur_file.log_string += 'Symlink to {}'.format(self.log_details['symlink'])
|
self.cur_file.log_string += 'Symlink to {}'.format(self.cur_file.log_details['symlink'])
|
||||||
else:
|
else:
|
||||||
self.cur_file.log_string += 'Inode file'
|
self.cur_file.log_string += 'Inode file'
|
||||||
|
|
||||||
def unknown(self):
|
def unknown(self):
|
||||||
''' This main type is unknown, that should not happen '''
|
"""Main type should never be unknown."""
|
||||||
self.cur_file.log_string += 'Unknown file'
|
self.cur_file.log_string += 'Unknown file'
|
||||||
|
|
||||||
def example(self):
|
def example(self):
|
||||||
'''Used in examples, should never be returned by libmagic'''
|
"""Used in examples, should never be returned by libmagic."""
|
||||||
self.cur_file.log_string += 'Example file'
|
self.cur_file.log_string += 'Example file'
|
||||||
|
|
||||||
def multipart(self):
|
def multipart(self):
|
||||||
'''Used in web apps, should never be returned by libmagic'''
|
"""Used in web apps, should never be returned by libmagic"""
|
||||||
self.cur_file.log_string += 'Multipart file'
|
self.cur_file.log_string += 'Multipart file'
|
||||||
|
|
||||||
# ##### Threated as malicious, no reason to have it on a USB key ######
|
# ##### Treated as malicious, no reason to have it on a USB key ######
|
||||||
def message(self):
|
def message(self):
|
||||||
'''Way to process message file'''
|
"""Process a message file."""
|
||||||
self.cur_file.log_string += 'Message file'
|
self.cur_file.log_string += 'Message file'
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def model(self):
|
def model(self):
|
||||||
'''Way to process model file'''
|
"""Process a model file."""
|
||||||
self.cur_file.log_string += 'Model file'
|
self.cur_file.log_string += 'Model file'
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
# ##### Converted ######
|
# ##### Files that will be converted ######
|
||||||
def text(self):
|
def text(self):
|
||||||
|
"""Process an rtf, ooxml, or plaintext file."""
|
||||||
for r in mimes_rtf:
|
for r in mimes_rtf:
|
||||||
if r in self.cur_file.sub_type:
|
if r in self.cur_file.sub_type:
|
||||||
self.cur_file.log_string += 'Rich Text file'
|
self.cur_file.log_string += 'Rich Text file'
|
||||||
|
@ -289,7 +269,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def application(self):
|
def application(self):
|
||||||
''' Everything can be there, using the subtype to decide '''
|
"""Processes an application specific file according to its subtype."""
|
||||||
for subtype, fct in self.subtypes_application.items():
|
for subtype, fct in self.subtypes_application.items():
|
||||||
if subtype in self.cur_file.sub_type:
|
if subtype in self.cur_file.sub_type:
|
||||||
fct()
|
fct()
|
||||||
|
@ -299,12 +279,13 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
self._unknown_app()
|
self._unknown_app()
|
||||||
|
|
||||||
def _executables(self):
|
def _executables(self):
|
||||||
'''Way to process executable file'''
|
"""Processes an executable file."""
|
||||||
self.cur_file.add_log_details('processing_type', 'executable')
|
self.cur_file.add_log_details('processing_type', 'executable')
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def _winoffice(self):
|
def _winoffice(self):
|
||||||
|
"""Processes a winoffice file using olefile/oletools."""
|
||||||
self.cur_file.add_log_details('processing_type', 'WinOffice')
|
self.cur_file.add_log_details('processing_type', 'WinOffice')
|
||||||
# Try as if it is a valid document
|
# Try as if it is a valid document
|
||||||
oid = oletools.oleid.OleID(self.cur_file.src_path)
|
oid = oletools.oleid.OleID(self.cur_file.src_path)
|
||||||
|
@ -343,6 +324,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def _ooxml(self):
|
def _ooxml(self):
|
||||||
|
"""Processes an ooxml file."""
|
||||||
self.cur_file.add_log_details('processing_type', 'ooxml')
|
self.cur_file.add_log_details('processing_type', 'ooxml')
|
||||||
try:
|
try:
|
||||||
doc = officedissector.doc.Document(self.cur_file.src_path)
|
doc = officedissector.doc.Document(self.cur_file.src_path)
|
||||||
|
@ -369,6 +351,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def _libreoffice(self):
|
def _libreoffice(self):
|
||||||
|
"""Processes a libreoffice file."""
|
||||||
self.cur_file.add_log_details('processing_type', 'libreoffice')
|
self.cur_file.add_log_details('processing_type', 'libreoffice')
|
||||||
# As long as there is no way to do a sanity check on the files => dangerous
|
# As long as there is no way to do a sanity check on the files => dangerous
|
||||||
try:
|
try:
|
||||||
|
@ -385,55 +368,69 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def _pdf(self):
|
def _pdf(self):
|
||||||
'''Way to process PDF file'''
|
"""Processes a PDF file."""
|
||||||
self.cur_file.add_log_details('processing_type', 'pdf')
|
self.cur_file.add_log_details('processing_type', 'pdf')
|
||||||
xmlDoc = PDFiD(self.cur_file.src_path)
|
xmlDoc = PDFiD(self.cur_file.src_path)
|
||||||
oPDFiD = cPDFiD(xmlDoc, True)
|
oPDFiD = cPDFiD(xmlDoc, True)
|
||||||
# TODO: other keywords?
|
# TODO: other keywords?
|
||||||
if oPDFiD.encrypt > 0:
|
if oPDFiD.encrypt.count > 0:
|
||||||
self.cur_file.add_log_details('encrypted', True)
|
self.cur_file.add_log_details('encrypted', True)
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
if oPDFiD.js > 0 or oPDFiD.javascript > 0:
|
if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0:
|
||||||
self.cur_file.add_log_details('javascript', True)
|
self.cur_file.add_log_details('javascript', True)
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
if oPDFiD.aa > 0 or oPDFiD.openaction > 0:
|
if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0:
|
||||||
self.cur_file.add_log_details('openaction', True)
|
self.cur_file.add_log_details('openaction', True)
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
if oPDFiD.richmedia > 0:
|
if oPDFiD.richmedia.count > 0:
|
||||||
self.cur_file.add_log_details('flash', True)
|
self.cur_file.add_log_details('flash', True)
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
if oPDFiD.launch > 0:
|
if oPDFiD.launch.count > 0:
|
||||||
self.cur_file.add_log_details('launch', True)
|
self.cur_file.add_log_details('launch', True)
|
||||||
self.cur_file.make_dangerous()
|
self.cur_file.make_dangerous()
|
||||||
|
|
||||||
def _archive(self):
|
def _archive(self):
|
||||||
'''Way to process Archive'''
|
"""Processes an archive using 7zip. The archive is extracted to a
|
||||||
|
temporary directory and self.processdir is called on that directory.
|
||||||
|
The recursive archive depth is increased to protect against archive
|
||||||
|
bombs."""
|
||||||
self.cur_file.add_log_details('processing_type', 'archive')
|
self.cur_file.add_log_details('processing_type', 'archive')
|
||||||
self.cur_file.is_recursive = True
|
self.cur_file.is_recursive = True
|
||||||
self.cur_file.log_string += 'Archive extracted, processing content.'
|
self.cur_file.log_string += 'Archive extracted, processing content.'
|
||||||
tmpdir = self.cur_file.dst_path + '_temp'
|
tmpdir = self.cur_file.dst_path + '_temp'
|
||||||
self._safe_mkdir(tmpdir)
|
self._safe_mkdir(tmpdir)
|
||||||
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ, self.cur_file.src_path, tmpdir)
|
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir)
|
||||||
self._run_process(extract_command)
|
self._run_process(extract_command)
|
||||||
self.recursive += 1
|
self.recursive_archive_depth += 1
|
||||||
self.tree(tmpdir)
|
self.tree(tmpdir)
|
||||||
self.processdir(tmpdir, self.cur_file.dst_path)
|
self.processdir(tmpdir, self.cur_file.dst_path)
|
||||||
self.recursive -= 1
|
self.recursive_archive_depth -= 1
|
||||||
self._safe_rmtree(tmpdir)
|
self._safe_rmtree(tmpdir)
|
||||||
|
|
||||||
|
def _handle_archivebomb(self, src_dir):
|
||||||
|
self.cur_file.make_dangerous()
|
||||||
|
self.cur_file.add_log_details('Archive Bomb', True)
|
||||||
|
self.log_name.warning('ARCHIVE BOMB.')
|
||||||
|
self.log_name.warning('The content of the archive contains recursively other archives.')
|
||||||
|
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
|
||||||
|
self._safe_rmtree(src_dir)
|
||||||
|
if src_dir.endswith('_temp'):
|
||||||
|
bomb_path = src_dir[:-len('_temp')]
|
||||||
|
self._safe_remove(bomb_path)
|
||||||
|
|
||||||
def _unknown_app(self):
|
def _unknown_app(self):
|
||||||
'''Way to process an unknown file'''
|
"""Processes an unknown file."""
|
||||||
self.cur_file.make_unknown()
|
self.cur_file.make_unknown()
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
def _binary_app(self):
|
def _binary_app(self):
|
||||||
'''Way to process an unknown binary file'''
|
"""Processes an unknown binary file."""
|
||||||
self.cur_file.make_binary()
|
self.cur_file.make_binary()
|
||||||
self._safe_copy()
|
self._safe_copy()
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
# Metadata extractors
|
# Metadata extractors
|
||||||
def _metadata_exif(self, metadataFile):
|
def _metadata_exif(self, metadata_file):
|
||||||
img = open(self.cur_file.src_path, 'rb')
|
img = open(self.cur_file.src_path, 'rb')
|
||||||
tags = None
|
tags = None
|
||||||
|
|
||||||
|
@ -459,11 +456,11 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
# Exifreader truncates data.
|
# Exifreader truncates data.
|
||||||
if len(printable) > 25 and printable.endswith(", ... ]"):
|
if len(printable) > 25 and printable.endswith(", ... ]"):
|
||||||
value = tags[tag].values
|
value = tags[tag].values
|
||||||
if isinstance(value, basestring):
|
if isinstance(value, str):
|
||||||
printable = value
|
printable = value
|
||||||
else:
|
else:
|
||||||
printable = str(value)
|
printable = str(value)
|
||||||
metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable))
|
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
|
||||||
self.cur_file.add_log_details('metadata', 'exif')
|
self.cur_file.add_log_details('metadata', 'exif')
|
||||||
img.close()
|
img.close()
|
||||||
return True
|
return True
|
||||||
|
@ -487,22 +484,36 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def extract_metadata(self):
|
def extract_metadata(self):
|
||||||
metadataFile = self._safe_metadata_split(".metadata.txt")
|
metadata_file = self._safe_metadata_split(".metadata.txt")
|
||||||
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadataFile)
|
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadata_file)
|
||||||
metadataFile.close()
|
metadata_file.close()
|
||||||
if not success:
|
if not success:
|
||||||
# FIXME Delete empty metadata file
|
# FIXME Delete empty metadata file
|
||||||
pass
|
pass
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
# ##### Not converted, checking the mime type ######
|
# ##### Media - audio and video aren't converted ######
|
||||||
def audio(self):
|
def audio(self):
|
||||||
'''Way to process an audio file'''
|
"""Processes an audio file."""
|
||||||
self.cur_file.log_string += 'Audio file'
|
self.cur_file.log_string += 'Audio file'
|
||||||
self._media_processing()
|
self._media_processing()
|
||||||
|
|
||||||
|
def video(self):
|
||||||
|
"""Processes a video."""
|
||||||
|
self.cur_file.log_string += 'Video file'
|
||||||
|
self._media_processing()
|
||||||
|
|
||||||
|
def _media_processing(self):
|
||||||
|
"""Generic way to process all media files."""
|
||||||
|
self.cur_file.add_log_details('processing_type', 'media')
|
||||||
|
self._safe_copy()
|
||||||
|
|
||||||
def image(self):
|
def image(self):
|
||||||
'''Way to process an image'''
|
"""Processes an image.
|
||||||
|
|
||||||
|
Extracts metadata if metadata is present. Creates a temporary
|
||||||
|
directory, opens the image using PIL.Image, saves it to the temporary
|
||||||
|
directory, and copies it to the destination."""
|
||||||
if self.cur_file.has_metadata():
|
if self.cur_file.has_metadata():
|
||||||
self.extract_metadata()
|
self.extract_metadata()
|
||||||
|
|
||||||
|
@ -534,46 +545,14 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
self.cur_file.log_string += 'Image file'
|
self.cur_file.log_string += 'Image file'
|
||||||
self.cur_file.add_log_details('processing_type', 'image')
|
self.cur_file.add_log_details('processing_type', 'image')
|
||||||
|
|
||||||
def video(self):
|
|
||||||
'''Way to process a video'''
|
|
||||||
self.cur_file.log_string += 'Video file'
|
|
||||||
self._media_processing()
|
|
||||||
|
|
||||||
def _media_processing(self):
|
|
||||||
'''Generic way to process all the media files'''
|
|
||||||
self.cur_file.add_log_details('processing_type', 'media')
|
|
||||||
self._safe_copy()
|
|
||||||
|
|
||||||
#######################
|
#######################
|
||||||
|
|
||||||
def processdir(self, src_dir=None, dst_dir=None):
|
def process_file(self, srcpath, dstpath, relative_path):
|
||||||
'''
|
self.cur_file = File(srcpath, dstpath)
|
||||||
Main function doing the processing
|
self.log_name.info('Processing {} ({}/{})',
|
||||||
'''
|
relative_path,
|
||||||
if src_dir is None:
|
self.cur_file.main_type,
|
||||||
src_dir = self.src_root_dir
|
self.cur_file.sub_type)
|
||||||
if dst_dir is None:
|
|
||||||
dst_dir = self.dst_root_dir
|
|
||||||
|
|
||||||
if self.recursive > 0:
|
|
||||||
self._print_log()
|
|
||||||
|
|
||||||
if self.recursive >= self.max_recursive:
|
|
||||||
self.cur_file.make_dangerous()
|
|
||||||
self.cur_file.add_log_details('Archive Bomb', True)
|
|
||||||
self.log_name.warning('ARCHIVE BOMB.')
|
|
||||||
self.log_name.warning('The content of the archive contains recursively other archives.')
|
|
||||||
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
|
|
||||||
self._safe_rmtree(src_dir)
|
|
||||||
if src_dir.endswith('_temp'):
|
|
||||||
archbomb_path = src_dir[:-len('_temp')]
|
|
||||||
self._safe_remove(archbomb_path)
|
|
||||||
|
|
||||||
for srcpath in self._list_all_files(src_dir):
|
|
||||||
self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir))
|
|
||||||
|
|
||||||
self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''),
|
|
||||||
self.cur_file.main_type, self.cur_file.sub_type)
|
|
||||||
if not self.cur_file.is_dangerous():
|
if not self.cur_file.is_dangerous():
|
||||||
self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
|
self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
|
||||||
else:
|
else:
|
||||||
|
@ -581,5 +560,25 @@ class KittenGroomerFileCheck(KittenGroomerBase):
|
||||||
if not self.cur_file.is_recursive:
|
if not self.cur_file.is_recursive:
|
||||||
self._print_log()
|
self._print_log()
|
||||||
|
|
||||||
|
def processdir(self, src_dir=None, dst_dir=None):
|
||||||
|
"""Main function coordinating file processing."""
|
||||||
|
if src_dir is None:
|
||||||
|
src_dir = self.src_root_dir
|
||||||
|
if dst_dir is None:
|
||||||
|
dst_dir = self.dst_root_dir
|
||||||
|
|
||||||
|
if self.recursive_archive_depth > 0:
|
||||||
|
self._print_log()
|
||||||
|
|
||||||
|
if self.recursive_archive_depth >= self.max_recursive_depth:
|
||||||
|
self._handle_archivebomb(src_dir)
|
||||||
|
|
||||||
|
for srcpath in self._list_all_files(src_dir):
|
||||||
|
dstpath = srcpath.replace(src_dir, dst_dir)
|
||||||
|
relative_path = srcpath.replace(src_dir + '/', '')
|
||||||
|
# which path do we want in the log?
|
||||||
|
self.process_file(srcpath, dstpath, relative_path)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main(KittenGroomerFileCheck, 'Generic version of the KittenGroomer. Convert and rename files.')
|
main(KittenGroomerFileCheck, 'File sanitizer used in CIRCLean. Renames potentially dangerous files.')
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
Examples
|
||||||
|
========
|
||||||
|
|
||||||
|
These are several sanitizers that demonstrate PyCIRCLean's capabilities. Feel free to
|
||||||
|
adapt or modify any of them to suit your requirements. In order to use any of these scripts,
|
||||||
|
you will first need to install the PyCIRCLean dependencies (preferably in a virtualenv):
|
||||||
|
|
||||||
|
```
|
||||||
|
pip install .
|
||||||
|
```
|
||||||
|
|
||||||
|
Requirements per script
|
||||||
|
=======================
|
||||||
|
|
||||||
|
generic.py
|
||||||
|
----------
|
||||||
|
|
||||||
|
This is a script that was used by an older version of CIRCLean.
|
||||||
|
|
||||||
|
Requirements by type of document:
|
||||||
|
* Office and all text files: unoconv, libreoffice
|
||||||
|
* PDF: ghostscript, pdf2htmlEX
|
||||||
|
|
||||||
|
```
|
||||||
|
# required for pdf2htmlEX
|
||||||
|
sudo add-apt-repository ppa:fontforge/fontforge --yes
|
||||||
|
sudo add-apt-repository ppa:coolwanglu/pdf2htmlex --yes
|
||||||
|
sudo apt-get update -qq
|
||||||
|
sudo apt-get install -qq libpoppler-dev libpoppler-private-dev libspiro-dev libcairo-dev libpango1.0-dev libfreetype6-dev libltdl-dev libfontforge-dev python-imaging python-pip firefox xvfb
|
||||||
|
# install pdf2htmlEX
|
||||||
|
git clone https://github.com/coolwanglu/pdf2htmlEX.git
|
||||||
|
pushd pdf2htmlEX
|
||||||
|
cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_SVG=ON .
|
||||||
|
make
|
||||||
|
sudo make install
|
||||||
|
popd
|
||||||
|
# Installing the rest
|
||||||
|
sudo apt-get install ghostscript p7zip-full p7zip-rar libreoffice unoconv
|
||||||
|
```
|
||||||
|
|
||||||
|
pier9.py
|
||||||
|
--------
|
||||||
|
|
||||||
|
This script contains a list of file formats for various brands of industrial
|
||||||
|
manufacturing equipment, such as 3D printers, CNC machines, etc. It only
|
||||||
|
copies files that match these file formats.
|
||||||
|
|
||||||
|
No external dependencies required.
|
||||||
|
|
||||||
|
specific.py
|
||||||
|
-----------
|
||||||
|
|
||||||
|
As the name suggests, this script copies only specific file formats according
|
||||||
|
to the configuration provided by the user.
|
||||||
|
|
||||||
|
No external dependencies required.
|
|
@ -1,40 +0,0 @@
|
||||||
%!
|
|
||||||
% This is a sample prefix file for creating a PDF/A document.
|
|
||||||
% Feel free to modify entries marked with "Customize".
|
|
||||||
% This assumes an ICC profile to reside in the file (ISO Coated sb.icc),
|
|
||||||
% unless the user modifies the corresponding line below.
|
|
||||||
|
|
||||||
% Define entries in the document Info dictionary :
|
|
||||||
/ICCProfile (srgb.icc) % Customise
|
|
||||||
def
|
|
||||||
|
|
||||||
[ /Title (Title) % Customise
|
|
||||||
/DOCINFO pdfmark
|
|
||||||
|
|
||||||
% Define an ICC profile :
|
|
||||||
|
|
||||||
[/_objdef {icc_PDFA} /type /stream /OBJ pdfmark
|
|
||||||
[{icc_PDFA}
|
|
||||||
<<
|
|
||||||
/N currentpagedevice /ProcessColorModel known {
|
|
||||||
currentpagedevice /ProcessColorModel get dup /DeviceGray eq
|
|
||||||
{pop 1} {
|
|
||||||
/DeviceRGB eq
|
|
||||||
{3}{4} ifelse
|
|
||||||
} ifelse
|
|
||||||
} {
|
|
||||||
(ERROR, unable to determine ProcessColorModel) == flush
|
|
||||||
} ifelse
|
|
||||||
>> /PUT pdfmark
|
|
||||||
[{icc_PDFA} ICCProfile (r) file /PUT pdfmark
|
|
||||||
|
|
||||||
% Define the output intent dictionary :
|
|
||||||
|
|
||||||
[/_objdef {OutputIntent_PDFA} /type /dict /OBJ pdfmark
|
|
||||||
[{OutputIntent_PDFA} <<
|
|
||||||
/Type /OutputIntent % Must be so (the standard requires).
|
|
||||||
/S /GTS_PDFA1 % Must be so (the standard requires).
|
|
||||||
/DestOutputProfile {icc_PDFA} % Must be so (see above).
|
|
||||||
/OutputConditionIdentifier (sRGB) % Customize
|
|
||||||
>> /PUT pdfmark
|
|
||||||
[{Catalog} <</OutputIntents [ {OutputIntent_PDFA} ]>> /PUT pdfmark
|
|
Binary file not shown.
|
@ -45,9 +45,13 @@ class FileBase(object):
|
||||||
self.dst_path = dst_path
|
self.dst_path = dst_path
|
||||||
self.log_details = {'filepath': self.src_path}
|
self.log_details = {'filepath': self.src_path}
|
||||||
self.log_string = ''
|
self.log_string = ''
|
||||||
_, self.extension = os.path.splitext(self.src_path)
|
self._determine_extension()
|
||||||
self._determine_mimetype()
|
self._determine_mimetype()
|
||||||
|
|
||||||
|
def _determine_extension(self):
|
||||||
|
_, ext = os.path.splitext(self.src_path)
|
||||||
|
self.extension = ext.lower()
|
||||||
|
|
||||||
def _determine_mimetype(self):
|
def _determine_mimetype(self):
|
||||||
if os.path.islink(self.src_path):
|
if os.path.islink(self.src_path):
|
||||||
# magic will throw an IOError on a broken symlink
|
# magic will throw an IOError on a broken symlink
|
||||||
|
@ -55,6 +59,7 @@ class FileBase(object):
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
mt = magic.from_file(self.src_path, mime=True)
|
mt = magic.from_file(self.src_path, mime=True)
|
||||||
|
# magic will always return something, even if it's just 'data'
|
||||||
except UnicodeEncodeError as e:
|
except UnicodeEncodeError as e:
|
||||||
# FIXME: The encoding of the file is broken (possibly UTF-16)
|
# FIXME: The encoding of the file is broken (possibly UTF-16)
|
||||||
mt = ''
|
mt = ''
|
||||||
|
@ -76,7 +81,6 @@ class FileBase(object):
|
||||||
Returns False + updates log if self.main_type or self.sub_type
|
Returns False + updates log if self.main_type or self.sub_type
|
||||||
are not set.
|
are not set.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self.main_type or not self.sub_type:
|
if not self.main_type or not self.sub_type:
|
||||||
self.log_details.update({'broken_mime': True})
|
self.log_details.update({'broken_mime': True})
|
||||||
return False
|
return False
|
||||||
|
@ -88,16 +92,22 @@ class FileBase(object):
|
||||||
|
|
||||||
Returns False + updates self.log_details if self.extension is not set.
|
Returns False + updates self.log_details if self.extension is not set.
|
||||||
"""
|
"""
|
||||||
if not self.extension:
|
if self.extension == '':
|
||||||
self.log_details.update({'no_extension': True})
|
self.log_details.update({'no_extension': True})
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def is_dangerous(self):
|
def is_dangerous(self):
|
||||||
"""Returns True if self.log_details contains 'dangerous'."""
|
"""Returns True if self.log_details contains 'dangerous'."""
|
||||||
if self.log_details.get('dangerous'):
|
return ('dangerous' in self.log_details)
|
||||||
return True
|
|
||||||
return False
|
def is_unknown(self):
|
||||||
|
"""Returns True if self.log_details contains 'unknown'."""
|
||||||
|
return ('unknown' in self.log_details)
|
||||||
|
|
||||||
|
def is_binary(self):
|
||||||
|
"""Returns True if self.log_details contains 'binary'."""
|
||||||
|
return ('binary' in self.log_details)
|
||||||
|
|
||||||
def is_symlink(self):
|
def is_symlink(self):
|
||||||
"""Returns True and updates log if file is a symlink."""
|
"""Returns True and updates log if file is a symlink."""
|
||||||
|
@ -115,10 +125,9 @@ class FileBase(object):
|
||||||
Marks a file as dangerous.
|
Marks a file as dangerous.
|
||||||
|
|
||||||
Prepends and appends DANGEROUS to the destination file name
|
Prepends and appends DANGEROUS to the destination file name
|
||||||
to avoid double-click of death.
|
to help prevent double-click of death.
|
||||||
"""
|
"""
|
||||||
if self.is_dangerous():
|
if self.is_dangerous():
|
||||||
# Already marked as dangerous, do nothing
|
|
||||||
return
|
return
|
||||||
self.log_details['dangerous'] = True
|
self.log_details['dangerous'] = True
|
||||||
path, filename = os.path.split(self.dst_path)
|
path, filename = os.path.split(self.dst_path)
|
||||||
|
@ -126,8 +135,7 @@ class FileBase(object):
|
||||||
|
|
||||||
def make_unknown(self):
|
def make_unknown(self):
|
||||||
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
|
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
|
||||||
if self.is_dangerous() or self.log_details.get('binary'):
|
if self.is_dangerous() or self.is_binary():
|
||||||
# Already marked as dangerous or binary, do nothing
|
|
||||||
return
|
return
|
||||||
self.log_details['unknown'] = True
|
self.log_details['unknown'] = True
|
||||||
path, filename = os.path.split(self.dst_path)
|
path, filename = os.path.split(self.dst_path)
|
||||||
|
@ -136,7 +144,6 @@ class FileBase(object):
|
||||||
def make_binary(self):
|
def make_binary(self):
|
||||||
"""Marks a file as a binary and appends .bin to filename."""
|
"""Marks a file as a binary and appends .bin to filename."""
|
||||||
if self.is_dangerous():
|
if self.is_dangerous():
|
||||||
# Already marked as dangerous, do nothing
|
|
||||||
return
|
return
|
||||||
self.log_details['binary'] = True
|
self.log_details['binary'] = True
|
||||||
path, filename = os.path.split(self.dst_path)
|
path, filename = os.path.split(self.dst_path)
|
||||||
|
@ -260,6 +267,7 @@ class KittenGroomerBase(object):
|
||||||
|
|
||||||
def _safe_metadata_split(self, ext):
|
def _safe_metadata_split(self, ext):
|
||||||
"""Create a separate file to hold this file's metadata."""
|
"""Create a separate file to hold this file's metadata."""
|
||||||
|
# TODO: fix logic in this method
|
||||||
dst = self.cur_file.dst_path
|
dst = self.cur_file.dst_path
|
||||||
try:
|
try:
|
||||||
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
|
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
This directory contains extra files that may or may not be used in the project
|
|
|
@ -1,16 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
from usb.core import find
|
|
||||||
import usb.control
|
|
||||||
|
|
||||||
|
|
||||||
def is_mass_storage(dev):
|
|
||||||
import usb.util
|
|
||||||
for cfg in dev:
|
|
||||||
if usb.util.find_descriptor(cfg, bInterfaceClass=8) is not None:
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
for mass in find(find_all=True, custom_match=is_mass_storage):
|
|
||||||
print(mass)
|
|
10
setup.py
10
setup.py
|
@ -4,23 +4,21 @@ from setuptools import setup
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='kittengroomer',
|
name='kittengroomer',
|
||||||
version='2.0.2',
|
version='2.1',
|
||||||
author='Raphaël Vinot',
|
author='Raphaël Vinot',
|
||||||
author_email='raphael.vinot@circl.lu',
|
author_email='raphael.vinot@circl.lu',
|
||||||
maintainer='Raphaël Vinot',
|
maintainer='Raphaël Vinot',
|
||||||
url='https://github.com/CIRCL/CIRCLean',
|
url='https://github.com/CIRCL/CIRCLean',
|
||||||
description='Standalone CIRCLean/KittenGroomer code.',
|
description='Standalone CIRCLean/KittenGroomer code.',
|
||||||
packages=['kittengroomer'],
|
packages=['kittengroomer'],
|
||||||
scripts=['bin/generic.py', 'bin/pier9.py', 'bin/specific.py', 'bin/filecheck.py'],
|
scripts=[
|
||||||
include_package_data=True,
|
'bin/filecheck.py'
|
||||||
package_data={'data': ['PDFA_def.ps', 'srgb.icc']},
|
],
|
||||||
test_suite="tests",
|
|
||||||
classifiers=[
|
classifiers=[
|
||||||
'License :: OSI Approved :: BSD License',
|
'License :: OSI Approved :: BSD License',
|
||||||
'Development Status :: 5 - Production/Stable',
|
'Development Status :: 5 - Production/Stable',
|
||||||
'Environment :: Console',
|
'Environment :: Console',
|
||||||
'Intended Audience :: Science/Research',
|
'Intended Audience :: Science/Research',
|
||||||
'Programming Language :: Python :: 2.7',
|
|
||||||
'Programming Language :: Python :: 3',
|
'Programming Language :: Python :: 3',
|
||||||
'Topic :: Communications :: File Sharing',
|
'Topic :: Communications :: File Sharing',
|
||||||
'Topic :: Security',
|
'Topic :: Security',
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def save_logs(groomer, test_description):
|
||||||
|
divider = ('=' * 10 + '{}' + '=' * 10 + '\n')
|
||||||
|
test_log_path = 'tests/test_logs/{}.log'.format(test_description)
|
||||||
|
with open(test_log_path, 'w+') as test_log:
|
||||||
|
test_log.write(divider.format('TEST LOG'))
|
||||||
|
with open(groomer.log_processing, 'r') as logfile:
|
||||||
|
log = logfile.read()
|
||||||
|
test_log.write(log)
|
||||||
|
if groomer.debug:
|
||||||
|
if os.path.exists(groomer.log_debug_err):
|
||||||
|
test_log.write(divider.format('ERR LOG'))
|
||||||
|
with open(groomer.log_debug_err, 'r') as debug_err:
|
||||||
|
err = debug_err.read()
|
||||||
|
test_log.write(err)
|
||||||
|
if os.path.exists(groomer.log_debug_out):
|
||||||
|
test_log.write(divider.format('OUT LOG'))
|
||||||
|
with open(groomer.log_debug_out, 'r') as debug_out:
|
||||||
|
out = debug_out.read()
|
||||||
|
test_log.write(out)
|
|
@ -1,95 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
import unittest
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
|
|
||||||
|
|
||||||
from bin.specific import KittenGroomerSpec
|
|
||||||
from bin.pier9 import KittenGroomerPier9
|
|
||||||
from bin.generic import KittenGroomer
|
|
||||||
|
|
||||||
if sys.version_info.major == 2:
|
|
||||||
from bin.filecheck import KittenGroomerFileCheck
|
|
||||||
|
|
||||||
from kittengroomer import FileBase
|
|
||||||
|
|
||||||
|
|
||||||
class TestBasic(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.maxDiff = None
|
|
||||||
self.curpath = os.getcwd()
|
|
||||||
|
|
||||||
def dump_logs(self, kg):
|
|
||||||
print(open(kg.log_processing, 'rb').read())
|
|
||||||
if kg.debug:
|
|
||||||
if os.path.exists(kg.log_debug_err):
|
|
||||||
print(open(kg.log_debug_err, 'rb').read())
|
|
||||||
if os.path.exists(kg.log_debug_out):
|
|
||||||
print(open(kg.log_debug_out, 'rb').read())
|
|
||||||
|
|
||||||
def test_specific_valid(self):
|
|
||||||
src = os.path.join(self.curpath, 'tests/src2')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomerSpec(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_specific_invalid(self):
|
|
||||||
src = os.path.join(self.curpath, 'tests/src')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomerSpec(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_pier9(self):
|
|
||||||
src = os.path.join(self.curpath, 'tests/src')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomerPier9(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_generic(self):
|
|
||||||
src = os.path.join(self.curpath, 'tests/src2')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomer(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_generic_2(self):
|
|
||||||
src = os.path.join(self.curpath, 'tests/src')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomer(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_filecheck(self):
|
|
||||||
if sys.version_info.major >= 3:
|
|
||||||
return
|
|
||||||
src = os.path.join(self.curpath, 'tests/src')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomerFileCheck(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_filecheck_2(self):
|
|
||||||
if sys.version_info.major >= 3:
|
|
||||||
return
|
|
||||||
src = os.path.join(self.curpath, 'tests/src2')
|
|
||||||
dst = os.path.join(self.curpath, 'tests/dst')
|
|
||||||
spec = KittenGroomerFileCheck(src, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
self.dump_logs(spec)
|
|
||||||
|
|
||||||
def test_help_file(self):
|
|
||||||
f = FileBase('tests/src/blah.conf', 'tests/dst/blah.conf')
|
|
||||||
f.make_unknown()
|
|
||||||
f.make_binary()
|
|
||||||
f.make_unknown()
|
|
||||||
f.make_dangerous()
|
|
||||||
f.make_binary()
|
|
||||||
f.make_dangerous()
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
[autorun]
|
||||||
|
open=setup.exe
|
||||||
|
icon=setup.ico
|
||||||
|
label=My install CD
|
|
@ -0,0 +1 @@
|
||||||
|
blah
|
|
@ -1,88 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from bin.specific import KittenGroomerSpec
|
|
||||||
from bin.pier9 import KittenGroomerPier9
|
|
||||||
from bin.generic import KittenGroomer
|
|
||||||
|
|
||||||
if sys.version_info.major == 2:
|
|
||||||
from bin.filecheck import KittenGroomerFileCheck
|
|
||||||
|
|
||||||
|
|
||||||
skip = pytest.mark.skip
|
|
||||||
py2_only = pytest.mark.skipif(sys.version_info.major == 3,
|
|
||||||
reason="filecheck.py only runs on python 2")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def src_simple():
|
|
||||||
return os.path.join(os.getcwd(), 'tests/src_simple')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def src_complex():
|
|
||||||
return os.path.join(os.getcwd(), 'tests/src_complex')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def dst():
|
|
||||||
return os.path.join(os.getcwd(), 'tests/dst')
|
|
||||||
|
|
||||||
|
|
||||||
def test_specific_valid(src_simple, dst):
|
|
||||||
spec = KittenGroomerSpec(src_simple, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
|
|
||||||
def test_specific_invalid(src_complex, dst):
|
|
||||||
spec = KittenGroomerSpec(src_complex, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
|
|
||||||
def test_pier9(src_complex, dst):
|
|
||||||
spec = KittenGroomerPier9(src_complex, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
|
|
||||||
def test_generic(src_simple, dst):
|
|
||||||
spec = KittenGroomer(src_simple, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
|
|
||||||
def test_generic_2(src_complex, dst):
|
|
||||||
spec = KittenGroomer(src_complex, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
|
|
||||||
@py2_only
|
|
||||||
def test_filecheck(src_complex, dst):
|
|
||||||
spec = KittenGroomerFileCheck(src_complex, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
|
|
||||||
@py2_only
|
|
||||||
def test_filecheck_2(src_simple, dst):
|
|
||||||
spec = KittenGroomerFileCheck(src_simple, dst, debug=True)
|
|
||||||
spec.processdir()
|
|
||||||
dump_logs(spec)
|
|
||||||
|
|
||||||
## Helper functions
|
|
||||||
|
|
||||||
def dump_logs(spec):
|
|
||||||
print(open(spec.log_processing, 'rb').read())
|
|
||||||
if spec.debug:
|
|
||||||
if os.path.exists(spec.log_debug_err):
|
|
||||||
print(open(spec.log_debug_err, 'rb').read())
|
|
||||||
if os.path.exists(spec.log_debug_out):
|
|
||||||
print(open(spec.log_debug_out, 'rb').read())
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tests.logging import save_logs
|
||||||
|
try:
|
||||||
|
from bin.filecheck import KittenGroomerFileCheck, File, main
|
||||||
|
NODEPS = False
|
||||||
|
except ImportError:
|
||||||
|
NODEPS = True
|
||||||
|
|
||||||
|
skipif_nodeps = pytest.mark.skipif(NODEPS,
|
||||||
|
reason="Dependencies aren't installed")
|
||||||
|
|
||||||
|
|
||||||
|
@skipif_nodeps
class TestIntegration:
    """Full filecheck groomer runs over the sample source directories."""

    @pytest.fixture
    def src_valid(self):
        """Path to ``tests/src_valid`` under the current working directory."""
        base = os.getcwd()
        return os.path.join(base, 'tests/src_valid')

    @pytest.fixture
    def src_invalid(self):
        """Path to ``tests/src_invalid`` under the current working directory."""
        base = os.getcwd()
        return os.path.join(base, 'tests/src_invalid')

    @pytest.fixture
    def dst(self):
        """Path to ``tests/dst`` under the current working directory."""
        base = os.getcwd()
        return os.path.join(base, 'tests/dst')

    def test_filecheck(self, src_invalid, dst):
        """Groom ``src_invalid`` in debug mode; save logs as filecheck_invalid."""
        checker = KittenGroomerFileCheck(src_invalid, dst, debug=True)
        checker.processdir()
        save_logs(checker, "filecheck_invalid")

    def test_filecheck_2(self, src_valid, dst):
        """Groom ``src_valid`` in debug mode; save logs as filecheck_valid."""
        checker = KittenGroomerFileCheck(src_valid, dst, debug=True)
        checker.processdir()
        save_logs(checker, "filecheck_valid")
|
||||||
|
|
||||||
|
|
||||||
|
class TestFileHandling:
    """Placeholder for file-handling tests; no tests are defined yet."""
|
|
@ -1,25 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
import os
|
|
||||||
|
|
||||||
import kittengroomer as kg
|
|
||||||
import bin.specific as specific
|
|
||||||
|
|
||||||
PATH = os.getcwd() + '/tests/'
|
|
||||||
|
|
||||||
|
|
||||||
def test_base():
    """Check that ``kg`` exposes truthy FileBase, KittenGroomerBase and main."""
    for exported_name in ('FileBase', 'KittenGroomerBase', 'main'):
        assert getattr(kg, exported_name)
|
|
||||||
|
|
||||||
|
|
||||||
def test_help_file():
    """Call the ``make_*`` helpers on one file in a fixed order.

    Nothing is asserted; the test passes as long as no call raises.
    """
    conf_file = kg.FileBase('tests/src_complex/blah.conf', 'tests/dst/blah.conf')
    call_order = (
        'make_unknown',
        'make_binary',
        'make_unknown',
        'make_dangerous',
        'make_binary',
        'make_dangerous',
    )
    for helper_name in call_order:
        getattr(conf_file, helper_name)()
|
|
|
@ -2,14 +2,12 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from kittengroomer import FileBase, KittenGroomerBase
|
from kittengroomer import FileBase, KittenGroomerBase
|
||||||
from kittengroomer.helpers import ImplementationRequired
|
from kittengroomer.helpers import ImplementationRequired
|
||||||
|
|
||||||
PY3 = sys.version_info.major == 3
|
|
||||||
skip = pytest.mark.skip
|
skip = pytest.mark.skip
|
||||||
xfail = pytest.mark.xfail
|
xfail = pytest.mark.xfail
|
||||||
fixture = pytest.fixture
|
fixture = pytest.fixture
|
||||||
|
@ -21,7 +19,7 @@ class TestFileBase:
|
||||||
|
|
||||||
@fixture
|
@fixture
|
||||||
def source_file(self):
|
def source_file(self):
|
||||||
return 'tests/src_simple/blah.conf'
|
return 'tests/src_valid/blah.conf'
|
||||||
|
|
||||||
@fixture
|
@fixture
|
||||||
def dest_file(self):
|
def dest_file(self):
|
||||||
|
@ -84,23 +82,15 @@ class TestFileBase:
|
||||||
# We should probably catch every time that happens and tell the user explicitly what happened (and maybe put it in the log)
|
# We should probably catch every time that happens and tell the user explicitly what happened (and maybe put it in the log)
|
||||||
|
|
||||||
def test_create(self):
|
def test_create(self):
|
||||||
file = FileBase('tests/src_simple/blah.conf', '/tests/dst/blah.conf')
|
file = FileBase('tests/src_valid/blah.conf', '/tests/dst/blah.conf')
|
||||||
|
|
||||||
def test_create_broken(self, tmpdir):
|
def test_create_broken(self, tmpdir):
|
||||||
with pytest.raises(TypeError):
|
with pytest.raises(TypeError):
|
||||||
file_no_args = FileBase()
|
file_no_args = FileBase()
|
||||||
if PY3:
|
|
||||||
with pytest.raises(FileNotFoundError):
|
with pytest.raises(FileNotFoundError):
|
||||||
file_empty_args = FileBase('', '')
|
file_empty_args = FileBase('', '')
|
||||||
else:
|
|
||||||
with pytest.raises(IOError):
|
|
||||||
file_empty_args = FileBase('', '')
|
|
||||||
if PY3:
|
|
||||||
with pytest.raises(IsADirectoryError):
|
with pytest.raises(IsADirectoryError):
|
||||||
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
|
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
|
||||||
else:
|
|
||||||
with pytest.raises(IOError):
|
|
||||||
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
|
|
||||||
# are there other cases here? path to a file that doesn't exist? permissions?
|
# are there other cases here? path to a file that doesn't exist? permissions?
|
||||||
|
|
||||||
def test_init(self, generic_conf_file):
|
def test_init(self, generic_conf_file):
|
||||||
|
@ -113,6 +103,13 @@ class TestFileBase:
|
||||||
# assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable
|
# assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable
|
||||||
# we should probably check for more extensions here
|
# we should probably check for more extensions here
|
||||||
|
|
||||||
|
def test_extension_uppercase(self, tmpdir):
    """A file named ``TEST.TXT`` must report the extension ``.txt``."""
    upper_case_file = tmpdir.join('TEST.TXT')
    upper_case_file.write('testing')
    path = upper_case_file.strpath
    groomed = FileBase(path, path)
    assert groomed.extension == '.txt'
|
||||||
|
|
||||||
def test_mimetypes(self, generic_conf_file):
|
def test_mimetypes(self, generic_conf_file):
|
||||||
assert generic_conf_file.has_mimetype()
|
assert generic_conf_file.has_mimetype()
|
||||||
assert generic_conf_file.mimetype == 'text/plain'
|
assert generic_conf_file.mimetype == 'text/plain'
|
||||||
|
@ -221,7 +218,7 @@ class TestKittenGroomerBase:
|
||||||
|
|
||||||
@fixture
|
@fixture
|
||||||
def source_directory(self):
|
def source_directory(self):
|
||||||
return 'tests/src_complex'
|
return 'tests/src_invalid'
|
||||||
|
|
||||||
@fixture
|
@fixture
|
||||||
def dest_directory(self):
|
def dest_directory(self):
|
Loading…
Reference in New Issue