Merge branch 'master' of github.com:CIRCL/PyCIRCLean

pull/13/head
Raphaël Vinot 2017-03-10 14:45:11 +01:00
commit 8a186bfd26
40 changed files with 374 additions and 545 deletions

10
.gitignore vendored
View File

@ -67,8 +67,8 @@ target/
*.vrb
# Project specific
/tests/dst/*
!/tests/logs/
!/tests/.keepdir
tests/dst/*
tests/test_logs/*
!tests/**/.keepdir
!tests/src_invalid/*
!tests/src_valid/*

View File

@ -1,7 +1,6 @@
language: python
python:
- 2.7
- 3.3
- 3.4
- 3.5
@ -17,8 +16,6 @@ addons:
packages:
# General dependencies
- p7zip-full
# generic.py dependencies
- ghostscript
# Testing dependencies
- mercurial
@ -26,45 +23,28 @@ install:
# General dependencies
- sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ trusty multiverse" && sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu/ trusty-updates multiverse"
- sudo apt-get update -qq
- sudo apt-get install -y p7zip-rar
# generic.py: pdf2htmlEX + dependencies
- sudo add-apt-repository ppa:fontforge/fontforge --yes
# to get a working 0.26 poppler
- sudo add-apt-repository ppa:delayargentina/delayx --yes
- sudo apt-get update -qq
- sudo apt-get install -y libpoppler-dev libpoppler-private-dev libspiro-dev libcairo-dev libpango1.0-dev libfreetype6-dev libltdl-dev libfontforge-dev python-imaging python-pip firefox xvfb
- git clone https://github.com/coolwanglu/pdf2htmlEX.git
- pushd pdf2htmlEX
- cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_SVG=ON .
- make
- sudo make install
- popd
# generic.py: Other dependencies
- sudo apt-get install -y libreoffice libreoffice-script-provider-python unoconv
- sudo apt-get install -y p7zip-rar python-pip
# filecheck.py dependencies
- sudo apt-get install libxml2-dev libxslt1-dev
- wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
- unzip pdfid_v0_2_1.zip
- pip install -U pip
- pip install lxml exifread pillow
- pip install git+https://github.com/Rafiot/officedissector.git
- |
if [[ "$TRAVIS_PYTHON_VERSION" == 2* ]]; then
pip install -U oletools olefile
fi
# Module dependencies
- pip install lxml exifread pillow olefile
- pip install git+https://github.com/decalage2/oletools.git
- pip install git+https://github.com/grierforensics/officedissector.git
# PyCIRCLean dependencies
- pip install -r dev-requirements.txt
- pip install coveralls codecov
# Testing dependencies
- sudo apt-get install rar
# Prepare tests
# Zoo
# Malware from theZoo
- git clone https://github.com/Rafiot/theZoo.git
- pushd theZoo/malwares/Binaries
- python unpackall.py
- popd
- mv theZoo/malwares/Binaries/out tests/src_complex/
# Path traversal
- mv theZoo/malwares/Binaries/out tests/src_invalid/
# Path traversal attacks
- git clone https://github.com/jwilk/path-traversal-samples
- pushd path-traversal-samples
- pushd zip
@ -74,25 +54,25 @@ install:
- make
- popd
- popd
- mv path-traversal-samples/zip/*.zip tests/src_complex/
- mv path-traversal-samples/rar/*.rar tests/src_complex/
- mv path-traversal-samples/zip/*.zip tests/src_invalid/
- mv path-traversal-samples/rar/*.rar tests/src_invalid/
# Office docs
- git clone https://github.com/eea/odfpy.git
- mv odfpy/tests/examples/* tests/src_complex/
- pushd tests/src_complex/
- mv odfpy/tests/examples/* tests/src_invalid/
- pushd tests/src_invalid/
- wget https://bitbucket.org/decalage/olefileio_pl/raw/3073963b640935134ed0da34906fea8e506460be/Tests/images/test-ole-file.doc
- wget --no-check-certificate https://www.officedissector.com/corpus/fraunhoferlibrary.zip
- unzip -o fraunhoferlibrary.zip
- rm fraunhoferlibrary.zip
- 7z x 42.zip -p42
- 7z x -p42 42.zip
# Some random samples
- wget http://www.sample-videos.com/audio/mp3/india-national-anthem.mp3
- wget http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4
- wget http://thewalter.net/stef/software/rtfx/sample.rtf
- echo "blah" > test.obj
- popd
script:
- travis_wait 60 py.test --cov=kittengroomer --cov=bin tests/
- travis_wait 30 py.test --cov=kittengroomer --cov=bin tests/
notifications:
email:

View File

@ -1,9 +0,0 @@
Changelog
=========
2.1.0
---
New features:
Fixes:

19
CHANGELOG.md Normal file
View File

@ -0,0 +1,19 @@
Changelog
=========
2.1.0
---
New features:
- Dropped Python 2.7 support: PyCIRCLean is now Python 3.3+ only
- Tests are now easier to write and run: we have support for pytest and tox!
- More documentation: both docstrings and more detailed readmes
- Added more types of examples for testing
- The Travis build now runs in ~10 minutes vs. ~30 minutes before
Fixes:
- Extension matching now catches lower/upper case errors
- Fixed remaining python 3 issues with filecheck.py
- Fixed support for .rtf files
- Many other small filetype related fixes

View File

@ -29,5 +29,13 @@ or if you have an example you'd like to contribute.
Running the tests
=================
* Running the tests is easy. First, make sure you've installed the project and testing dependencies.
Then, run `python -m pytest` or just `pytest` in the top level or /tests directory.
* Running the tests is fairly straightforward.
* First, make sure you've installed the project and testing dependencies.
* Then, run `python -m pytest` or just `pytest` in the top level directory of the module.
* Each integration test that runs will generate a timestamped copy of the log for that run
in the tests/testlogs directory.
* If you'd like to get information about code coverage, run the tests using
`pytest --cov=kittengroomer`.
* You can test with multiple versions of Python if you have them installed
by running `pip install tox` and then `tox`. Make sure you modify "envlist"
in tox.ini for the Python versions you plan to use.

View File

@ -1 +1 @@
include kittengroomer/data/* README.md CONTRIBUTING.md CHANGELOG dev-requirements.txt
include README.md CONTRIBUTING.md CHANGELOG dev-requirements.txt

View File

@ -7,7 +7,7 @@
PyCIRCLean is the core Python code used by [CIRCLean](https://github.com/CIRCL/Circlean/), an open-source
USB key and document sanitizer created by [CIRCL](https://www.circl.lu/). This module has been separated from the
device-specific scripts and can be used for dedicated security applications to sanitize documents from hostile environments
to trusted environments.
to trusted environments. PyCIRCLean is currently Python 3.3+ only.
# Installation
@ -26,7 +26,7 @@ pip install .
PyCIRCLean is a simple Python library to handle file checking and sanitization. PyCIRCLean is designed as a simple library
that can be overloaded to cover specific checking and sanitization workflows in different organizations like industrial
environments or restricted/classified ICT environments. A series of practical examples utilizing PyCIRCLean can be found
in the [./bin](./bin) directory.
in the [./examples](./examples) directory.
The following simple example using PyCIRCLean will only copy files with a .conf extension matching the 'text/plain' MIME
type. If any other file is found in the source directory, the files won't be copied to the destination directory.

View File

@ -1,70 +1,33 @@
Example scripts
===============
These are a series of example scripts designed to demonstrate PyCIRCLean's capabilities. Feel free to
adapt or modify any of them to suit your requirements. In order to use any of these scripts, you will need to
install the PyCIRCLean dependencies (preferably in a virtualenv):
```
pip install git+https://github.com/ahupp/python-magic.git # we cannot use the PyPi package for now due to a bug
python setup.py install # from the root of the repository
```
Requirements per script
=======================
filecheck.py
------------
============
*WARNING*: Only works with Python 2.7 (oletools and olefile aren't ported to Python3 for now)
This is the script used by the [CIRCLean](https://github.com/CIRCL/Circlean)
USB key sanitizer. It is designed to handle a range of file types, and will
mark them as dangerous if they meet certain criteria.
Requirements by type of document:
Before installing the filecheck.py depenencies, make sure to install the PyCIRCLean
dependencies:
```
pip install .
```
Dependencies by type of document:
* Microsoft office: oletools, olefile
* OOXML: officedissector
* PDF: pdfid
* Archives: p7zip-full, p7zip-rar
* Metadata: exifread
* Images: pillow
Note: pdfid is a not installable with pip. It must be downloaded and installed
manually in the directory where filecheck will be run.
```
sudo apt-get install p7zip-full p7zip-rar libxml2-dev libxslt1-dev
pip install lxml officedissector git+https://github.com/ahupp/python-magic.git oletools olefile
pip install lxml oletools olefile pillow exifread
pip install git+https://github.com/Rafiot/officedissector.git
# pdfid is not a package, installing manually
# installing pdfid manually
wget https://didierstevens.com/files/software/pdfid_v0_2_1.zip
unzip pdfid_v0_2_1.zip
python setup.py -q install
```
generic.py
----------
Requirements by type of document:
* Office and all text files: unoconv, libreoffice
* PDF: ghostscript, pdf2htmlEX
```
# required for pdf2htmlEX
sudo add-apt-repository ppa:fontforge/fontforge --yes
sudo add-apt-repository ppa:coolwanglu/pdf2htmlex --yes
sudo apt-get update -qq
sudo apt-get install -qq libpoppler-dev libpoppler-private-dev libspiro-dev libcairo-dev libpango1.0-dev libfreetype6-dev libltdl-dev libfontforge-dev python-imaging python-pip firefox xvfb
# install pdf2htmlEX
git clone https://github.com/coolwanglu/pdf2htmlEX.git
pushd pdf2htmlEX
cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_SVG=ON .
make
sudo make install
popd
# Installing the rest
sudo apt-get install ghostscript p7zip-full p7zip-rar libreoffice unoconv
```
pier9.py
--------
No external dependencies required.
specific.py
-----------
No external dependencies required.

View File

@ -1,11 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import mimetypes
import shlex
import subprocess
import time
import zipfile
import oletools.oleid
@ -21,8 +19,7 @@ from pdfid import PDFiD, cPDFiD
from kittengroomer import FileBase, KittenGroomerBase, main
SEVENZ = '/usr/bin/7z'
PY3 = sys.version_info.major == 3
SEVENZ_PATH = '/usr/bin/7z'
# Prepare application/<subtype>
@ -41,7 +38,7 @@ mimes_data = ['octet-stream']
mimes_exif = ['image/jpeg', 'image/tiff']
mimes_png = ['image/png']
# Mime types we can pull metadata from
# Mimetypes we can pull metadata from
mimes_metadata = ['image/jpeg', 'image/tiff', 'image/png']
# Aliases
@ -62,7 +59,7 @@ propertype = {'.gz': 'application/gzip'}
# Commonly used malicious extensions
# Sources: http://www.howtogeek.com/137270/50-file-extensions-that-are-potentially-dangerous-on-windows/
# https://github.com/wiregit/wirecode/blob/master/components/core-settings/src/main/java/org/limewire/core/settings/FilterSettings.java
mal_ext = (
MAL_EXTS = (
# Applications
".exe", ".pif", ".application", ".gadget", ".msi", ".msp", ".com", ".scr",
".hta", ".cpl", ".msc", ".jar",
@ -86,55 +83,58 @@ mal_ext = (
class File(FileBase):
def __init__(self, src_path, dst_path):
''' Init file object, set the mimetype '''
super(File, self).__init__(src_path, dst_path)
self.is_recursive = False
if not self.has_mimetype():
# No mimetype, should not happen.
self.make_dangerous()
if not self.has_extension():
self.make_dangerous()
if self.extension in mal_ext:
self.log_details.update({'malicious_extension': self.extension})
self.make_dangerous()
self._check_dangerous()
if self.is_dangerous():
return
self.log_details.update({'maintype': self.main_type,
'subtype': self.sub_type,
'extension': self.extension})
self._check_extension()
self._check_mime()
# Check correlation known extension => actual mime type
def _check_dangerous(self):
if not self.has_mimetype():
# No mimetype, should not happen.
self.make_dangerous()
if not self.has_extension():
self.make_dangerous()
if self.extension in MAL_EXTS:
self.log_details.update({'malicious_extension': self.extension})
self.make_dangerous()
def _check_extension(self):
"""Guesses the file's mimetype based on its extension. If the file's
mimetype (as determined by libmagic) is contained in the mimetype
module's list of valid mimetypes and the expected mimetype based on its
extension differs from the mimetype determined by libmagic, then it
marks the file as dangerous."""
if propertype.get(self.extension) is not None:
expected_mimetype = propertype.get(self.extension)
else:
expected_mimetype, encoding = mimetypes.guess_type(self.src_path, strict=False)
if aliases.get(expected_mimetype) is not None:
expected_mimetype = aliases.get(expected_mimetype)
is_known_extension = self.extension in mimetypes.types_map.keys()
if is_known_extension and expected_mimetype != self.mimetype:
self.log_details.update({'expected_mimetype': expected_mimetype})
self.make_dangerous()
# check correlation actual mime type => known extensions
def _check_mime(self):
"""Takes the mimetype (as determined by libmagic) and determines
whether the list of extensions that are normally associated with
that extension contains the file's actual extension."""
if aliases.get(self.mimetype) is not None:
mimetype = aliases.get(self.mimetype)
else:
mimetype = self.mimetype
expected_extensions = mimetypes.guess_all_extensions(mimetype, strict=False)
if expected_extensions:
if len(self.extension) > 0 and self.extension not in expected_extensions:
self.log_details.update({'expected_extensions': expected_extensions})
self.make_dangerous()
else:
# there are no known extensions associated to this mimetype.
pass
def has_metadata(self):
if self.mimetype in mimes_metadata:
@ -144,18 +144,14 @@ class File(FileBase):
class KittenGroomerFileCheck(KittenGroomerBase):
def __init__(self, root_src=None, root_dst=None, max_recursive=2, debug=False):
'''
Initialize the basics of the conversion process
'''
def __init__(self, root_src=None, root_dst=None, max_recursive_depth=2, debug=False):
if root_src is None:
root_src = os.path.join(os.sep, 'media', 'src')
if root_dst is None:
root_dst = os.path.join(os.sep, 'media', 'dst')
super(KittenGroomerFileCheck, self).__init__(root_src, root_dst, debug)
self.recursive = 0
self.max_recursive = max_recursive
self.recursive_archive_depth = 0
self.max_recursive_depth = max_recursive_depth
subtypes_apps = [
(mimes_office, self._winoffice),
@ -189,21 +185,18 @@ class KittenGroomerFileCheck(KittenGroomerBase):
'inode': self.inode,
}
# ##### Helpers #####
# ##### Helper functions #####
def _init_subtypes_application(self, subtypes_application):
'''
Create the Dict to pick the right function based on the sub mime type
'''
to_return = {}
for list_subtypes, fct in subtypes_application:
"""Creates a dictionary with the right method based on the sub mime type."""
subtype_dict = {}
for list_subtypes, func in subtypes_application:
for st in list_subtypes:
to_return[st] = fct
return to_return
subtype_dict[st] = func
return subtype_dict
def _print_log(self):
'''
Print the logs related to the current file being processed
'''
"""Print the logs related to the current file being processed."""
# TODO: change name to _write_log
tmp_log = self.log_name.fields(**self.cur_file.log_details)
if self.cur_file.is_dangerous():
tmp_log.warning(self.cur_file.log_string)
@ -212,66 +205,53 @@ class KittenGroomerFileCheck(KittenGroomerBase):
else:
tmp_log.debug(self.cur_file.log_string)
def _run_process(self, command_line, timeout=0, background=False):
'''Run subprocess, wait until it finishes'''
if timeout != 0:
deadline = time.time() + timeout
else:
deadline = None
args = shlex.split(command_line)
def _run_process(self, command_string, timeout=None):
"""Run command_string in a subprocess, wait until it finishes."""
args = shlex.split(command_string)
with open(self.log_debug_err, 'ab') as stderr, open(self.log_debug_out, 'ab') as stdout:
p = subprocess.Popen(args, stdout=stdout, stderr=stderr)
if background:
# This timer is here to make sure the unoconv listener is properly started.
time.sleep(10)
return True
while True:
code = p.poll()
if code is not None:
break
if deadline is not None and time.time() > deadline:
p.kill()
break
time.sleep(1)
try:
subprocess.check_call(args, stdout=stdout, stderr=stderr, timeout=timeout)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
return
return True
#######################
# ##### Discarded mime types, reason in the comments ######
# ##### Discarded mimetypes, reason in the docstring ######
def inode(self):
''' Usually empty file. No reason (?) to copy it on the dest key'''
"""Empty file or symlink."""
if self.cur_file.is_symlink():
self.cur_file.log_string += 'Symlink to {}'.format(self.log_details['symlink'])
self.cur_file.log_string += 'Symlink to {}'.format(self.cur_file.log_details['symlink'])
else:
self.cur_file.log_string += 'Inode file'
def unknown(self):
''' This main type is unknown, that should not happen '''
"""Main type should never be unknown."""
self.cur_file.log_string += 'Unknown file'
def example(self):
'''Used in examples, should never be returned by libmagic'''
"""Used in examples, should never be returned by libmagic."""
self.cur_file.log_string += 'Example file'
def multipart(self):
'''Used in web apps, should never be returned by libmagic'''
"""Used in web apps, should never be returned by libmagic"""
self.cur_file.log_string += 'Multipart file'
# ##### Threated as malicious, no reason to have it on a USB key ######
# ##### Treated as malicious, no reason to have it on a USB key ######
def message(self):
'''Way to process message file'''
"""Process a message file."""
self.cur_file.log_string += 'Message file'
self.cur_file.make_dangerous()
self._safe_copy()
def model(self):
'''Way to process model file'''
"""Process a model file."""
self.cur_file.log_string += 'Model file'
self.cur_file.make_dangerous()
self._safe_copy()
# ##### Converted ######
# ##### Files that will be converted ######
def text(self):
"""Process an rtf, ooxml, or plaintext file."""
for r in mimes_rtf:
if r in self.cur_file.sub_type:
self.cur_file.log_string += 'Rich Text file'
@ -289,7 +269,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def application(self):
''' Everything can be there, using the subtype to decide '''
"""Processes an application specific file according to its subtype."""
for subtype, fct in self.subtypes_application.items():
if subtype in self.cur_file.sub_type:
fct()
@ -299,12 +279,13 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._unknown_app()
def _executables(self):
'''Way to process executable file'''
"""Processes an executable file."""
self.cur_file.add_log_details('processing_type', 'executable')
self.cur_file.make_dangerous()
self._safe_copy()
def _winoffice(self):
"""Processes a winoffice file using olefile/oletools."""
self.cur_file.add_log_details('processing_type', 'WinOffice')
# Try as if it is a valid document
oid = oletools.oleid.OleID(self.cur_file.src_path)
@ -343,6 +324,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def _ooxml(self):
"""Processes an ooxml file."""
self.cur_file.add_log_details('processing_type', 'ooxml')
try:
doc = officedissector.doc.Document(self.cur_file.src_path)
@ -369,6 +351,7 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def _libreoffice(self):
"""Processes a libreoffice file."""
self.cur_file.add_log_details('processing_type', 'libreoffice')
# As long as there ar no way to do a sanity check on the files => dangerous
try:
@ -385,55 +368,69 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self._safe_copy()
def _pdf(self):
'''Way to process PDF file'''
"""Processes a PDF file."""
self.cur_file.add_log_details('processing_type', 'pdf')
xmlDoc = PDFiD(self.cur_file.src_path)
oPDFiD = cPDFiD(xmlDoc, True)
# TODO: other keywords?
if oPDFiD.encrypt > 0:
if oPDFiD.encrypt.count > 0:
self.cur_file.add_log_details('encrypted', True)
self.cur_file.make_dangerous()
if oPDFiD.js > 0 or oPDFiD.javascript > 0:
if oPDFiD.js.count > 0 or oPDFiD.javascript.count > 0:
self.cur_file.add_log_details('javascript', True)
self.cur_file.make_dangerous()
if oPDFiD.aa > 0 or oPDFiD.openaction > 0:
if oPDFiD.aa.count > 0 or oPDFiD.openaction.count > 0:
self.cur_file.add_log_details('openaction', True)
self.cur_file.make_dangerous()
if oPDFiD.richmedia > 0:
if oPDFiD.richmedia.count > 0:
self.cur_file.add_log_details('flash', True)
self.cur_file.make_dangerous()
if oPDFiD.launch > 0:
if oPDFiD.launch.count > 0:
self.cur_file.add_log_details('launch', True)
self.cur_file.make_dangerous()
def _archive(self):
'''Way to process Archive'''
"""Processes an archive using 7zip. The archive is extracted to a
temporary directory and self.processdir is called on that directory.
The recursive archive depth is increased to protect against archive
bombs."""
self.cur_file.add_log_details('processing_type', 'archive')
self.cur_file.is_recursive = True
self.cur_file.log_string += 'Archive extracted, processing content.'
tmpdir = self.cur_file.dst_path + '_temp'
self._safe_mkdir(tmpdir)
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ, self.cur_file.src_path, tmpdir)
extract_command = '{} -p1 x "{}" -o"{}" -bd -aoa'.format(SEVENZ_PATH, self.cur_file.src_path, tmpdir)
self._run_process(extract_command)
self.recursive += 1
self.recursive_archive_depth += 1
self.tree(tmpdir)
self.processdir(tmpdir, self.cur_file.dst_path)
self.recursive -= 1
self.recursive_archive_depth -= 1
self._safe_rmtree(tmpdir)
def _handle_archivebomb(self, src_dir):
self.cur_file.make_dangerous()
self.cur_file.add_log_details('Archive Bomb', True)
self.log_name.warning('ARCHIVE BOMB.')
self.log_name.warning('The content of the archive contains recursively other archives.')
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
self._safe_rmtree(src_dir)
if src_dir.endswith('_temp'):
bomb_path = src_dir[:-len('_temp')]
self._safe_remove(bomb_path)
def _unknown_app(self):
'''Way to process an unknown file'''
"""Processes an unknown file."""
self.cur_file.make_unknown()
self._safe_copy()
def _binary_app(self):
'''Way to process an unknown binary file'''
"""Processses an unknown binary file."""
self.cur_file.make_binary()
self._safe_copy()
#######################
# Metadata extractors
def _metadata_exif(self, metadataFile):
def _metadata_exif(self, metadata_file):
img = open(self.cur_file.src_path, 'rb')
tags = None
@ -459,11 +456,11 @@ class KittenGroomerFileCheck(KittenGroomerBase):
# Exifreader truncates data.
if len(printable) > 25 and printable.endswith(", ... ]"):
value = tags[tag].values
if isinstance(value, basestring):
if isinstance(value, str):
printable = value
else:
printable = str(value)
metadataFile.write("Key: {}\tValue: {}\n".format(tag, printable))
metadata_file.write("Key: {}\tValue: {}\n".format(tag, printable))
self.cur_file.add_log_details('metadata', 'exif')
img.close()
return True
@ -487,22 +484,36 @@ class KittenGroomerFileCheck(KittenGroomerBase):
return False
def extract_metadata(self):
metadataFile = self._safe_metadata_split(".metadata.txt")
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadataFile)
metadataFile.close()
metadata_file = self._safe_metadata_split(".metadata.txt")
success = self.metadata_processing_options.get(self.cur_file.mimetype)(metadata_file)
metadata_file.close()
if not success:
# FIXME Delete empty metadata file
pass
#######################
# ##### Not converted, checking the mime type ######
# ##### Media - audio and video aren't converted ######
def audio(self):
'''Way to process an audio file'''
"""Processes an audio file."""
self.cur_file.log_string += 'Audio file'
self._media_processing()
def video(self):
"""Processes a video."""
self.cur_file.log_string += 'Video file'
self._media_processing()
def _media_processing(self):
"""Generic way to process all media files."""
self.cur_file.add_log_details('processing_type', 'media')
self._safe_copy()
def image(self):
'''Way to process an image'''
"""Processes an image.
Extracts metadata if metadata is present. Creates a temporary
directory, opens the using PIL.Image, saves it to the temporary
directory, and copies it to the destination."""
if self.cur_file.has_metadata():
self.extract_metadata()
@ -534,52 +545,40 @@ class KittenGroomerFileCheck(KittenGroomerBase):
self.cur_file.log_string += 'Image file'
self.cur_file.add_log_details('processing_type', 'image')
def video(self):
'''Way to process a video'''
self.cur_file.log_string += 'Video file'
self._media_processing()
def _media_processing(self):
'''Generic way to process all the media files'''
self.cur_file.add_log_details('processing_type', 'media')
self._safe_copy()
#######################
def process_file(self, srcpath, dstpath, relative_path):
self.cur_file = File(srcpath, dstpath)
self.log_name.info('Processing {} ({}/{})',
relative_path,
self.cur_file.main_type,
self.cur_file.sub_type)
if not self.cur_file.is_dangerous():
self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
else:
self._safe_copy()
if not self.cur_file.is_recursive:
self._print_log()
def processdir(self, src_dir=None, dst_dir=None):
'''
Main function doing the processing
'''
"""Main function coordinating file processing."""
if src_dir is None:
src_dir = self.src_root_dir
if dst_dir is None:
dst_dir = self.dst_root_dir
if self.recursive > 0:
if self.recursive_archive_depth > 0:
self._print_log()
if self.recursive >= self.max_recursive:
self.cur_file.make_dangerous()
self.cur_file.add_log_details('Archive Bomb', True)
self.log_name.warning('ARCHIVE BOMB.')
self.log_name.warning('The content of the archive contains recursively other archives.')
self.log_name.warning('This is a bad sign so the archive is not extracted to the destination key.')
self._safe_rmtree(src_dir)
if src_dir.endswith('_temp'):
archbomb_path = src_dir[:-len('_temp')]
self._safe_remove(archbomb_path)
if self.recursive_archive_depth >= self.max_recursive_depth:
self._handle_archivebomb(src_dir)
for srcpath in self._list_all_files(src_dir):
self.cur_file = File(srcpath, srcpath.replace(src_dir, dst_dir))
dstpath = srcpath.replace(src_dir, dst_dir)
relative_path = srcpath.replace(src_dir + '/', '')
# which path do we want in the log?
self.process_file(srcpath, dstpath, relative_path)
self.log_name.info('Processing {} ({}/{})', srcpath.replace(src_dir + '/', ''),
self.cur_file.main_type, self.cur_file.sub_type)
if not self.cur_file.is_dangerous():
self.mime_processing_options.get(self.cur_file.main_type, self.unknown)()
else:
self._safe_copy()
if not self.cur_file.is_recursive:
self._print_log()
if __name__ == '__main__':
main(KittenGroomerFileCheck, 'Generic version of the KittenGroomer. Convert and rename files.')
main(KittenGroomerFileCheck, 'File sanitizer used in CIRCLean. Renames potentially dangerous files.')

56
examples/README.md Normal file
View File

@ -0,0 +1,56 @@
Examples
========
These are several sanitizers that demonstrate PyCIRCLean's capabilities. Feel free to
adapt or modify any of them to suit your requirements. In order to use any of these scripts,
you will first need to install the PyCIRCLean dependencies (preferably in a virtualenv):
```
pip install .
```
Requirements per script
=======================
generic.py
----------
This is a script that was used by an older version of CIRCLean.
Requirements by type of document:
* Office and all text files: unoconv, libreoffice
* PDF: ghostscript, pdf2htmlEX
```
# required for pdf2htmlEX
sudo add-apt-repository ppa:fontforge/fontforge --yes
sudo add-apt-repository ppa:coolwanglu/pdf2htmlex --yes
sudo apt-get update -qq
sudo apt-get install -qq libpoppler-dev libpoppler-private-dev libspiro-dev libcairo-dev libpango1.0-dev libfreetype6-dev libltdl-dev libfontforge-dev python-imaging python-pip firefox xvfb
# install pdf2htmlEX
git clone https://github.com/coolwanglu/pdf2htmlEX.git
pushd pdf2htmlEX
cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_SVG=ON .
make
sudo make install
popd
# Installing the rest
sudo apt-get install ghostscript p7zip-full p7zip-rar libreoffice unoconv
```
pier9.py
--------
This script contains a list of file formats for various brands of industrial
manufacturing equipment, such as 3d printers, CNC machines, etc. It only
copies files that match these file formats.
No external dependencies required.
specific.py
-----------
As the name suggests, this script copies only specific file formats according
to the configuration provided by the user.
No external dependencies required.

View File

@ -1,40 +0,0 @@
%!
% This is a sample prefix file for creating a PDF/A document.
% Feel free to modify entries marked with "Customize".
% This assumes an ICC profile to reside in the file (ISO Coated sb.icc),
% unless the user modifies the corresponding line below.
% Define entries in the document Info dictionary :
/ICCProfile (srgb.icc) % Customise
def
[ /Title (Title) % Customise
/DOCINFO pdfmark
% Define an ICC profile :
[/_objdef {icc_PDFA} /type /stream /OBJ pdfmark
[{icc_PDFA}
<<
/N currentpagedevice /ProcessColorModel known {
currentpagedevice /ProcessColorModel get dup /DeviceGray eq
{pop 1} {
/DeviceRGB eq
{3}{4} ifelse
} ifelse
} {
(ERROR, unable to determine ProcessColorModel) == flush
} ifelse
>> /PUT pdfmark
[{icc_PDFA} ICCProfile (r) file /PUT pdfmark
% Define the output intent dictionary :
[/_objdef {OutputIntent_PDFA} /type /dict /OBJ pdfmark
[{OutputIntent_PDFA} <<
/Type /OutputIntent % Must be so (the standard requires).
/S /GTS_PDFA1 % Must be so (the standard requires).
/DestOutputProfile {icc_PDFA} % Must be so (see above).
/OutputConditionIdentifier (sRGB) % Customize
>> /PUT pdfmark
[{Catalog} <</OutputIntents [ {OutputIntent_PDFA} ]>> /PUT pdfmark

Binary file not shown.

View File

@ -45,9 +45,13 @@ class FileBase(object):
self.dst_path = dst_path
self.log_details = {'filepath': self.src_path}
self.log_string = ''
_, self.extension = os.path.splitext(self.src_path)
self._determine_extension()
self._determine_mimetype()
def _determine_extension(self):
_, ext = os.path.splitext(self.src_path)
self.extension = ext.lower()
def _determine_mimetype(self):
if os.path.islink(self.src_path):
# magic will throw an IOError on a broken symlink
@ -55,6 +59,7 @@ class FileBase(object):
else:
try:
mt = magic.from_file(self.src_path, mime=True)
# magic will always return something, even if it's just 'data'
except UnicodeEncodeError as e:
# FIXME: The encoding of the file is broken (possibly UTF-16)
mt = ''
@ -76,7 +81,6 @@ class FileBase(object):
Returns False + updates log if self.main_type or self.sub_type
are not set.
"""
if not self.main_type or not self.sub_type:
self.log_details.update({'broken_mime': True})
return False
@ -88,16 +92,22 @@ class FileBase(object):
Returns False + updates self.log_details if self.extension is not set.
"""
if not self.extension:
if self.extension == '':
self.log_details.update({'no_extension': True})
return False
return True
def is_dangerous(self):
"""Returns True if self.log_details contains 'dangerous'."""
if self.log_details.get('dangerous'):
return True
return False
return ('dangerous' in self.log_details)
def is_unknown(self):
"""Returns True if self.log_details contains 'unknown'."""
return ('unknown' in self.log_details)
def is_binary(self):
"""returns True if self.log_details contains 'binary'."""
return ('binary' in self.log_details)
def is_symlink(self):
"""Returns True and updates log if file is a symlink."""
@ -115,10 +125,9 @@ class FileBase(object):
Marks a file as dangerous.
Prepends and appends DANGEROUS to the destination file name
to avoid double-click of death.
to help prevent double-click of death.
"""
if self.is_dangerous():
# Already marked as dangerous, do nothing
return
self.log_details['dangerous'] = True
path, filename = os.path.split(self.dst_path)
@ -126,8 +135,7 @@ class FileBase(object):
def make_unknown(self):
"""Marks a file as an unknown type and prepends UNKNOWN to filename."""
if self.is_dangerous() or self.log_details.get('binary'):
# Already marked as dangerous or binary, do nothing
if self.is_dangerous() or self.is_binary():
return
self.log_details['unknown'] = True
path, filename = os.path.split(self.dst_path)
@ -136,7 +144,6 @@ class FileBase(object):
def make_binary(self):
"""Marks a file as a binary and appends .bin to filename."""
if self.is_dangerous():
# Already marked as dangerous, do nothing
return
self.log_details['binary'] = True
path, filename = os.path.split(self.dst_path)
@ -179,8 +186,8 @@ class KittenGroomerBase(object):
self.log_debug_out = os.devnull
def _computehash(self, path):
"""Returns a sha1 hash of a file at a given path."""
s = hashlib.sha1()
"""Returns a sha256 hash of a file at a given path."""
s = hashlib.sha256()
with open(path, 'rb') as f:
while True:
buf = f.read(0x100000)
@ -260,9 +267,10 @@ class KittenGroomerBase(object):
def _safe_metadata_split(self, ext):
"""Create a separate file to hold this file's metadata."""
# TODO: fix logic in this method
dst = self.cur_file.dst_path
try:
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
if os.path.exists(self.cur_file.src_path + ext): # should we check dst_path as well?
raise KittenGroomerError("Cannot create split metadata file for \"" +
self.cur_file.dst_path + "\", type '" +
ext + "': File exists.")

View File

@ -1 +0,0 @@
This directory contains extra files that may or may not be used in the project

View File

@ -1,16 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from usb.core import find
import usb.control
def is_mass_storage(dev):
import usb.util
for cfg in dev:
if usb.util.find_descriptor(cfg, bInterfaceClass=8) is not None:
return True
for mass in find(find_all=True, custom_match=is_mass_storage):
print(mass)

View File

@ -4,23 +4,21 @@ from setuptools import setup
setup(
name='kittengroomer',
version='2.0.2',
version='2.1',
author='Raphaël Vinot',
author_email='raphael.vinot@circl.lu',
maintainer='Raphaël Vinot',
url='https://github.com/CIRCL/CIRCLean',
description='Standalone CIRCLean/KittenGroomer code.',
packages=['kittengroomer'],
scripts=['bin/generic.py', 'bin/pier9.py', 'bin/specific.py', 'bin/filecheck.py'],
include_package_data=True,
package_data={'data': ['PDFA_def.ps', 'srgb.icc']},
test_suite="tests",
scripts=[
'bin/filecheck.py'
],
classifiers=[
'License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: Science/Research',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Topic :: Communications :: File Sharing',
'Topic :: Security',

22
tests/logging.py Normal file
View File

@ -0,0 +1,22 @@
import os
def save_logs(groomer, test_description):
divider = ('=' * 10 + '{}' + '=' * 10 + '\n')
test_log_path = 'tests/test_logs/{}.log'.format(test_description)
with open(test_log_path, 'w+') as test_log:
test_log.write(divider.format('TEST LOG'))
with open(groomer.log_processing, 'r') as logfile:
log = logfile.read()
test_log.write(log)
if groomer.debug:
if os.path.exists(groomer.log_debug_err):
test_log.write(divider.format('ERR LOG'))
with open(groomer.log_debug_err, 'r') as debug_err:
err = debug_err.read()
test_log.write(err)
if os.path.exists(groomer.log_debug_out):
test_log.write(divider.format('OUT LOG'))
with open(groomer.log_debug_out, 'r') as debug_out:
out = debug_out.read()
test_log.write(out)

View File

@ -1,95 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import unittest
import os
import sys
if __name__ == '__main__':
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
from bin.specific import KittenGroomerSpec
from bin.pier9 import KittenGroomerPier9
from bin.generic import KittenGroomer
if sys.version_info.major == 2:
from bin.filecheck import KittenGroomerFileCheck
from kittengroomer import FileBase
class TestBasic(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.curpath = os.getcwd()
def dump_logs(self, kg):
print(open(kg.log_processing, 'rb').read())
if kg.debug:
if os.path.exists(kg.log_debug_err):
print(open(kg.log_debug_err, 'rb').read())
if os.path.exists(kg.log_debug_out):
print(open(kg.log_debug_out, 'rb').read())
def test_specific_valid(self):
src = os.path.join(self.curpath, 'tests/src2')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomerSpec(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_specific_invalid(self):
src = os.path.join(self.curpath, 'tests/src')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomerSpec(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_pier9(self):
src = os.path.join(self.curpath, 'tests/src')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomerPier9(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_generic(self):
src = os.path.join(self.curpath, 'tests/src2')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomer(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_generic_2(self):
src = os.path.join(self.curpath, 'tests/src')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomer(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_filecheck(self):
if sys.version_info.major >= 3:
return
src = os.path.join(self.curpath, 'tests/src')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomerFileCheck(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_filecheck_2(self):
if sys.version_info.major >= 3:
return
src = os.path.join(self.curpath, 'tests/src2')
dst = os.path.join(self.curpath, 'tests/dst')
spec = KittenGroomerFileCheck(src, dst, debug=True)
spec.processdir()
self.dump_logs(spec)
def test_help_file(self):
f = FileBase('tests/src/blah.conf', 'tests/dst/blah.conf')
f.make_unknown()
f.make_binary()
f.make_unknown()
f.make_dangerous()
f.make_binary()
f.make_dangerous()

View File

@ -0,0 +1,4 @@
[autorun]
open=setup.exe
icon=setup.ico
label=My install CD

View File

@ -0,0 +1 @@
blah

View File

@ -1,88 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import pytest
from bin.specific import KittenGroomerSpec
from bin.pier9 import KittenGroomerPier9
from bin.generic import KittenGroomer
if sys.version_info.major == 2:
from bin.filecheck import KittenGroomerFileCheck
skip = pytest.mark.skip
py2_only = pytest.mark.skipif(sys.version_info.major == 3,
reason="filecheck.py only runs on python 2")
@pytest.fixture
def src_simple():
return os.path.join(os.getcwd(), 'tests/src_simple')
@pytest.fixture
def src_complex():
return os.path.join(os.getcwd(), 'tests/src_complex')
@pytest.fixture
def dst():
return os.path.join(os.getcwd(), 'tests/dst')
def test_specific_valid(src_simple, dst):
spec = KittenGroomerSpec(src_simple, dst, debug=True)
spec.processdir()
dump_logs(spec)
def test_specific_invalid(src_complex, dst):
spec = KittenGroomerSpec(src_complex, dst, debug=True)
spec.processdir()
dump_logs(spec)
def test_pier9(src_complex, dst):
spec = KittenGroomerPier9(src_complex, dst, debug=True)
spec.processdir()
dump_logs(spec)
def test_generic(src_simple, dst):
spec = KittenGroomer(src_simple, dst, debug=True)
spec.processdir()
dump_logs(spec)
def test_generic_2(src_complex, dst):
spec = KittenGroomer(src_complex, dst, debug=True)
spec.processdir()
dump_logs(spec)
@py2_only
def test_filecheck(src_complex, dst):
spec = KittenGroomerFileCheck(src_complex, dst, debug=True)
spec.processdir()
dump_logs(spec)
@py2_only
def test_filecheck_2(src_simple, dst):
spec = KittenGroomerFileCheck(src_simple, dst, debug=True)
spec.processdir()
dump_logs(spec)
## Helper functions
def dump_logs(spec):
print(open(spec.log_processing, 'rb').read())
if spec.debug:
if os.path.exists(spec.log_debug_err):
print(open(spec.log_debug_err, 'rb').read())
if os.path.exists(spec.log_debug_out):
print(open(spec.log_debug_out, 'rb').read())

48
tests/test_filecheck.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import pytest
from tests.logging import save_logs
try:
from bin.filecheck import KittenGroomerFileCheck, File, main
NODEPS = False
except ImportError:
NODEPS = True
skipif_nodeps = pytest.mark.skipif(NODEPS,
reason="Dependencies aren't installed")
@skipif_nodeps
class TestIntegration:
@pytest.fixture
def src_valid(self):
return os.path.join(os.getcwd(), 'tests/src_valid')
@pytest.fixture
def src_invalid(self):
return os.path.join(os.getcwd(), 'tests/src_invalid')
@pytest.fixture
def dst(self):
return os.path.join(os.getcwd(), 'tests/dst')
def test_filecheck(self, src_invalid, dst):
groomer = KittenGroomerFileCheck(src_invalid, dst, debug=True)
groomer.processdir()
test_description = "filecheck_invalid"
save_logs(groomer, test_description)
def test_filecheck_2(self, src_valid, dst):
groomer = KittenGroomerFileCheck(src_valid, dst, debug=True)
groomer.processdir()
test_description = "filecheck_valid"
save_logs(groomer, test_description)
class TestFileHandling:
pass

View File

@ -1,25 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import kittengroomer as kg
import bin.specific as specific
PATH = os.getcwd() + '/tests/'
def test_base():
assert kg.FileBase
assert kg.KittenGroomerBase
assert kg.main
def test_help_file():
f = kg.FileBase('tests/src_complex/blah.conf', 'tests/dst/blah.conf')
f.make_unknown()
f.make_binary()
f.make_unknown()
f.make_dangerous()
f.make_binary()
f.make_dangerous()

View File

@ -2,14 +2,12 @@
# -*- coding: utf-8 -*-
import os
import sys
import pytest
from kittengroomer import FileBase, KittenGroomerBase
from kittengroomer.helpers import ImplementationRequired
PY3 = sys.version_info.major == 3
skip = pytest.mark.skip
xfail = pytest.mark.xfail
fixture = pytest.fixture
@ -21,7 +19,7 @@ class TestFileBase:
@fixture
def source_file(self):
return 'tests/src_simple/blah.conf'
return 'tests/src_valid/blah.conf'
@fixture
def dest_file(self):
@ -84,23 +82,15 @@ class TestFileBase:
# We should probably catch everytime that happens and tell the user explicitly happened (and maybe put it in the log)
def test_create(self):
file = FileBase('tests/src_simple/blah.conf', '/tests/dst/blah.conf')
file = FileBase('tests/src_valid/blah.conf', '/tests/dst/blah.conf')
def test_create_broken(self, tmpdir):
with pytest.raises(TypeError):
file_no_args = FileBase()
if PY3:
with pytest.raises(FileNotFoundError):
file_empty_args = FileBase('', '')
else:
with pytest.raises(IOError):
file_empty_args = FileBase('', '')
if PY3:
with pytest.raises(IsADirectoryError):
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
else:
with pytest.raises(IOError):
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
with pytest.raises(FileNotFoundError):
file_empty_args = FileBase('', '')
with pytest.raises(IsADirectoryError):
file_directory = FileBase(tmpdir.strpath, tmpdir.strpath)
# are there other cases here? path to a file that doesn't exist? permissions?
def test_init(self, generic_conf_file):
@ -113,6 +103,13 @@ class TestFileBase:
# assert file.log_details == copied_log # this fails for now, we need to make log_details undeletable
# we should probably check for more extensions here
def test_extension_uppercase(self, tmpdir):
file_path = tmpdir.join('TEST.TXT')
file_path.write('testing')
file_path = file_path.strpath
file = FileBase(file_path, file_path)
assert file.extension == '.txt'
def test_mimetypes(self, generic_conf_file):
assert generic_conf_file.has_mimetype()
assert generic_conf_file.mimetype == 'text/plain'
@ -221,7 +218,7 @@ class TestKittenGroomerBase:
@fixture
def source_directory(self):
return 'tests/src_complex'
return 'tests/src_invalid'
@fixture
def dest_directory(self):

0
tests/test_logs/.keepdir Normal file
View File

View File

@ -1,5 +1,5 @@
[tox]
envlist=py27,py35
envlist=py35
[testenv]
deps=-rdev-requirements.txt
commands= pytest tests/test_helpers.py --cov=kittengroomer
commands= pytest --cov=kittengroomer --cov=bin