Déborah Servili 2016-09-05 13:50:35 +02:00
commit 95654e083c
24 changed files with 1922 additions and 228 deletions

25
.travis.yml Normal file
View File

@ -0,0 +1,25 @@
language: python
cache: pip
python:
- "2.7"
- "3.3"
- "3.4"
- "3.5"
- "3.5-dev"
- "nightly"
install:
- pip install -U nose
- pip install coveralls
- pip install codecov
- pip install requests-mock
- pip install .
script:
- nosetests --with-coverage --cover-package=pymisp tests/test_offline.py
after_success:
- codecov
- coveralls

View File

@ -1,3 +1,10 @@
README
======
[![Documentation Status](https://readthedocs.org/projects/pymisp/badge/?version=master)](http://pymisp.readthedocs.io/en/master/?badge=master)
[![Build Status](https://travis-ci.org/MISP/PyMISP.svg?branch=master)](https://travis-ci.org/MISP/PyMISP)
[![Coverage Status](https://coveralls.io/repos/github/MISP/PyMISP/badge.svg?branch=master)](https://coveralls.io/github/MISP/PyMISP?branch=master)
# PyMISP - Python Library to access MISP
PyMISP is a Python library to access [MISP](https://github.com/MISP/MISP) platforms via their REST API.
@ -9,38 +16,39 @@ PyMISP allows you to fetch events, add or update events/attributes, add or updat
* [requests](http://docs.python-requests.org)
## Install from pip
~~~~
```
pip install pymisp
~~~~
```
## Install the lastest version from repo
~~~~
```
git clone https://github.com/CIRCL/PyMISP.git && cd PyMISP
python setup.py install
~~~~
```
## Samples and how to use PyMISP
## Samples and how to use PyMISP
Various examples and samples scripts are in the [examples/](examples/) directory.
In the examples directory, you will need to change the keys.py.sample to enter your MISP url and API key.
~~~~
```
cd examples
cp keys.py.sample keys.py
vim keys.py
~~~~
```
The API key of MISP is available in the Automation section of the MISP web interface.
To test if your URL and API keys are correct, you can test with examples/last.py to
fetch the last 10 events published.
~~~~
```
cd examples
python last.py -l 10
~~~~
```
## Documentation
@ -48,6 +56,6 @@ python last.py -l 10
Documentation can be generated with epydoc:
~~~~
epydoc --url https://github.com/CIRCL/PyMISP --graph all --name PyMISP --pdf pymisp -o doc
~~~~
```
epydoc --url https://github.com/CIRCL/PyMISP --graph all --name PyMISP --pdf pymisp -o doc
```

225
docs/Makefile Normal file
View File

@ -0,0 +1,225 @@
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
.PHONY: help
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " applehelp to make an Apple Help Book"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " epub3 to make an epub3"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " texinfo to make Texinfo files"
@echo " info to make Texinfo files and run them through makeinfo"
@echo " gettext to make PO message catalogs"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " xml to make Docutils-native XML files"
@echo " pseudoxml to make pseudoxml-XML files for display purposes"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
@echo " coverage to run coverage check of the documentation (if enabled)"
@echo " dummy to check syntax errors of document sources"
.PHONY: clean
clean:
rm -rf $(BUILDDIR)/*
.PHONY: html
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
.PHONY: dirhtml
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
.PHONY: singlehtml
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
.PHONY: pickle
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
.PHONY: json
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
.PHONY: htmlhelp
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
.PHONY: qthelp
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/PyMISP.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/PyMISP.qhc"
.PHONY: applehelp
applehelp:
$(SPHINXBUILD) -b applehelp $(ALLSPHINXOPTS) $(BUILDDIR)/applehelp
@echo
@echo "Build finished. The help book is in $(BUILDDIR)/applehelp."
@echo "N.B. You won't be able to view it unless you put it in" \
"~/Library/Documentation/Help or install it in your application" \
"bundle."
.PHONY: devhelp
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/PyMISP"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/PyMISP"
@echo "# devhelp"
.PHONY: epub
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
.PHONY: epub3
epub3:
$(SPHINXBUILD) -b epub3 $(ALLSPHINXOPTS) $(BUILDDIR)/epub3
@echo
@echo "Build finished. The epub3 file is in $(BUILDDIR)/epub3."
.PHONY: latex
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
.PHONY: latexpdf
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
.PHONY: latexpdfja
latexpdfja:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through platex and dvipdfmx..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
.PHONY: text
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
.PHONY: man
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
.PHONY: texinfo
texinfo:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo
@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
@echo "Run \`make' in that directory to run these through makeinfo" \
"(use \`make info' here to do that automatically)."
.PHONY: info
info:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo "Running Texinfo files through makeinfo..."
make -C $(BUILDDIR)/texinfo info
@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
.PHONY: gettext
gettext:
$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
@echo
@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
.PHONY: changes
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
.PHONY: linkcheck
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
.PHONY: doctest
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
.PHONY: coverage
coverage:
$(SPHINXBUILD) -b coverage $(ALLSPHINXOPTS) $(BUILDDIR)/coverage
@echo "Testing of coverage in the sources finished, look at the " \
"results in $(BUILDDIR)/coverage/python.txt."
.PHONY: xml
xml:
$(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
@echo
@echo "Build finished. The XML files are in $(BUILDDIR)/xml."
.PHONY: pseudoxml
pseudoxml:
$(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
@echo
@echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
.PHONY: dummy
dummy:
$(SPHINXBUILD) -b dummy $(ALLSPHINXOPTS) $(BUILDDIR)/dummy
@echo
@echo "Build finished. Dummy builder generates no files."

449
docs/source/conf.py Normal file
View File

@ -0,0 +1,449 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# PyMISP documentation build configuration file, created by
# sphinx-quickstart on Fri Aug 26 11:39:17 2016.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from recommonmark.parser import CommonMarkParser
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.coverage',
'sphinx.ext.mathjax',
'sphinx.ext.ifconfig',
'sphinx.ext.viewcode',
'sphinx.ext.githubpages',
'sphinx.ext.napoleon',
]
napoleon_google_docstring = False
napoleon_use_param = False
napoleon_use_ivar = True
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
source_parsers = {
'.md': CommonMarkParser,
}
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = ['.rst', '.md']
# The encoding of source files.
#
# source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = 'PyMISP'
copyright = '2016, Raphaël Vinot'
author = 'Raphaël Vinot'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '2.4.50'
# The full version, including alpha/beta/rc tags.
release = '2.4.50'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#
# today = ''
#
# Else, today_fmt is used as the format for a strftime call.
#
# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all
# documents.
#
# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#
# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#
# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
# modindex_common_prefix = []
# If true, keep warnings as "system message" paragraphs in the built documents.
# keep_warnings = False
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
# html_theme_path = []
# The name for this set of Sphinx documents.
# "<project> v<release> documentation" by default.
#
# html_title = 'PyMISP v2.4.50'
# A shorter title for the navigation bar. Default is the same as html_title.
#
# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#
# html_logo = None
# The name of an image file (relative to this directory) to use as a favicon of
# the docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#
# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
# directly to the root of the documentation.
#
# html_extra_path = []
# If not None, a 'Last updated on:' timestamp is inserted at every page
# bottom, using the given strftime format.
# The empty string is equivalent to '%b %d, %Y'.
#
# html_last_updated_fmt = None
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#
# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#
# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#
# html_additional_pages = {}
# If false, no module index is generated.
#
# html_domain_indices = True
# If false, no index is generated.
#
# html_use_index = True
# If true, the index is split into individual pages for each letter.
#
# html_split_index = False
# If true, links to the reST sources are added to the pages.
#
# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#
# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#
# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#
# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
# html_file_suffix = None
# Language to be used for generating the HTML full-text search index.
# Sphinx supports the following languages:
# 'da', 'de', 'en', 'es', 'fi', 'fr', 'h', 'it', 'ja'
# 'nl', 'no', 'pt', 'ro', 'r', 'sv', 'tr', 'zh'
#
# html_search_language = 'en'
# A dictionary with options for the search language support, empty by default.
# 'ja' uses this config value.
# 'zh' user can custom change `jieba` dictionary path.
#
# html_search_options = {'type': 'default'}
# The name of a javascript file (relative to the configuration directory) that
# implements a search results scorer. If empty, the default will be used.
#
# html_search_scorer = 'scorer.js'
# Output file base name for HTML help builder.
htmlhelp_basename = 'PyMISPdoc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'PyMISP.tex', 'PyMISP Documentation',
'Raphaël Vinot', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#
# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#
# latex_use_parts = False
# If true, show page references after internal links.
#
# latex_show_pagerefs = False
# If true, show URL addresses after external links.
#
# latex_show_urls = False
# Documents to append as an appendix to all manuals.
#
# latex_appendices = []
# It false, will not define \strong, \code, itleref, \crossref ... but only
# \sphinxstrong, ..., \sphinxtitleref, ... To help avoid clash with user added
# packages.
#
# latex_keep_old_macro_names = True
# If false, no module index is generated.
#
# latex_domain_indices = True
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'pymisp', 'PyMISP Documentation',
[author], 1)
]
# If true, show URL addresses after external links.
#
# man_show_urls = False
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'PyMISP', 'PyMISP Documentation',
author, 'PyMISP', 'One line description of project.',
'Miscellaneous'),
]
# Documents to append as an appendix to all manuals.
#
# texinfo_appendices = []
# If false, no module index is generated.
#
# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#
# texinfo_show_urls = 'footnote'
# If true, do not generate a @detailmenu in the "Top" node's menu.
#
# texinfo_no_detailmenu = False
# -- Options for Epub output ----------------------------------------------
# Bibliographic Dublin Core info.
epub_title = project
epub_author = author
epub_publisher = author
epub_copyright = copyright
# The basename for the epub file. It defaults to the project name.
# epub_basename = project
# The HTML theme for the epub output. Since the default themes are not
# optimized for small screen space, using the same theme for HTML and epub
# output is usually not wise. This defaults to 'epub', a theme designed to save
# visual space.
#
# epub_theme = 'epub'
# The language of the text. It defaults to the language option
# or 'en' if the language is not set.
#
# epub_language = ''
# The scheme of the identifier. Typical schemes are ISBN or URL.
# epub_scheme = ''
# The unique identifier of the text. This can be a ISBN number
# or the project homepage.
#
# epub_identifier = ''
# A unique identification for the text.
#
# epub_uid = ''
# A tuple containing the cover image and cover page html template filenames.
#
# epub_cover = ()
# A sequence of (type, uri, title) tuples for the guide element of content.opf.
#
# epub_guide = ()
# HTML files that should be inserted before the pages created by sphinx.
# The format is a list of tuples containing the path and title.
#
# epub_pre_files = []
# HTML files that should be inserted after the pages created by sphinx.
# The format is a list of tuples containing the path and title.
#
# epub_post_files = []
# A list of files that should not be packed into the epub file.
epub_exclude_files = ['search.html']
# The depth of the table of contents in toc.ncx.
#
# epub_tocdepth = 3
# Allow duplicate toc entries.
#
# epub_tocdup = True
# Choose between 'default' and 'includehidden'.
#
# epub_tocscope = 'default'
# Fix unsupported image types using the Pillow.
#
# epub_fix_images = False
# Scale large images.
#
# epub_max_image_width = 0
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#
# epub_show_urls = 'inline'
# If false, no index is generated.
#
# epub_use_index = True
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'https://docs.python.org/': None}

25
docs/source/index.rst Normal file
View File

@ -0,0 +1,25 @@
.. PyMISP documentation master file, created by
sphinx-quickstart on Fri Aug 26 11:39:17 2016.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to PyMISP's documentation!
==================================
Contents:
.. toctree::
:maxdepth: 2
readme
modules
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

7
docs/source/modules.rst Normal file
View File

@ -0,0 +1,7 @@
pymisp
======
.. toctree::
:maxdepth: 4
pymisp

22
docs/source/pymisp.rst Normal file
View File

@ -0,0 +1,22 @@
pymisp package
==============
Submodules
----------
pymisp.api module
-----------------
.. automodule:: pymisp.api
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: pymisp
:members:
:undoc-members:
:show-inheritance:

1
docs/source/readme.rst Normal file
View File

@ -0,0 +1 @@
.. include:: ../../README.md

View File

@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, True, 'json', debug=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create an event on MISP.')
@ -26,7 +26,4 @@ if __name__ == '__main__':
misp = init(misp_url, misp_key)
event = misp.new_event(args.distrib, args.threat, args.analysis, args.info)
print event
response = misp.add_mutex(event, 'booh')
print response
print(event)

36
examples/del.py Executable file
View File

@ -0,0 +1,36 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
import argparse
# Usage for pipe masters: ./last.py -l 5h | jq .
def init(url, key):
return PyMISP(url, key, True, 'json', debug=True)
def del_event(m, eventid):
result = m.delete_event(eventid)
print(result)
def del_attr(m, attrid):
result = m.delete_attribute(attrid)
print(result)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Delete an event from a MISP instance.')
parser.add_argument("-e", "--event", help="Event ID to delete.")
parser.add_argument("-a", "--attribute", help="Attribute ID to delete.")
args = parser.parse_args()
misp = init(misp_url, misp_key)
if args.event:
del_event(misp, args.event)
else:
del_attr(misp, args.attribute)

126
examples/et2misp.py Executable file
View File

@ -0,0 +1,126 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copy Emerging Threats Block IPs list to several MISP events
# Because of the large size of the list the first run will take a minute
# Running it again will update the MISP events if changes are detected
#
# This script requires PyMISP 2.4.50 or later
import sys, json, time, requests
from pymisp import PyMISP
from keys import misp_url, misp_key
et_url = 'https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt'
et_str = 'Emerging Threats '
def init_misp():
global mymisp
mymisp = PyMISP(misp_url, misp_key)
def load_misp_event(eid):
global et_attr
global et_drev
global et_event
et_attr = {}
et_drev = {}
et_event = mymisp.get(eid)
echeck(et_event)
for a in et_event['Event']['Attribute']:
if a['category'] == 'Network activity':
et_attr[a['value']] = a['id']
continue
if a['category'] == 'Internal reference':
et_drev = a;
def init_et():
global et_data
global et_rev
requests.packages.urllib3.disable_warnings()
s = requests.Session()
r = s.get(et_url)
if r.status_code != 200:
raise Exception('Error getting ET data: {}'.format(r.text))
name = ''
et_data = {}
et_rev = 0
for line in r.text.splitlines():
if line.startswith('# Rev '):
et_rev = int(line[6:])
continue
if line.startswith('#'):
name = line[1:].strip()
if et_rev and not et_data.get(name):
et_data[name] = {}
continue
l = line.rstrip()
if l:
et_data[name][l] = name
def update_et_event(name):
if et_drev and et_rev and int(et_drev['value']) < et_rev:
# Copy MISP attributes to new dict
et_ips = dict.fromkeys(et_attr.keys())
# Weed out attributes still in ET data
for k,v in et_data[name].items():
et_attr.pop(k, None)
# Delete the leftover attributes from MISP
for k,v in et_attr.items():
r = mymisp.delete_attribute(v)
if r.get('errors'):
print "Error deleting attribute {} ({}): {}\n".format(v,k,r['errors'])
# Weed out ips already in the MISP event
for k,v in et_ips.items():
et_data[name].pop(k, None)
# Add new attributes to MISP event
ipdst = []
for i,k in enumerate(et_data[name].items(), 1-len(et_data[name])):
ipdst.append(k[0])
if i % 100 == 0:
r = mymisp.add_ipdst(et_event, ipdst)
echeck(r, et_event['Event']['id'])
ipdst = []
# Update revision number
et_drev['value'] = et_rev
et_drev.pop('timestamp', None)
attr = []
attr.append(et_drev)
# Publish updated MISP event
et_event['Event']['Attribute'] = attr
et_event['Event']['published'] = False
et_event['Event']['date'] = time.strftime('%Y-%m-%d')
r = mymisp.publish(et_event)
echeck(r, et_event['Event']['id'])
def echeck(r, eid=None):
if r.get('errors'):
if eid:
print "Processing event {} failed: {}".format(eid, r['errors'])
else:
print r['errors']
sys.exit(1)
if __name__ == '__main__':
init_misp()
init_et()
for et_type in set(et_data.keys()):
info = et_str + et_type
r = mymisp.search_index(eventinfo=info)
if r['response']:
eid=r['response'][0]['id']
else: # event not found, create it
new_event = mymisp.new_event(info=info, distribution=3, threat_level_id=4, analysis=1)
echeck(new_event)
eid=new_event['Event']['id']
r = mymisp.add_internal_text(new_event, 1, comment='Emerging Threats revision number')
echeck(r, eid)
load_misp_event(eid)
update_et_event(et_type)

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key,misp_verifycert
from keys import misp_url, misp_key, misp_verifycert
import argparse
import os
import json
@ -10,22 +10,28 @@ import json
# Usage for pipe masters: ./last.py -l 5h | jq .
proxies = {
'http': 'http://127.0.0.1:8123',
'https': 'http://127.0.0.1:8123',
}
proxies = None
def init(url, key):
return PyMISP(url, key, misp_verifycert, 'json')
return PyMISP(url, key, misp_verifycert, 'json', proxies=proxies)
def get_event(m, event, out=None):
result = m.get_event(event)
r = result.json()
if out is None:
print(json.dumps(r) + '\n')
print(json.dumps(result) + '\n')
else:
with open(out, 'w') as f:
f.write(json.dumps(r) + '\n')
f.write(json.dumps(result) + '\n')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get an event from a MISP instance.')
parser.add_argument("-e", "--event", required=True, help="Event ID to get.")
parser.add_argument("-o", "--output", help="Output file")

View File

@ -212,7 +212,7 @@ def createTreemap(data, title, treename='attribute_treemap.svg', tablename='attr
transition='400ms ease-in',
colors=tuple(colors.values()))
treemap = pygal.Treemap(pretty_print=True, legend_at_bottom=True, style=style)
treemap = pygal.Treemap(pretty_print=True, legend_at_bottom=True, style=style, explicit_size=True, width=2048, height=2048)
treemap.title = title
treemap.print_values = True
treemap.print_labels = True

View File

@ -7,7 +7,7 @@ import argparse
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, True)
def fetch(m, all_events, event):

View File

@ -10,13 +10,13 @@ import argparse
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, True, 'json', debug=True)
def up_event(m, event, content):
with open(content, 'r') as f:
result = m.update_event(event, f.read())
print result.text
print(result)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get an event from a MISP instance.')

View File

@ -1,3 +1,3 @@
__version__ = '2.4.48.2'
__version__ = '2.4.51'
from .api import PyMISP, PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Python API using the REST interface of MISP """
"""Python API using the REST interface of MISP"""
import json
import datetime
@ -33,6 +33,29 @@ except NameError:
basestring = str
class distributions(object):
"""Enumeration of the available distributions."""
your_organization = 0
this_community = 1
connected_communities = 2
all_communities = 3
class threat_level(object):
"""Enumeration of the available threat levels."""
high = 1
medium = 2
low = 3
undefined = 4
class analysis(object):
"""Enumeration of the available analysis statuses."""
initial = 0
ongoing = 1
completed = 2
class PyMISPError(Exception):
def __init__(self, message):
super(PyMISPError, self).__init__(message)
@ -63,23 +86,6 @@ class NoKey(PyMISPError):
pass
def deprecated(func):
'''This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.'''
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.warn_explicit(
"Call to deprecated function {}.".format(func.__name__),
category=DeprecationWarning,
filename=func.__code__.co_filename,
lineno=func.__code__.co_firstlineno + 1
)
return func(*args, **kwargs)
return new_func
class PyMISP(object):
"""
Python API for MISP
@ -90,10 +96,18 @@ class PyMISP(object):
of the certificate. Or a CA_BUNDLE in case of self
signed certiifcate (the concatenation of all the
*.crt of the chain)
:param out_type: Type of object (json or xml)
:param out_type: Type of object (json) NOTE: XML output isn't supported anymore, keeping the flag for compatibility reasons.
:param debug: print all the messages received from the server
:param proxies: Proxy dict as describes here: http://docs.python-requests.org/en/master/user/advanced/#proxies
:param cert: Client certificate, as described there: http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification
"""
def __init__(self, url, key, ssl=True, out_type='json', debug=False, proxies=None):
# So it can may be accessed from the misp object.
distributions = distributions
threat_level = threat_level
analysis = analysis
def __init__(self, url, key, ssl=True, out_type='json', debug=False, proxies=None, cert=None):
if not url:
raise NoURL('Please provide the URL of your MISP instance.')
if not key:
@ -103,7 +117,9 @@ class PyMISP(object):
self.key = key
self.ssl = ssl
self.proxies = proxies
self.out_type = out_type
self.cert = cert
if out_type != 'json':
raise PyMISPError('The only output type supported by PyMISP is JSON. If you still rely on XML, use PyMISP v2.4.49')
self.debug = debug
try:
@ -112,34 +128,31 @@ class PyMISP(object):
except Exception as e:
raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e))
session = self.__prepare_session(out_type)
self.describe_types = session.get(urljoin(self.root_url, 'attributes/describeTypes.json')).json()
session = self.__prepare_session()
response = session.get(urljoin(self.root_url, 'attributes/describeTypes.json'))
self.describe_types = self._check_response(response)
if self.describe_types.get('error'):
for e in self.describe_types.get('error'):
raise PyMISPError('Failed: {}'.format(e))
self.categories = self.describe_types['result']['categories']
self.types = self.describe_types['result']['types']
self.category_type_mapping = self.describe_types['result']['category_type_mappings']
def __prepare_session(self, force_out=None):
def __prepare_session(self, output='json'):
"""
Prepare the headers of the session
:param force_out: force the type of the expect output
(overwrite the constructor)
"""
if not HAVE_REQUESTS:
raise MissingDependency('Missing dependency, install requests (`pip install requests`)')
if force_out is not None:
out = force_out
else:
out = self.out_type
session = requests.Session()
session.verify = self.ssl
session.proxies = self.proxies
session.cert = self.cert
session.headers.update(
{'Authorization': self.key,
'Accept': 'application/' + out,
'content-type': 'application/' + out})
'Accept': 'application/{}'.format(output),
'content-type': 'application/{}'.format(output)})
return session
def flatten_error_messages(self, response):
@ -153,10 +166,21 @@ class PyMISP(object):
elif response.get('errors'):
if isinstance(response['errors'], dict):
for where, errors in response['errors'].items():
for e in errors:
for type_e, msgs in e.items():
for m in msgs:
messages.append('Error in {}: {}'.format(where, m))
if isinstance(errors, dict):
for where, msg in errors.items():
if isinstance(msg, list):
for m in msg:
messages.append('Error in {}: {}'.format(where, m))
else:
messages.append('Error in {}: {}'.format(where, msg))
else:
for e in errors:
if isinstance(e, str):
messages.append(e)
continue
for type_e, msgs in e.items():
for m in msgs:
messages.append('Error in {}: {}'.format(where, m))
return messages
def _check_response(self, response):
@ -194,94 +218,93 @@ class PyMISP(object):
# ############### Simple REST API ################
# ################################################
def get_index(self, force_out=None, filters=None):
def get_index(self, filters=None):
"""
Return the index.
Warning, there's a limit on the number of results
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/index')
if filters is not None:
filters = json.dumps(filters)
print(filters)
return session.post(url, data=filters)
response = session.post(url, data=filters)
else:
return session.get(url)
response = session.get(url)
return self._check_response(response)
def get_event(self, event_id, force_out=None):
def get_event(self, event_id):
"""
Get an event
:param event_id: Event id to get
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/{}'.format(event_id))
return session.get(url)
response = session.get(url)
return self._check_response(response)
def get_stix_event(self, event_id=None, out_format="json", with_attachments=False, from_date=False, to_date=False, tags=False):
def get_stix_event(self, event_id=None, with_attachments=False, from_date=False, to_date=False, tags=False):
"""
Get an event/events in STIX format
"""
out_format = out_format.lower()
if tags:
if isinstance(tags, list):
tags = "&&".join(tags)
session = self.__prepare_session(out_format)
session = self.__prepare_session()
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
event_id, with_attachments, tags, from_date, to_date))
if self.debug:
print("Getting STIX event from {}".format(url))
return session.get(url)
response = session.get(url)
return self._check_response(response)
def add_event(self, event, force_out=None):
def add_event(self, event):
"""
Add a new event
:param event: Event as JSON object / string or XML to add
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
url = urljoin(self.root_url, 'events')
if self.out_type == 'json':
if isinstance(event, basestring):
return session.post(url, data=event)
else:
return session.post(url, data=json.dumps(event))
if isinstance(event, basestring):
response = session.post(url, data=event)
else:
return session.post(url, data=event)
response = session.post(url, data=json.dumps(event))
return self._check_response(response)
def update_event(self, event_id, event, force_out=None):
def update_event(self, event_id, event):
"""
Update an event
:param event_id: Event id to update
:param event: Event as JSON object / string or XML to add
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/{}'.format(event_id))
if self.out_type == 'json':
if isinstance(event, basestring):
return session.post(url, data=event)
else:
return session.post(url, data=json.dumps(event))
if isinstance(event, basestring):
response = session.post(url, data=event)
else:
return session.post(url, data=event)
response = session.post(url, data=json.dumps(event))
return self._check_response(response)
def delete_event(self, event_id, force_out=None):
def delete_event(self, event_id):
"""
Delete an event
:param event_id: Event id to delete
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/{}'.format(event_id))
return session.delete(url)
response = session.delete(url)
return self._check_response(response)
def delete_attribute(self, attribute_id, force_out=None):
session = self.__prepare_session(force_out)
def delete_attribute(self, attribute_id):
session = self.__prepare_session()
url = urljoin(self.root_url, 'attributes/{}'.format(attribute_id))
return session.delete(url)
response = session.delete(url)
return self._check_response(response)
# ##############################################
# ######### Event handling (Json only) #########
@ -351,54 +374,55 @@ class PyMISP(object):
event['Event']['id'] = int(event['Event']['id'])
return event
def _one_or_more(self, value):
"""Returns a list/tuple of one or more items, regardless of input."""
return value if isinstance(value, (tuple, list)) else (value,)
# ########## Helpers ##########
def get(self, eid):
response = self.get_event(int(eid), 'json')
return self._check_response(response)
response = self.get_event(int(eid))
return response
def get_stix(self, **kwargs):
response = self.get_stix_event(**kwargs)
return self._check_response(response)
return response
def update(self, event):
eid = event['Event']['id']
response = self.update_event(eid, event, 'json')
return self._check_response(response)
response = self.update_event(eid, event)
return response
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False):
data = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published)
response = self.add_event(data, 'json')
return self._check_response(response)
response = self.add_event(data)
return response
def publish(self, event):
if event['Event']['published']:
return {'error': 'Already published'}
event = self._prepare_update(event)
event['Event']['published'] = True
response = self.update_event(event['Event']['id'], event, 'json')
return self._check_response(response)
response = self.update_event(event['Event']['id'], event)
return response
def add_tag(self, event, tag):
session = self.__prepare_session('json')
session = self.__prepare_session()
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
response = session.post(urljoin(self.root_url, 'events/addTag'), data=json.dumps(to_post))
return self._check_response(response)
def remove_tag(self, event, tag):
session = self.__prepare_session('json')
session = self.__prepare_session()
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
response = session.post(urljoin(self.root_url, 'events/removeTag'), data=json.dumps(to_post))
return self._check_response(response)
def change_threat_level(self, event, threat_level_id):
event['Event']['threat_level_id'] = threat_level_id
self._prepare_update(event)
response = self.update_event(event['Event']['id'], event)
return self._check_response(response)
return response
# ##### File attributes #####
@ -409,10 +433,10 @@ class PyMISP(object):
event = self._prepare_update(event)
for a in attributes:
if a.get('distribution') is None:
a['distribution'] = event['Event']['distribution']
a['distribution'] = 5
event['Event']['Attribute'] = attributes
response = self.update_event(event['Event']['id'], event, 'json')
return self._check_response(response)
response = self.update_event(event['Event']['id'], event)
return response
def add_named_attribute(self, event, category, type_value, value, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
@ -448,22 +472,23 @@ class PyMISP(object):
def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
for link in self._one_or_more(link):
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
for name in self._one_or_more(name):
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
for filename in self._one_or_more(filename):
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
type_value = '{}'
value = '{}'
if rvalue:
type_value = 'regkey|value'
value = '{}|{}'.format(regkey, rvalue)
@ -475,20 +500,36 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_regkeys(self, event, regkeys_values, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for regkey, rvalue in regkeys_values.items():
if rvalue:
type_value = 'regkey|value'
value = '{}|{}'.format(regkey, rvalue)
else:
type_value = 'regkey'
value = regkey
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
for pattern in self._one_or_more(pattern):
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
for named_pipe in self._one_or_more(named_pipe):
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
@ -500,29 +541,34 @@ class PyMISP(object):
def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
for yara in self._one_or_more(yara):
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Network attributes #####
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
for ipdst in self._one_or_more(ipdst):
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
for ipsrc in self._one_or_more(ipsrc):
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
for hostname in self._one_or_more(hostname):
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
for domain in self._one_or_more(domain):
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
@ -530,107 +576,132 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for domain, ip in domain_ips.items():
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
for url in self._one_or_more(url):
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
for useragent in self._one_or_more(useragent):
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
for pattern in self._one_or_more(pattern):
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
for snort in self._one_or_more(snort):
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Email attributes #####
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Target attributes #####
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Attribution attributes #####
def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Internal reference attributes #####
def add_internal_link(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_comment(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_text(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_other(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##################################################
@ -679,15 +750,17 @@ class PyMISP(object):
with open(path, 'rb') as f:
return str(base64.b64encode(f.read()))
def upload_sample(self, filename, filepath, event_id, distribution, to_ids,
category, comment, info, analysis, threat_level_id):
def upload_sample(self, filename, filepath, event_id, distribution=None,
to_ids=True, category=None, comment=None, info=None,
analysis=None, threat_level_id=None):
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
to_post['request']['files'] = [{'filename': filename, 'data': self._encode_file_to_upload(filepath)}]
return self._upload_sample(to_post)
def upload_samplelist(self, filepaths, event_id, distribution, to_ids, category,
info, analysis, threat_level_id):
def upload_samplelist(self, filepaths, event_id, distribution=None,
to_ids=True, category=None, info=None,
analysis=None, threat_level_id=None):
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
info, analysis, threat_level_id)
files = []
@ -699,7 +772,7 @@ class PyMISP(object):
return self._upload_sample(to_post)
def _upload_sample(self, to_post):
session = self.__prepare_session('json')
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/upload_sample')
response = session.post(url, data=json.dumps(to_post))
return self._check_response(response)
@ -724,7 +797,7 @@ class PyMISP(object):
return self._check_response(response)
def proposal_view(self, event_id=None, proposal_id=None):
session = self.__prepare_session('json')
session = self.__prepare_session()
if proposal_id is not None and event_id is not None:
return {'error': 'You can only view an event ID or a proposal ID'}
if event_id is not None:
@ -734,29 +807,31 @@ class PyMISP(object):
return self.__query_proposal(session, 'view', id)
def proposal_add(self, event_id, attribute):
session = self.__prepare_session('json')
session = self.__prepare_session()
return self.__query_proposal(session, 'add', event_id, attribute)
def proposal_edit(self, attribute_id, attribute):
session = self.__prepare_session('json')
session = self.__prepare_session()
return self.__query_proposal(session, 'edit', attribute_id, attribute)
def proposal_accept(self, proposal_id):
session = self.__prepare_session('json')
session = self.__prepare_session()
return self.__query_proposal(session, 'accept', proposal_id)
def proposal_discard(self, proposal_id):
session = self.__prepare_session('json')
session = self.__prepare_session()
return self.__query_proposal(session, 'discard', proposal_id)
# ##############################
# ######## REST Search #########
# ##############################
def __query(self, session, path, query):
def __query(self, session, path, query, controller='events'):
if query.get('error') is not None:
return query
url = urljoin(self.root_url, 'events/{}'.format(path.lstrip('/')))
if controller not in ['events', 'attributes']:
raise Exception('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes'])))
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
query = {'request': query}
response = session.post(url, data=json.dumps(query))
return self._check_response(response)
@ -801,14 +876,14 @@ class PyMISP(object):
buildup_url += '/search{}:{}'.format(rule, joined)
else:
buildup_url += '/search{}:{}'.format(rule, allowed[rule])
session = self.__prepare_session('json')
session = self.__prepare_session()
url = urljoin(self.root_url, buildup_url)
response = session.get(url)
return self._check_response(response)
def search_all(self, value):
query = {'value': value, 'searchall': 1}
session = self.__prepare_session('json')
session = self.__prepare_session()
return self.__query(session, 'restSearch/download', query)
def __prepare_rest_search(self, values, not_values):
@ -837,7 +912,7 @@ class PyMISP(object):
def search(self, values=None, not_values=None, type_attribute=None,
category=None, org=None, tags=None, not_tags=None, date_from=None,
date_to=None, last=None):
date_to=None, last=None, controller='events'):
"""
Search via the Rest API
@ -879,8 +954,8 @@ class PyMISP(object):
if last is not None:
query['last'] = last
session = self.__prepare_session('json')
return self.__query(session, 'restSearch/download', query)
session = self.__prepare_session()
return self.__query(session, 'restSearch/download', query, controller)
def get_attachement(self, event_id):
"""
@ -890,12 +965,13 @@ class PyMISP(object):
be fetched
"""
attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(event_id))
session = self.__prepare_session('json')
return session.get(attach)
session = self.__prepare_session()
response = session.get(attach)
return self._check_response(response)
def get_yara(self, event_id):
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
session = self.__prepare_session('json')
session = self.__prepare_session()
response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post))
result = self._check_response(response)
if result.get('error') is not None:
@ -907,7 +983,7 @@ class PyMISP(object):
def download_samples(self, sample_hash=None, event_id=None, all_samples=False):
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
session = self.__prepare_session('json')
session = self.__prepare_session()
response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post))
result = self._check_response(response)
if result.get('error') is not None:
@ -949,7 +1025,8 @@ class PyMISP(object):
"""
suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download')
session = self.__prepare_session('rules')
return session.get(suricata_rules)
response = session.get(suricata_rules)
return response
def download_suricata_rule_event(self, event_id):
"""
@ -959,12 +1036,13 @@ class PyMISP(object):
"""
template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
session = self.__prepare_session('rules')
return session.get(template)
response = session.get(template)
return response
# ########## Tags ##########
def get_all_tags(self, quiet=False):
session = self.__prepare_session('json')
session = self.__prepare_session()
url = urljoin(self.root_url, 'tags')
response = session.get(url)
r = self._check_response(response)
@ -978,7 +1056,7 @@ class PyMISP(object):
def new_tag(self, name=None, colour="#00ace6", exportable=False):
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}}
session = self.__prepare_session('json')
session = self.__prepare_session()
url = urljoin(self.root_url, 'tags/add')
response = session.post(url, data=json.dumps(to_post))
return self._check_response(response)
@ -1006,8 +1084,8 @@ class PyMISP(object):
"""
Returns the version of the instance.
"""
session = self.__prepare_session('json')
url = urljoin(self.root_url, 'servers/getVersion')
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/getVersion.json')
response = session.get(url)
return self._check_response(response)
@ -1032,24 +1110,25 @@ class PyMISP(object):
# ############## Statistics ##################
def get_attributes_statistics(self, context='type', percentage=None, force_out=None):
def get_attributes_statistics(self, context='type', percentage=None):
"""
Get attributes statistics from the MISP instance
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
if (context != 'category'):
context = 'type'
if percentage is not None:
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage))
else:
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context))
return session.get(url).json()
response = session.get(url)
return self._check_response(response)
def get_tags_statistics(self, percentage=None, name_sort=None, force_out=None):
def get_tags_statistics(self, percentage=None, name_sort=None):
"""
Get tags statistics from the MISP instance
"""
session = self.__prepare_session(force_out)
session = self.__prepare_session()
if percentage is not None:
percentage = 'true'
else:
@ -1059,55 +1138,34 @@ class PyMISP(object):
else:
name_sort = 'false'
url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort))
return session.get(url).json()
response = session.get(url).json()
return self._check_response(response)
# ############## Sightings ##################
def sighting_per_id(self, attribute_id, force_out=None):
session = self.__prepare_session(force_out)
def sighting_per_id(self, attribute_id):
session = self.__prepare_session()
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id))
return session.post(url)
response = session.post(url)
return self._check_response(response)
def sighting_per_uuid(self, attribute_uuid, force_out=None):
session = self.__prepare_session(force_out)
def sighting_per_uuid(self, attribute_uuid):
session = self.__prepare_session()
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid))
return session.post(url)
response = session.post(url)
return self._check_response(response)
def sighting_per_json(self, json_file, force_out=None):
session = self.__prepare_session(force_out)
def sighting_per_json(self, json_file):
session = self.__prepare_session()
jdata = json.load(open(json_file))
url = urljoin(self.root_url, 'sightings/add/')
return session.post(url, data=json.dumps(jdata))
response = session.post(url, data=json.dumps(jdata))
return self._check_response(response)
# ############## Sharing Groups ##################
def get_sharing_groups(self):
session = self.__prepare_session(force_out=None)
session = self.__prepare_session()
url = urljoin(self.root_url, 'sharing_groups/index.json')
response = session.get(url)
return self._check_response(response)['response'][0]
# ############## Deprecated (Pure XML API should not be used) ##################
@deprecated
def download_all(self):
"""
Download all event from the instance
"""
xml = urljoin(self.root_url, 'events/xml/download')
session = self.__prepare_session('xml')
return session.get(xml)
@deprecated
def download(self, event_id, with_attachement=False):
"""
Download one event in XML
:param event_id: Event id of the event to download (same as get)
"""
if with_attachement:
attach = 'true'
else:
attach = 'false'
template = urljoin(self.root_url, 'events/xml/download/{}/{}'.format(event_id, attach))
session = self.__prepare_session('xml')
return session.get(template)

View File

@ -17,9 +17,12 @@ setup(
'License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Operating System :: POSIX :: Linux',
'Intended Audience :: Science/Research',
'Intended Audience :: Telecommunications Industry',
'Programming Language :: Python',
'Intended Audience :: Information Technology',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Topic :: Security',
'Topic :: Internet',
],

368
tests/describeTypes.json Normal file
View File

@ -0,0 +1,368 @@
{
"result": {
"types": [
"md5",
"sha1",
"sha256",
"filename",
"pdb",
"filename|md5",
"filename|sha1",
"filename|sha256",
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"email-src",
"email-dst",
"email-subject",
"email-attachment",
"url",
"http-method",
"user-agent",
"regkey",
"regkey|value",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"vulnerability",
"attachment",
"malware-sample",
"link",
"comment",
"text",
"other",
"named pipe",
"mutex",
"target-user",
"target-email",
"target-machine",
"target-org",
"target-location",
"target-external",
"btc",
"iban",
"bic",
"bank-account-nr",
"aba-rtn",
"bin",
"cc-number",
"prtn",
"threat-actor",
"campaign-name",
"campaign-id",
"malware-type",
"uri",
"authentihash",
"ssdeep",
"imphash",
"pehash",
"sha224",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"tlsh",
"filename|authentihash",
"filename|ssdeep",
"filename|imphash",
"filename|pehash",
"filename|sha224",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|tlsh",
"windows-scheduled-task",
"windows-service-name",
"windows-service-displayname",
"whois-registrant-email",
"whois-registrant-phone",
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"targeted-threat-index",
"mailslot",
"pipe",
"ssl-cert-attributes",
"x509-fingerprint-sha1"
],
"categories": [
"Internal reference",
"Targeting data",
"Antivirus detection",
"Payload delivery",
"Artifacts dropped",
"Payload installation",
"Persistence mechanism",
"Network activity",
"Payload type",
"Attribution",
"External analysis",
"Financial fraud",
"Other"
],
"category_type_mappings": {
"Internal reference": [
"link",
"comment",
"text",
"other"
],
"Targeting data": [
"target-user",
"target-email",
"target-machine",
"target-org",
"target-location",
"target-external",
"comment"
],
"Antivirus detection": [
"link",
"comment",
"text",
"attachment",
"other"
],
"Payload delivery": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"pehash",
"tlsh",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"ip-src",
"ip-dst",
"hostname",
"domain",
"email-src",
"email-dst",
"email-subject",
"email-attachment",
"url",
"user-agent",
"AS",
"pattern-in-file",
"pattern-in-traffic",
"yara",
"attachment",
"malware-sample",
"link",
"malware-type",
"comment",
"text",
"vulnerability",
"x509-fingerprint-sha1",
"other"
],
"Artifacts dropped": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"regkey",
"regkey|value",
"pattern-in-file",
"pattern-in-memory",
"pdb",
"yara",
"attachment",
"malware-sample",
"named pipe",
"mutex",
"windows-scheduled-task",
"windows-service-name",
"windows-service-displayname",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Payload installation": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"pehash",
"tlsh",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"vulnerability",
"attachment",
"malware-sample",
"malware-type",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Persistence mechanism": [
"filename",
"regkey",
"regkey|value",
"comment",
"text",
"other"
],
"Network activity": [
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"email-dst",
"url",
"uri",
"user-agent",
"http-method",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"attachment",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Payload type": [
"comment",
"text",
"other"
],
"Attribution": [
"threat-actor",
"campaign-name",
"campaign-id",
"whois-registrant-phone",
"whois-registrant-email",
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"External analysis": [
"md5",
"sha1",
"sha256",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha256",
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"url",
"user-agent",
"regkey",
"regkey|value",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"vulnerability",
"attachment",
"malware-sample",
"link",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Financial fraud": [
"btc",
"iban",
"bic",
"bank-account-nr",
"aba-rtn",
"bin",
"cc-number",
"prtn",
"comment",
"text",
"other"
],
"Other": [
"comment",
"text",
"other"
]
}
}
}

78
tests/misp_event.json Normal file
View File

@ -0,0 +1,78 @@
{
"Attribute": [
{
"ShadowAttribute": [],
"SharingGroup": [],
"category": "Payload delivery",
"comment": "",
"deleted": false,
"distribution": "5",
"event_id": "2",
"id": "7",
"sharing_group_id": "0",
"timestamp": "1465681304",
"to_ids": false,
"type": "url",
"uuid": "575c8598-f1f0-4c16-a94a-0612c0a83866",
"value": "http://fake.website.com/malware/is/here"
},
{
"ShadowAttribute": [],
"SharingGroup": [],
"category": "Payload type",
"comment": "",
"deleted": false,
"distribution": "5",
"event_id": "2",
"id": "6",
"sharing_group_id": "0",
"timestamp": "1465681801",
"to_ids": false,
"type": "text",
"uuid": "575c8549-9010-4555-8b37-057ac0a83866",
"value": "Locky"
}
],
"Org": {
"id": "1",
"name": "ORGNAME",
"uuid": "57586e9a-4a64-4f79-9009-4dc1c0a83866"
},
"Orgc": {
"id": "1",
"name": "ORGNAME",
"uuid": "57586e9a-4a64-4f79-9009-4dc1c0a83866"
},
"RelatedEvent": [],
"ShadowAttribute": [],
"Tag": [
{
"colour": "#005a5a",
"exportable": true,
"id": "6",
"name": "ecsirt:malicious-code=\"ransomware\""
},
{
"colour": "#142bf7",
"exportable": true,
"id": "1",
"name": "for_intelmq_processing"
}
],
"analysis": "0",
"attribute_count": "2",
"date": "2016-06-09",
"distribution": "0",
"id": "2",
"info": "A Random Event",
"locked": false,
"org_id": "1",
"orgc_id": "1",
"proposal_email_lock": false,
"publish_timestamp": "0",
"published": false,
"sharing_group_id": "0",
"threat_level_id": "1",
"timestamp": "1465681801",
"uuid": "5758ebf5-c898-48e6-9fe9-5665c0a83866"
}

34
tests/new_misp_event.json Normal file
View File

@ -0,0 +1,34 @@
{
"Event": {
"uuid": "57c06bb1-625c-4d34-9b9f-4066950d210f",
"orgc_id": "1",
"publish_timestamp": "0",
"RelatedEvent": [],
"org_id": "1",
"Org": {
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f",
"name": "CIRCL",
"id": "1"
},
"attribute_count": null,
"distribution": "0",
"sharing_group_id": "0",
"threat_level_id": "1",
"locked": false,
"Attribute": [],
"published": false,
"ShadowAttribute": [],
"date": "2016-08-26",
"info": "This is a test",
"timestamp": "1472228273",
"Orgc": {
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f",
"name": "CIRCL",
"id": "1"
},
"id": "594",
"proposal_email_lock": false,
"analysis": "0"
}
}

100
tests/sharing_groups.json Normal file
View File

@ -0,0 +1,100 @@
{
"response": [
{
"SharingGroup": {
"id": "1",
"name": "PrivateTrustedGroup",
"description": "",
"releasability": "",
"local": true,
"active": true
},
"Organisation": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"SharingGroupOrg": [
{
"id": "1",
"sharing_group_id": "1",
"org_id": "1",
"extend": true,
"Organisation": {
"name": "CIRCL",
"id": "1",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
}
},
{
"id": "2",
"sharing_group_id": "1",
"org_id": "2",
"extend": false,
"Organisation": {
"name": "PifPafPoum",
"id": "2",
"uuid": "56bf12a7-c19c-4b98-83e7-d9bb02de0b81"
}
}
],
"SharingGroupServer": [
{
"all_orgs": false,
"server_id": "0",
"sharing_group_id": "1",
"Server": []
}
],
"editable": true
},
{
"SharingGroup": {
"id": "2",
"name": "test",
"description": "",
"releasability": "",
"local": true,
"active": true
},
"Organisation": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"SharingGroupOrg": [
{
"id": "3",
"sharing_group_id": "2",
"org_id": "1",
"extend": true,
"Organisation": {
"name": "CIRCL",
"id": "1",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
}
},
{
"id": "4",
"sharing_group_id": "2",
"org_id": "2",
"extend": false,
"Organisation": {
"name": "PifPafPoum",
"id": "2",
"uuid": "56bf12a7-c19c-4b98-83e7-d9bb02de0b81"
}
}
],
"SharingGroupServer": [
{
"all_orgs": false,
"server_id": "0",
"sharing_group_id": "2",
"Server": []
}
],
"editable": true
}
]
}

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
from pymisp import PyMISP
from keys import url, key
@ -47,7 +48,7 @@ class TestBasic(unittest.TestCase):
u'Org': {u'name': u'ORGNAME'},
u'Orgc': {u'name': u'ORGNAME'},
u'threat_level_id': u'1'}}
print event
print(event)
self.assertEqual(event, to_check, 'Failed at creating a new Event')
return int(event_id)
@ -99,15 +100,15 @@ class TestBasic(unittest.TestCase):
def delete(self, eventid):
event = self.misp.delete_event(eventid)
print event.json()
print(event)
def delete_attr(self, attrid):
event = self.misp.delete_attribute(attrid)
print event.json()
print(event)
def get(self, eventid):
event = self.misp.get_event(eventid)
print event.json()
print(event)
def get_stix(self, **kwargs):
event = self.misp.get_stix(kwargs)
@ -129,7 +130,7 @@ class TestBasic(unittest.TestCase):
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
event = self.misp.add_event(event)
print event.json()
print(event)
def test_create_event(self):
eventid = self.new_event()
@ -155,6 +156,9 @@ class TestBasic(unittest.TestCase):
time.sleep(1)
self.delete(eventid)
def test_one_or_more(self):
self.assertEqual(self.misp._one_or_more(1), (1,))
self.assertEqual(self.misp._one_or_more([1]), [1])
if __name__ == '__main__':
unittest.main()

122
tests/test_offline.py Normal file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import unittest
import requests_mock
import json
import pymisp as pm
from pymisp import PyMISP
@requests_mock.Mocker()
class TestOffline(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.domain = 'http://misp.local/'
self.key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
self.event = {'Event': json.load(open('tests/misp_event.json', 'r'))}
self.new_misp_event = {'Event': json.load(open('tests/new_misp_event.json', 'r'))}
self.types = json.load(open('tests/describeTypes.json', 'r'))
self.sharing_groups = json.load(open('tests/sharing_groups.json', 'r'))
self.auth_error_msg = {"name": "Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header.",
"message": "Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header.",
"url": "\/events\/1"}
def initURI(self, m):
m.register_uri('GET', self.domain + 'events/1', json=self.auth_error_msg, status_code=403)
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.50"})
m.register_uri('GET', self.domain + 'sharing_groups/index.json', json=self.sharing_groups)
m.register_uri('GET', self.domain + 'attributes/describeTypes.json', json=self.types)
m.register_uri('GET', self.domain + 'events/2', json=self.event)
m.register_uri('POST', self.domain + 'events/2', json=self.event)
m.register_uri('DELETE', self.domain + 'events/2', json={'message': 'Event deleted.'})
m.register_uri('DELETE', self.domain + 'events/3', json={'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})
m.register_uri('DELETE', self.domain + 'attributes/2', json={'message': 'Attribute deleted.'})
def test_getEvent(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
e1 = pymisp.get_event(2)
e2 = pymisp.get(2)
self.assertEqual(e1, e2)
self.assertEqual(self.event, e2)
def test_updateEvent(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
e0 = pymisp.update_event(2, json.dumps(self.event))
e1 = pymisp.update_event(2, self.event)
self.assertEqual(e0, e1)
e2 = pymisp.update(e0)
self.assertEqual(e1, e2)
self.assertEqual(self.event, e2)
def test_deleteEvent(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
d = pymisp.delete_event(2)
self.assertEqual(d, {'message': 'Event deleted.'})
d = pymisp.delete_event(3)
self.assertEqual(d, {'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})
def test_deleteAttribute(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
d = pymisp.delete_attribute(2)
self.assertEqual(d, {'message': 'Attribute deleted.'})
def test_publish(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
e = pymisp.publish(self.event)
pub = self.event
pub['Event']['published'] = True
self.assertEqual(e, pub)
e = pymisp.publish(self.event)
self.assertEqual(e, {'error': 'Already published'})
def test_getVersions(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
api_version = pymisp.get_api_version()
self.assertEqual(api_version, {'version': pm.__version__})
server_version = pymisp.get_version()
self.assertEqual(server_version, {"version": "2.4.50"})
def test_getSharingGroups(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
sharing_groups = pymisp.get_sharing_groups()
self.assertEqual(sharing_groups, self.sharing_groups['response'][0])
def test_auth_error(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
error = pymisp.get(1)
response = self.auth_error_msg
response['errors'] = [response['message']]
self.assertEqual(error, response)
def test_newEvent(self, m):
error_empty_info = {'message': 'The event could not be saved.', 'name': 'Add event failed.', 'errors': {'Event': {'info': ['Info cannot be empty.']}}, 'url': '/events/add'}
error_empty_info_flatten = {u'message': u'The event could not be saved.', u'name': u'Add event failed.', u'errors': [u"Error in info: Info cannot be empty."], u'url': u'/events/add'}
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
with self.assertRaises(pm.api.NewEventError):
pymisp.new_event()
with self.assertRaises(pm.api.NewEventError):
pymisp.new_event(0)
with self.assertRaises(pm.api.NewEventError):
pymisp.new_event(0, 1)
m.register_uri('POST', self.domain + 'events', json=error_empty_info)
response = pymisp.new_event(0, 1, 0)
self.assertEqual(response, error_empty_info_flatten)
m.register_uri('POST', self.domain + 'events', json=self.new_misp_event)
response = pymisp.new_event(0, 1, 0, "This is a test.", '2016-08-26', False)
self.assertEqual(response, self.new_misp_event)
if __name__ == '__main__':
unittest.main()