Merge pull request #1 from MISP/master

Syncing with upsteam
pull/178/head
Kory Kyzar 2018-01-19 12:42:06 -06:00 committed by GitHub
commit 9dd04f8664
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 12595 additions and 966 deletions

View File

@ -20,13 +20,13 @@ install:
- pip install -U nose pip setuptools
- pip install coveralls codecov requests-mock
- pip install git+https://github.com/kbandla/pydeep.git
- pip install .[fileobjects,neo,openioc]
- pip install .[fileobjects,neo,openioc,virustotal]
- pushd tests
- git clone https://github.com/viper-framework/viper-test-files.git
- popd
script:
- nosetests --with-coverage --cover-package=pymisp tests/test_offline.py
- nosetests --with-coverage --cover-package=pymisp tests/test_*.py
after_success:
- codecov

View File

@ -2,6 +2,370 @@ Changelog
=========
v2.4.85.1 (2018-01-10)
----------------------
Changes
~~~~~~~
- Bump version. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Cleanup from last commit. [Raphaël Vinot]
- Move MISPTag to Abstract MISP. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Fix tests (new template version) [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Add test for loading existing malware sample from MISP. [Raphaël
Vinot]
- Multiple changes. [Raphaël Vinot]
* Fix timestamp dump (properly enforce UTC)
* Properly handle proposals
* Add many getter/setter
* Add dedicated test cases for MISPEvent and other objects
- Allow do pass a category in default_attributes_parameters for object.
[Raphaël Vinot]
fix #166
- Default for sharing_group_id is 0. [Raphaël Vinot]
- Add MISPSighting class. [Raphaël Vinot]
- Bump Changelog. [Raphaël Vinot]
Fix
~~~
- Edited method works as expected, add tests. [Raphaël Vinot]
- Forgotten test files in last commit... [Raphaël Vinot]
- Disable_correlation from template not properly used. [Raphaël Vinot]
- Don't remove the distribution and sharing_group_id from
default_attributes_parameters. [Raphaël Vinot]
- The sharing_group_id isn't required. [Raphaël Vinot]
- Last commit was broken... [Raphaël Vinot]
- Properly set Tag to attributes within objects. [Raphaël Vinot]
- Add method to add tags to objects. [Raphaël Vinot]
Fix #160
- Typo in set_sightings. [Raphaël Vinot]
Fix #161
Other
~~~~~
- Merge pull request #164 from MISP/refactor. [Raphaël Vinot]
chg: Multiple changes
- Merge pull request #162 from AninaAntonie/patch-1. [Raphaël Vinot]
fix: set_sightings
- Set_sightings. [AninaAntonie]
Maybe I didn't use it correctly but the method set_sightings didn't work for me. It's working now but I'm not sure whether sending a request for every sighting in the list is the best solution.
- Merge pull request #165 from dadokkio/master. [Raphaël Vinot]
_default_attributes_parameters - if set - is a dict
- _default_attributes_parameters - if set - is a dict. [Arcuri Davide]
Manage distribution and sharing_group_id as dict key like the other fields.
-- Not sure about default
v2.4.85 (2017-12-22)
--------------------
New
~~~
- Add last field to get_csv. [Raphaël Vinot]
- (hopefully) Cleverer handling of timestamps in the objects. [Raphaël
Vinot]
& some cleanup
Changes
~~~~~~~
- Bump misp-objects. [Raphaël Vinot]
- Version bump. [Raphaël Vinot]
- Update documentation. [Raphaël Vinot]
- Update documentation, cleanup. [Raphaël Vinot]
- Bump describeTypes.json. [Raphaël Vinot]
- Validate attributes in attributes.setter. [Raphaël Vinot]
- Add get_attribute_tag method at MISPEvent level. [Raphaël Vinot]
Also add a MISPTag class for consistency.
- Bump misp-objects. [Raphaël Vinot]
- Bump describeTypes. [Raphaël Vinot]
- Add __repr__ methods (fix last commit) [Raphaël Vinot]
- Add __repr__ methods. [Raphaël Vinot]
- Use new format for filtering. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Bump describeTypes. [Raphaël Vinot]
Fix
~~~
- Properly use the edited flag. [Raphaël Vinot]
- Add setter for Attribute in MISPEvent. [Raphaël Vinot]
- Forgotten calls to master class. [Raphaël Vinot]
- Properly call datetime.datetime.utcfromtimestamp. [Raphaël Vinot]
- Fix typo. [Raphaël Vinot]
- Fix python2.7 support. [Raphaël Vinot]
- Initialize default class parameters. [Raphaël Vinot]
Fix #155
Other
~~~~~
- Merge branch 'cvandeplas-master' [Raphaël Vinot]
- Merge branch 'master' of https://github.com/cvandeplas/PyMISP into
cvandeplas-master. [Raphaël Vinot]
- Merge remote-tracking branch 'MISP/master' [Christophe Vandeplas]
- Fix MISPObject missing distribution and sharing_group_id. [Christophe
Vandeplas]
- fix MISPObject missing distribution concept
- fix language typo paramaters => parameters
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #156 from cvandeplas/master. [Alexandre Dulaunoy]
document submodule downloading
- Document submodule downloading. [Christophe Vandeplas]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #154 from wagner-certat/inc-meta. [Raphaël Vinot]
Include documentation and examples in source dist
- Include documentation and examples in source dist. [Sebastian Wagner]
v2.4.84 (2017-12-13)
--------------------
New
~~~
- Add methods to get taxonomy(ies) [Raphaël Vinot]
Thanks to @truckydev
- Add method to get all the events modified in an interval. [Raphaël
Vinot]
Changes
~~~~~~~
- Bump misp-objects. [Raphaël Vinot]
- Bump Changelog. [Raphaël Vinot]
- Bump version. [Raphaël Vinot]
- Make the library easier to use. [Raphaël Vinot]
- Allow to pass a pseudofile to LIEF. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Update changelog. [Raphaël Vinot]
Fix
~~~
- Disable pseudofile support in py2, skip tests. [Raphaël Vinot]
- Typo in error output text description. [Eric Jaw]
Other
~~~~~
- Merge pull request #151 from MISP/refactor. [Raphaël Vinot]
chg: Make the library easier to use
- Merge pull request #150 from sdrees/first-friendly-contribution-
enhance-coverage. [Raphaël Vinot]
First friendly contribution enhance coverage
- Further tests added (for public methods) [Stefan Hagen (Individual)]
- Changed asserts from dict usecases to set comparison to workaround non
3.6 behavior. [Stefan Hagen (Individual)]
- Merge branch 'master' of https://github.com/MISP/PyMISP into first-
friendly-contribution-enhance-coverage. [Stefan Hagen (Individual)]
- Enhance coverage and fix en passant with focus on api. [Stefan Hagen
(Individual)]
- Merge branch 'truckydev-get_last_modified_event' [Raphaël Vinot]
- Merge branch 'get_last_modified_event' of
https://github.com/truckydev/PyMISP into truckydev-
get_last_modified_event. [Raphaël Vinot]
- - Correction for 'last' param. 'last' gives the latest events that
have been published - add get_events_last_modified() this function
returns the modified events based on timestamp. [Tristan METAYER]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #149 from naisanza/master. [Raphaël Vinot]
fix: Typo in error output text description
v2.4.83 (2017-12-06)
--------------------
New
~~~
- Add get CSV method. [Raphaël Vinot]
Changes
~~~~~~~
- Allow to pass a proxy to query VT. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Bump version to 2.4.83. [Raphaël Vinot]
- Do not get the event from the server before publishing if
PyMISP.publish gets an ID. [Raphaël Vinot]
- Add live tests for recommended pymisp version and describeTypes up-to-
date. [Raphaël Vinot]
- Add a way to check if the ACL is up-to-date. [Raphaël Vinot]
- Add validators for describeTypes on the live instance. [Raphaël Vinot]
- Update PDF link to doc. [Raphaël Vinot]
- Add example file to push OpenIOC file to MISP. [Raphaël Vinot]
chg: Add some imports in the tool's init file
- Bump misp-objects. [Raphaël Vinot]
- Change version number to master in the doc. [Raphaël Vinot]
- Add new objects: MISPUser and MISPOrganisation. [Raphaël Vinot]
- Add a generic MISP object generator. [Raphaël Vinot]
- Allow to add multiple attribute of the same type. [Raphaël Vinot]
- Add fast publish method. [Raphaël Vinot]
Fix #86
- Improve documentation. [Raphaël Vinot]
Fix #121
Fix
~~~
- Typo in the tests. [Raphaël Vinot]
- Typo in live tests. [Raphaël Vinot]
- Bump describeTypes.json. [Raphaël Vinot]
Add testing
Other
~~~~~
- Merge pull request #147 from StrayLightning/master. [Raphaël Vinot]
Check explicitly for a 500 response from the server with no response content
- Improve the exception message for a server 500+ response with no
response content. [StrayLightning]
- Check for zero-length 500 response from the server and produce a
suitable error message. [StrayLightning]
In experimenting with PyMISP I am triggering problems on the server I
am using. Occasionally the server will return a 500 response with a
message indicating an internal error, but more often than not it returns
a 500 response with no contents, and _check_response falls over itself,
generating hard-to-fathom exception from the json internals.
This commit hardens _check_response by detecting zero-length responses
and raising a suitable exception.
Also fix a missing bracket in one of the subsequent exception strings.
- Merge pull request #146 from c-goes/lief_integrity_exception. [Raphaël
Vinot]
Make FileObject creation work if lief parsing fails
- Make FileObject creation work if lief parsing fails. [c-goes]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #144 from c-goes/objects_delete. [Raphaël Vinot]
allow deletion of objects and object references
- Allow deletion of objects and object references. [c-goes]
- Update doc badge links. [Raphaël Vinot]
- Merge pull request #143 from 3c7/feature/send_attributes. [Raphaël
Vinot]
Adding multiple named attributes (without proposal) require a single POST request now
- Adding multiple named attributes require a single POST request now.
[3c7]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #142 from c-goes/master. [Raphaël Vinot]
replaced is_digit() with isdigit()
- Fixed typo. [c-goes]
- Merge remote-tracking branch 'upstream/master' [c-goes]
- Merge pull request #141 from SteveClement/master. [Raphaël Vinot]
Remove CIRCL repo references from README.md & fix epydoc
- - Remove CIRCL reference from README.md - Updated 2 bad indentations
where epydoc was Warning. [Steve Clement]
- Merge remote-tracking branch 'upstream/master' [c-goes]
- Merge branch 'master' of https://github.com/MISP/PyMISP into
messageidtype. [c-goes]
- Added default_category for email-message-id. [c-goes]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge branch 'feature/feedgenerator_rework' [iglocska]
- Merge branch 'master' of https://github.com/MISP/PyMISP into
feature/feedgenerator_rework. [iglocska]
- Rework of the feed generator. [iglocska]
- use objects, attribute tags and object references correctly
- generate quickhashlist for fast lookups / future MISP caching mechanism
- saner structure (herp-a-derp)
v2.4.82 (2017-11-09)
--------------------
New
~~~
- Proper debug system. [Raphaël Vinot]
Make it easy to investigate the json blobs sent to the server.
Changes
~~~~~~~
- Bump PyMISP version. [Raphaël Vinot]
- Bump CHANGELOG. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Update readme for new logging system. [Raphaël Vinot]
- Small improvments in the logging system. [Raphaël Vinot]
- Properly use python logging module. [Raphaël Vinot]
- Update asciidoctor generator. [Raphaël Vinot]
- Remove warning if PyMISP is too new. [Raphaël Vinot]
- Add simple asciidoc generator for MISP event. [Raphaël Vinot]
- Update changelog. [Raphaël Vinot]
Fix
~~~
- Typo loger -> logger. [Raphaël Vinot]
- Let load unknown object relations in known templates. [Raphaël Vinot]
This isn't recommended, but happens very often.
- Allow to load non-malware ZIP files in MISP Event. [Raphaël Vinot]
Prior to his patch, any zip file loaded by MISP Event was unpacked and
processed as an excrypted malware from MISP.
- Properly pass the distribution when uploading a sample. [Raphaël
Vinot]
- Properly upload a sample in an existing event. [Raphaël Vinot]
Fix https://github.com/MISP/PyMISP/issues/123
- Properly set the distribution at event level. [Raphaël Vinot]
fix #120
- Properly pop the distribution key. [Raphaël Vinot]
- Update dependencies for VT generator. [Raphaël Vinot]
Other
~~~~~
- Merge pull request #126 from CenturyLinkCIRT/master. [Raphaël Vinot]
Added vt_to_misp.py example and VTReportObject
- Merge branch 'master' of https://github.com/MISP/PyMISP. [Thomas
Gardner]
- Fix test suite. [Raphaël Vinot]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #122 from LDO-CERT/master. [Raphaël Vinot]
Created add_generic_object.py
- Created add_generic_object.py. [garanews]
usage: add_generic_object.py [-h] -e EVENT -t TYPE -d DICT
Examples:
python3 add_generic_object.py -e 1683 -t email -d '{"subject":"The Pink Letter", "to":"jon@snow.org"}'
python3 add_generic_object.py -e 2343 -t person -d '{"first-name":"Daenerys", "last-name":"Targaryen", "place-of-birth":"Dragonstone"}'
python3 add_generic_object.py -e 3596 -t "domain|ip" -d '{"domain":"stormborn.org", "ip":"50.63.202.33"}'
- Added vtreportobject and vt_to_misp example. [Thomas Gardner]
- Created add_generic_object.py. [garanews]
usage: add_generic_object.py [-h] -e EVENT -t TYPE -d DICT
Examples:
python3 add_generic_object.py -e 1683 -t email -d '{"subject":"The Pink Letter", "to":"jon@snow.org"}'
python3 add_generic_object.py -e 2343 -t person -d '{"first-name":"Daenerys", "last-name":"Targaryen", "place-of-birth":"Dragonstone"}'
python3 add_generic_object.py -e 3596 -t "domain|ip" -d '{"domain":"stormborn.org", "ip":"50.63.202.33"}'
v2.4.81.2 (2017-10-24)
----------------------

View File

@ -1,4 +1,10 @@
graft docs
graft examples
graft tests
include CHANGELOG.txt
include LICENSE
include pymisp/data/*.json
include pymisp/data/misp-objects/*.json
include pymisp/data/misp-objects/objects/*/definition.json
include pymisp/data/misp-objects/relationships/definition.json
include README.md

View File

@ -1,7 +1,7 @@
README
======
[![Documentation Status](https://readthedocs.org/projects/pymisp/badge/?version=master)](http://pymisp.readthedocs.io/en/master/?badge=master)
[![Documentation Status](https://readthedocs.org/projects/pymisp/badge/?version=latest)](http://pymisp.readthedocs.io/?badge=latest)
[![Build Status](https://travis-ci.org/MISP/PyMISP.svg?branch=master)](https://travis-ci.org/MISP/PyMISP)
[![Coverage Status](https://coveralls.io/repos/github/MISP/PyMISP/badge.svg?branch=master)](https://coveralls.io/github/MISP/PyMISP?branch=master)
@ -24,7 +24,8 @@ pip3 install pymisp
## Install the latest version from repo
```
git clone https://github.com/CIRCL/PyMISP.git && cd PyMISP
git clone https://github.com/MISP/PyMISP.git && cd PyMISP
git submodule update --init
pip3 install -I .
```
@ -50,14 +51,41 @@ cd examples
python3 last.py -l 10
```
## Debugging
You have two options there:
1. Pass `debug=True` to `PyMISP` and it will enable logging.DEBUG to stderr on the whole module
2. Use the python logging module directly:
```python
import logging
logger = logging.getLogger('pymisp')
# Configure it as you whish, for example, enable DEBUG mode:
logger.setLevel(logging.DEBUG)
```
Or if you want to write the debug output to a file instead of stderr:
```python
import pymisp
import logging
logger = logging.getLogger('pymisp')
logging.basicConfig(level=logging.DEBUG, filename="debug.log", filemode='w', format=pymisp.FORMAT)
```
## Documentation
[PyMISP API documentation is available](https://media.readthedocs.org/pdf/pymisp/master/pymisp.pdf).
[PyMISP API documentation is available](https://media.readthedocs.org/pdf/pymisp/latest/pymisp.pdf).
Documentation can be generated with epydoc:
```
epydoc --url https://github.com/CIRCL/PyMISP --graph all --name PyMISP --pdf pymisp -o doc
epydoc --url https://github.com/MISP/PyMISP --graph all --name PyMISP --pdf pymisp -o doc
```
## Everything is a Mutable Mapping

View File

@ -17,10 +17,6 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from recommonmark.parser import CommonMarkParser
# -- General configuration ------------------------------------------------
@ -78,9 +74,9 @@ author = 'Raphaël Vinot'
# built documents.
#
# The short X.Y version.
version = '2.4'
version = 'master'
# The full version, including alpha/beta/rc tags.
release = '2.4.77'
release = 'master'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.

View File

@ -9,12 +9,11 @@ Welcome to PyMISP's documentation!
Contents:
.. toctree::
:maxdepth: 2
:maxdepth: 4
README
modules
tools
Indices and tables
==================
@ -22,4 +21,3 @@ Indices and tables
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@ -4,4 +4,82 @@ pymisp
.. toctree::
:maxdepth: 4
pymisp
.. automodule:: pymisp
:members:
PyMISP
------
.. autoclass:: PyMISP
:members:
MISPAbstract
------------
.. autoclass:: AbstractMISP
:members:
MISPEncode
----------
.. autoclass:: MISPEncode
:members:
MISPEvent
---------
.. autoclass:: MISPEvent
:members:
:inherited-members:
MISPAttribute
-------------
.. autoclass:: MISPAttribute
:members:
:inherited-members:
MISPObject
----------
.. autoclass:: MISPObject
:members:
:inherited-members:
MISPObjectAttribute
-------------------
.. autoclass:: MISPObjectAttribute
:members:
:inherited-members:
MISPObjectReference
-------------------
.. autoclass:: MISPObjectReference
:members:
:inherited-members:
MISPTag
-------
.. autoclass:: MISPTag
:members:
:inherited-members:
MISPUser
--------
.. autoclass:: MISPUser
:members:
:inherited-members:
MISPOrganisation
----------------
.. autoclass:: MISPOrganisation
:members:
:inherited-members:

View File

@ -1,22 +0,0 @@
pymisp package
==============
Submodules
----------
pymisp.api module
-----------------
.. automodule:: pymisp.api
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: pymisp
:members:
:undoc-members:
:show-inheritance:

69
docs/source/tools.rst Normal file
View File

@ -0,0 +1,69 @@
pymisp - Tools
==============
.. toctree::
:maxdepth: 4
.. automodule:: pymisp.tools
:members:
File Object
-----------
.. autoclass:: FileObject
:members:
:inherited-members:
ELF Object
----------
.. autoclass:: ELFObject
:members:
:inherited-members:
.. autoclass:: ELFSectionObject
:members:
:inherited-members:
PE Object
---------
.. autoclass:: PEObject
:members:
:inherited-members:
.. autoclass:: PESectionObject
:members:
:inherited-members:
Mach-O Object
-------------
.. autoclass:: MachOObject
:members:
:inherited-members:
.. autoclass:: MachOSectionObject
:members:
:inherited-members:
VT Report Object
----------------
.. autoclass:: VTReportObject
:members:
:inherited-members:
STIX
----
.. automodule:: pymisp.tools.stix
:members:
OpenIOC
--------
.. automethod:: pymisp.tools.load_openioc
.. automethod:: pymisp.tools.load_openioc_file

23
examples/add_generic_object.py Normal file → Executable file
View File

@ -1,24 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from pymisp import PyMISP
from pymisp.tools.abstractgenerator import AbstractMISPObjectGenerator
from pymisp.tools import GenericObjectGenerator
from keys import misp_url, misp_key, misp_verifycert
import argparse
class GenericObject(AbstractMISPObjectGenerator):
def __init__(self, type, data_dict):
super(GenericObject, self).__init__(type)
self.__data = data_dict
self.generate_attributes()
def generate_attributes(self):
for key, value in self.__data.items():
self.add_attribute(key, value=value)
"""
Sample usage:
./add_generic_object.py -e 5065 -t email -l '[{"to": "undisclosed@ppp.com"}, {"to": "second.to@mail.com"}]'
"""
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create a MISP Object selectable by type starting from a dictionary')
parser.add_argument("-e", "--event", required=True, help="Event ID to update")
parser.add_argument("-t", "--type", required=True, help="Type of the generic object")
parser.add_argument("-d", "--dict", required=True, help="Dict ")
parser.add_argument("-l", "--attr_list", required=True, help="List of attributes")
args = parser.parse_args()
pymisp = PyMISP(misp_url, misp_key, misp_verifycert)
@ -29,5 +27,6 @@ if __name__ == '__main__':
print ("Template for type %s not found! Valid types are: %s" % (args.type, valid_types))
exit()
misp_object = GenericObject(args.type.replace("|", "-"), json.loads(args.dict))
misp_object = GenericObjectGenerator(args.type.replace("|", "-"))
misp_object.generate_attributes(json.loads(args.attr_list))
r = pymisp.add_object(args.event, template_id, misp_object)

View File

@ -1,55 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
from datetime import date
import importlib
from pymisp import MISPEvent
from defang import defang
import argparse
from pytaxonomies import Taxonomies
from datetime import date
headers = """
:toc: right
:toclevels: 1
:toc-title: Daily Report
:icons: font
:sectanchors:
:sectlinks:
= Daily report by {org_name}
{date}
:icons: font
"""
event_level_tags = """
IMPORTANT: This event is classified TLP:{value}.
{expanded}
"""
attributes = """
=== Indicator(s) of compromise
{list_attributes}
"""
title = """
== ({internal_id}) {title}
{summary}
"""
types_to_attach = ['ip-dst', 'url', 'domain']
objects_to_attach = ['domain-ip']
class ReportGenerator():
def __init__(self):
def __init__(self, profile="daily_report"):
self.taxonomies = Taxonomies()
self.report = ''
profile_name = "profiles.{}".format(profile)
self.template = importlib.import_module(name=profile_name)
def from_remote(self, event_id):
from pymisp import PyMISP
@ -66,15 +32,16 @@ class ReportGenerator():
def attributes(self):
if not self.misp_event.attributes:
return ''
list_attributes = ''
list_attributes = []
for attribute in self.misp_event.attributes:
if attribute.type in types_to_attach:
list_attributes += "\n* {}\n".format(defang(attribute.value))
if attribute.type in self.template.types_to_attach:
list_attributes.append("* {}".format(defang(attribute.value)))
for obj in self.misp_event.Object:
for attribute in obj.Attribute:
if attribute.type in types_to_attach:
list_attributes += "\n* {}\n".format(defang(attribute.value))
return attributes.format(list_attributes=list_attributes)
if obj.name in self.template.objects_to_attach:
for attribute in obj.Attribute:
if attribute.type in self.template.types_to_attach:
list_attributes.append("* {}".format(defang(attribute.value)))
return self.template.attributes.format(list_attributes="\n".join(list_attributes))
def _get_tag_info(self, machinetag):
return self.taxonomies.revert_machinetag(machinetag)
@ -82,7 +49,7 @@ class ReportGenerator():
def report_headers(self):
content = {'org_name': 'name',
'date': date.today().isoformat()}
self.report += headers.format(**content)
self.report += self.template.headers.format(**content)
def event_level_tags(self):
if not self.misp_event.Tag:
@ -91,7 +58,7 @@ class ReportGenerator():
# Only look for TLP for now
if tag['name'].startswith('tlp'):
tax, predicate = self._get_tag_info(tag['name'])
return event_level_tags.format(value=predicate.predicate.upper(), expanded=predicate.expanded)
return self.template.event_level_tags.format(value=predicate.predicate.upper(), expanded=predicate.expanded)
def title(self):
internal_id = ''
@ -106,34 +73,42 @@ class ReportGenerator():
if a.object_relation == 'summary':
summary = a.value
return title.format(internal_id=internal_id, title=self.misp_event.info,
summary=summary)
return self.template.title.format(internal_id=internal_id, title=self.misp_event.info,
summary=summary)
def asciidoc(self, lang='en'):
self.report += self.title()
self.report += self.event_level_tags()
self.report += self.attributes()
if __name__ == '__main__':
try:
parser = argparse.ArgumentParser(description='Create a human-readable report out of a MISP event')
parser.add_argument("--profile", default="daily_report", help="Profile template to use")
parser.add_argument("-o", "--output", help="Output file to write to (generally ends in .adoc)")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-e", "--event", default=[], nargs='+', help="Event ID to get.")
group.add_argument("-p", "--path", default=[], nargs='+', help="Path to the JSON dump.")
parser = argparse.ArgumentParser(description='Create a human-readable report out of a MISP event')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-e", "--event", default=[], nargs='+', help="Event ID to get.")
group.add_argument("-p", "--path", default=[], nargs='+', help="Path to the JSON dump.")
args = parser.parse_args()
args = parser.parse_args()
report = ReportGenerator(args.profile)
report.report_headers()
report = ReportGenerator()
report.report_headers()
if args.event:
for eid in args.event:
report.from_remote(eid)
report.asciidoc()
else:
for f in args.path:
report.from_file(f)
report.asciidoc()
if args.event:
for eid in args.event:
report.from_remote(eid)
report.asciidoc()
else:
for f in args.path:
report.from_file(f)
report.asciidoc()
print(report.report)
if args.output:
with open(args.output, "w") as ofile:
ofile.write(report.report)
else:
print(report.report)
except ModuleNotFoundError as err:
print(err)

View File

@ -4,28 +4,79 @@
import sys
import json
import os
import hashlib
from pymisp import PyMISP
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels
objectsFields = {
'Attribute': {
'uuid',
'value',
'category',
'type',
'comment',
'data',
'timestamp',
'to_ids'
},
'Event': {
'uuid',
'info',
'threat_level_id',
'analysis',
'timestamp',
'publish_timestamp',
'published',
'date'
},
'Object': {
'name',
'meta-category',
'description',
'template_uuid',
'template_version',
'uuid',
'timestamp',
'distribution',
'sharing_group_id',
'comment'
},
'ObjectReference': {
'uuid',
'timestamp',
'relationship_type',
'comment',
'object_uuid',
'referenced_uuid'
},
'Orgc': {
'name',
'uuid'
},
'Tag': {
'name',
'colour',
'exportable'
}
}
objectsToSave = {'Orgc': {'fields': ['name', 'uuid'],
'multiple': False,
},
'Tag': {'fields': ['name', 'colour', 'exportable'],
'multiple': True,
},
'Attribute': {'fields': ['uuid', 'value', 'category', 'type',
'comment', 'data', 'timestamp', 'to_ids'],
'multiple': True,
},
}
fieldsToSave = ['uuid', 'info', 'threat_level_id', 'analysis',
'timestamp', 'publish_timestamp', 'published',
'date']
objectsToSave = {
'Orgc': {},
'Tag': {},
'Attribute': {
'Tag': {}
},
'Object': {
'Attribute': {
'Tag': {}
},
'ObjectReference': {}
}
}
valid_attribute_distributions = []
attributeHashes = []
def init():
# If we have an old settings.py file then this variable won't exist
@ -36,61 +87,65 @@ def init():
valid_attribute_distributions = ['0', '1', '2', '3', '4', '5']
return PyMISP(url, key, ssl)
def recursiveExtract(container, containerType, leaf, eventUuid):
temp = {}
if containerType in ['Attribute', 'Object']:
if (__blockByDistribution(container)):
return False
for field in objectsFields[containerType]:
if field in container:
temp[field] = container[field]
if (containerType == 'Attribute'):
global attributeHashes
if ('|' in container['type'] or container['type'] == 'malware-sample'):
split = container['value'].split('|')
attributeHashes.append([hashlib.md5(split[0].encode("utf-8")).hexdigest(), eventUuid])
attributeHashes.append([hashlib.md5(split[1].encode("utf-8")).hexdigest(), eventUuid])
else:
attributeHashes.append([hashlib.md5(container['value'].encode("utf-8")).hexdigest(), eventUuid])
children = leaf.keys()
for childType in children:
childContainer = container.get(childType)
if (childContainer):
if (type(childContainer) is dict):
temp[childType] = recursiveExtract(childContainer, childType, leaf[childType], eventUuid)
else:
temp[childType] = []
for element in childContainer:
processed = recursiveExtract(element, childType, leaf[childType], eventUuid)
if (processed):
temp[childType].append(processed)
return temp
def saveEvent(misp, uuid):
result = {}
event = misp.get_event(uuid)
if not event.get('Event'):
print('Error while fetching event: {}'.format(event['message']))
sys.exit('Could not create file for event ' + uuid + '.')
event = __cleanUpEvent(event)
event['Event'] = recursiveExtract(event['Event'], 'Event', objectsToSave, event['Event']['uuid'])
event = json.dumps(event)
eventFile = open(os.path.join(outputdir, uuid + '.json'), 'w')
eventFile.write(event)
eventFile.close()
def __cleanUpEvent(event):
temp = event
event = {'Event': {}}
__cleanupEventFields(event, temp)
__cleanupEventObjects(event, temp)
return event
def __cleanupEventFields(event, temp):
for field in fieldsToSave:
if field in temp['Event'].keys():
event['Event'][field] = temp['Event'][field]
return event
def __blockAttributeByDistribution(attribute):
if attribute['distribution'] not in valid_attribute_distributions:
def __blockByDistribution(element):
if element['distribution'] not in valid_attribute_distributions:
return True
return False
def saveHashes():
if not attributeHashes:
return False
try:
hashFile = open(os.path.join(outputdir, 'hashes.csv'), 'w')
for element in attributeHashes:
hashFile.write('{},{}\n'.format(element[0], element[1]))
hashFile.close()
except Exception as e:
print(e)
sys.exit('Could not create the quick hash lookup file.')
def __cleanupEventObjects(event, temp):
for objectType in objectsToSave.keys():
if objectsToSave[objectType]['multiple'] is True:
if objectType in temp['Event']:
for objectInstance in temp['Event'][objectType]:
if objectType is 'Attribute':
if __blockAttributeByDistribution(objectInstance):
continue
tempObject = {}
for field in objectsToSave[objectType]['fields']:
if field in objectInstance.keys():
tempObject[field] = objectInstance[field]
if objectType not in event['Event']:
event['Event'][objectType] = []
event['Event'][objectType].append(tempObject)
else:
tempObject = {}
for field in objectsToSave[objectType]['fields']:
tempObject[field] = temp['Event'][objectType][field]
event['Event'][objectType] = tempObject
return event
def saveManifest(manifest):
@ -138,4 +193,6 @@ if __name__ == '__main__':
print("Event " + str(counter) + "/" + str(total) + " exported.")
counter += 1
saveManifest(manifest)
print('Manifest saved. Feed creation completed.')
print('Manifest saved.')
saveHashes()
print('Hashes saved. Feed creation completed.')

View File

@ -16,9 +16,9 @@ outputdir = 'output'
# you can use on the event index, such as organisation, tags, etc.
# It uses the same joining and condition rules as the API parameters
# For example:
# filters = {'tag':'tlp:white|feed-export|!privint','org':'CIRCL'}
# the above would generate a feed for all events created by CIRCL, tagged
# tlp:white and/or feed-export but exclude anything tagged privint
# filters = {'tag':'tlp:white|feed-export|!privint','org':'CIRCL', 'published':1}
# the above would generate a feed for all published events created by CIRCL,
# tagged tlp:white and/or feed-export but exclude anything tagged privint
filters = {}

View File

@ -39,7 +39,7 @@ if __name__ == '__main__':
args = parser.parse_args()
if args.output is not None and os.path.exists(args.output):
print('Output file already exists, abord.')
print('Output file already exists, abort.')
exit(0)
misp = init(misp_url, misp_key)

28
examples/get_csv.py Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
from pymisp import PyMISP
from keys import misp_url, misp_key, misp_verifycert
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get MISP stuff as CSV.')
parser.add_argument("-e", "--event_id", help="Event ID to fetch. Without it, it will fetch the whole database.")
parser.add_argument("-a", "--attribute", nargs='+', help="Attribute column names")
parser.add_argument("-o", "--object_attribute", nargs='+', help="Object attribute column names")
parser.add_argument("-t", "--misp_types", nargs='+', help="MISP types to fetch (ip-src, hostname, ...)")
parser.add_argument("-c", "--context", action='store_true', help="Add event level context (tags...)")
parser.add_argument("-i", "--ignore", action='store_true', help="Returns the attributes even if the event isn't published, or the attribute doesn't have the to_ids flag")
parser.add_argument("-f", "--outfile", help="Output file to write the CSV.")
args = parser.parse_args()
pymisp = PyMISP(misp_url, misp_key, misp_verifycert, debug=True)
response = pymisp.get_csv(args.event_id, args.attribute, args.object_attribute, args.misp_types, args.context, args.ignore)
if args.outfile:
with open(args.outfile, 'w') as f:
f.write(response)
else:
print(response)

27
examples/openioc_to_misp.py Executable file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
from pymisp import PyMISP
from keys import misp_url, misp_key, misp_verifycert
from pymisp.tools import load_openioc_file
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Convert an OpenIOC file to a MISPEvent. Optionnaly send it to MISP.')
parser.add_argument("-i", "--input", required=True, help="Input file")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-o", "--output", help="Output file")
group.add_argument("-m", "--misp", action='store_true', help="Create new event on MISP")
args = parser.parse_args()
misp_event = load_openioc_file(args.input)
if args.misp:
pymisp = PyMISP(misp_url, misp_key, misp_verifycert, debug=True)
pymisp.add_event(misp_event)
else:
with open(args.output, 'w') as f:
f.write(misp_event.to_json())

View File

View File

@ -0,0 +1,37 @@
types_to_attach = ['ip-dst', 'url', 'domain']
objects_to_attach = ['domain-ip']
headers = """
:toc: right
:toclevels: 1
:toc-title: Daily Report
:icons: font
:sectanchors:
:sectlinks:
= Daily report by {org_name}
{date}
:icons: font
"""
event_level_tags = """
IMPORTANT: This event is classified TLP:{value}.
{expanded}
"""
attributes = """
=== Indicator(s) of compromise
{list_attributes}
"""
title = """
== ({internal_id}) {title}
{summary}
"""

View File

@ -0,0 +1,33 @@
types_to_attach = ['ip-dst', 'url', 'domain', 'md5']
objects_to_attach = ['domain-ip', 'file']
headers = """
:toc: right
:toclevels: 1
:toc-title: Weekly Report
:icons: font
:sectanchors:
:sectlinks:
= Weekly report by {org_name}
{date}
:icons: font
"""
event_level_tags = """
"""
attributes = """
=== Indicator(s) of compromise
{list_attributes}
"""
title = """
== ({internal_id}) {title}
{summary}
"""

View File

@ -1,13 +1,40 @@
__version__ = '2.4.81.2'
__version__ = '2.4.85.1'
import sys
import logging
import functools
import warnings
logger = logging.getLogger(__name__)
FORMAT = "%(levelname)s [%(filename)s:%(lineno)s - %(funcName)s() ] %(message)s"
logging.basicConfig(stream=sys.stderr, level=logging.WARNING, format=FORMAT)
def deprecated(func):
'''This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.'''
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.showwarning(
"Call to deprecated function {}.".format(func.__name__),
category=DeprecationWarning,
filename=func.__code__.co_filename,
lineno=func.__code__.co_firstlineno + 1
)
return func(*args, **kwargs)
return new_func
try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate # noqa
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat # noqa
from .api import PyMISP # noqa
from .abstract import AbstractMISP, MISPEncode # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa
from .tools import openioc # noqa
except ImportError:
pass
logger.debug('pymisp loaded properly')
except ImportError as e:
logger.warning('Unable to load pymisp properly: {}'.format(e))

View File

@ -2,14 +2,36 @@
# -*- coding: utf-8 -*-
import abc
import sys
import datetime
import json
from json import JSONEncoder
import collections
import six # Remove that import when discarding python2 support.
import logging
from .exceptions import PyMISPInvalidFormat
logger = logging.getLogger('pymisp')
if six.PY2:
import warnings
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.5")
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
class MISPEncode(JSONEncoder):
@ -25,7 +47,24 @@ class AbstractMISP(collections.MutableMapping):
__not_jsonable = []
def __init__(self, **kwargs):
"""Abstract class for all the MISP objects"""
super(AbstractMISP, self).__init__()
self.__edited = True # As we create a new object, we assume it is edited
# List of classes having tags
from .mispevent import MISPAttribute, MISPEvent
self.__has_tags = (MISPAttribute, MISPEvent)
if isinstance(self, self.__has_tags):
self.Tag = []
setattr(AbstractMISP, 'add_tag', AbstractMISP.__add_tag)
setattr(AbstractMISP, 'tags', property(AbstractMISP.__get_tags, AbstractMISP.__set_tags))
@property
def properties(self):
"""All the class public properties that will be dumped in the dictionary, and the JSON export.
Note: all the properties starting with a `_` (private), or listed in __not_jsonable will be skipped.
"""
to_return = []
for prop, value in vars(self).items():
if prop.startswith('_') or prop in self.__not_jsonable:
@ -34,15 +73,24 @@ class AbstractMISP(collections.MutableMapping):
return to_return
def from_dict(self, **kwargs):
"""Loading all the parameters as class properties, if they aren't `None`.
This method aims to be called when all the properties requiring a special
treatment are processed.
Note: This method is used when you initialize an object with existing data so by default,
the class is flaged as not edited."""
for prop, value in kwargs.items():
if value is None:
continue
setattr(self, prop, value)
# We load an existing dictionary, marking it an not-edited
self.__edited = False
def update_not_jsonable(self, *args):
"""Add entries to the __not_jsonable list"""
self.__not_jsonable += args
def set_not_jsonable(self, *args):
"""Set __not_jsonable to a new list"""
self.__not_jsonable = args
def from_json(self, json_string):
@ -50,22 +98,42 @@ class AbstractMISP(collections.MutableMapping):
self.from_dict(json.loads(json_string))
def to_dict(self):
"""Dump the lass to a dictionary.
This method automatically removes the timestamp recursively in every object
that has been edited is order to let MISP update the event accordingly."""
to_return = {}
for attribute in self.properties():
for attribute in self.properties:
val = getattr(self, attribute, None)
if val is None:
continue
elif isinstance(val, list) and len(val) == 0:
continue
if attribute == 'timestamp':
if self.edited:
# In order to be accepted by MISP, the timestamp of an object
# needs to be either newer, or None.
# If the current object is marked as edited, the easiest is to
# skip the timestamp and let MISP deal with it
continue
else:
val = self._datetime_to_timestamp(val)
to_return[attribute] = val
return to_return
def jsonable(self):
"""This method is used by the JSON encoder"""
return self.to_dict()
def to_json(self):
return json.dumps(self.to_dict(), cls=MISPEncode)
"""Dump recursively any class of type MISPAbstract to a json string"""
return json.dumps(self, cls=MISPEncode, sort_keys=True, indent=2)
def __getitem__(self, key):
return getattr(self, key)
try:
return getattr(self, key)
except AttributeError:
# Expected by pop and other dict-related methods
raise KeyError
def __setitem__(self, key, value):
setattr(self, key, value)
@ -78,3 +146,86 @@ class AbstractMISP(collections.MutableMapping):
def __len__(self):
return len(self.to_dict())
@property
def edited(self):
"""Recursively check if an object has been edited and update the flag accordingly
to the parent objects"""
if self.__edited:
return self.__edited
for p in self.properties:
if self.__edited:
break
val = getattr(self, p)
if isinstance(val, AbstractMISP) and val.edited:
self.__edited = True
elif isinstance(val, list) and all(isinstance(a, AbstractMISP) for a in val):
if any(a.edited for a in val):
self.__edited = True
return self.__edited
@edited.setter
def edited(self, val):
"""Set the edit flag"""
if isinstance(val, bool):
self.__edited = val
else:
raise Exception('edited can only be True or False')
def __setattr__(self, name, value):
if name in self.properties:
self.__edited = True
super(AbstractMISP, self).__setattr__(name, value)
def _datetime_to_timestamp(self, d):
"""Convert a datetime.datetime object to a timestamp (int)"""
if isinstance(d, (int, str)) or (sys.version_info < (3, 0) and isinstance(d, unicode)):
# Assume we already have a timestamp
return d
if sys.version_info >= (3, 3):
return int(d.timestamp())
else:
return int((d - datetime.datetime.fromtimestamp(0, UTC())).total_seconds())
def __add_tag(self, tag=None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
if isinstance(tag, str):
misp_tag = MISPTag()
misp_tag.from_dict(name=tag)
elif isinstance(tag, MISPTag):
misp_tag = tag
elif isinstance(tag, dict):
misp_tag = MISPTag()
misp_tag.from_dict(**tag)
elif kwargs:
misp_tag = MISPTag()
misp_tag.from_dict(**kwargs)
else:
raise PyMISPInvalidFormat("The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {}".format(tag))
self.Tag.append(misp_tag)
self.edited = True
def __get_tags(self):
"""Returns a lost of tags associated to this Attribute"""
return self.Tag
def __set_tags(self, tags):
"""Set a list of prepared MISPTag."""
if all(isinstance(x, MISPTag) for x in tags):
self.Tag = tags
else:
raise PyMISPInvalidFormat('All the attributes have to be of type MISPTag.')
class MISPTag(AbstractMISP):
def __init__(self):
super(MISPTag, self).__init__()
def from_dict(self, name, **kwargs):
self.name = name
super(MISPTag, self).from_dict(**kwargs)
def __repr__(self):
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)

File diff suppressed because it is too large Load Diff

View File

@ -121,6 +121,10 @@
"default_category": "Payload installation",
"to_ids": 1
},
"stix2-pattern": {
"default_category": "Payload installation",
"to_ids": 1
},
"sigma": {
"default_category": "Payload installation",
"to_ids": 1
@ -361,6 +365,10 @@
"default_category": "Attribution",
"to_ids": 0
},
"whois-registrant-org": {
"default_category": "Attribution",
"to_ids": 0
},
"whois-registrar": {
"default_category": "Attribution",
"to_ids": 0
@ -373,6 +381,14 @@
"default_category": "Network activity",
"to_ids": 1
},
"x509-fingerprint-md5": {
"default_category": "Network activity",
"to_ids": 1
},
"x509-fingerprint-sha256": {
"default_category": "Network activity",
"to_ids": 1
},
"dns-soa-email": {
"default_category": "Attribution",
"to_ids": 0
@ -409,6 +425,14 @@
"default_category": "Network activity",
"to_ids": 1
},
"mac-address": {
"default_category": "Network activity",
"to_ids": 0
},
"mac-eui-64": {
"default_category": "Network activity",
"to_ids": 0
},
"email-dst-display-name": {
"default_category": "Payload delivery",
"to_ids": 0
@ -482,7 +506,7 @@
"to_ids": 0
},
"gender": {
"default_category": "",
"default_category": "Person",
"to_ids": 0
},
"passport-number": {
@ -593,6 +617,7 @@
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"stix2-pattern",
"sigma",
"cookie",
"vulnerability",
@ -653,9 +678,12 @@
"whois-registrant-email",
"whois-registrant-phone",
"whois-registrant-name",
"whois-registrant-org",
"whois-registrar",
"whois-creation-date",
"x509-fingerprint-sha1",
"x509-fingerprint-md5",
"x509-fingerprint-sha256",
"dns-soa-email",
"size-in-bytes",
"counter",
@ -665,6 +693,8 @@
"ip-dst|port",
"ip-src|port",
"hostname|port",
"mac-address",
"mac-eui-64",
"email-dst-display-name",
"email-src-display-name",
"email-header",
@ -777,6 +807,8 @@
"filename|imphash",
"filename|impfuzzy",
"filename|pehash",
"mac-address",
"mac-eui-64",
"ip-src",
"ip-dst",
"ip-dst|port",
@ -793,6 +825,7 @@
"AS",
"pattern-in-file",
"pattern-in-traffic",
"stix2-pattern",
"yara",
"sigma",
"attachment",
@ -804,6 +837,8 @@
"hex",
"vulnerability",
"x509-fingerprint-sha1",
"x509-fingerprint-md5",
"x509-fingerprint-sha256",
"other",
"hostname|port",
"email-dst-display-name",
@ -850,6 +885,7 @@
"pattern-in-file",
"pattern-in-memory",
"pdb",
"stix2-pattern",
"yara",
"sigma",
"attachment",
@ -863,6 +899,8 @@
"text",
"hex",
"x509-fingerprint-sha1",
"x509-fingerprint-md5",
"x509-fingerprint-sha256",
"other",
"cookie"
],
@ -899,6 +937,7 @@
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"stix2-pattern",
"yara",
"sigma",
"vulnerability",
@ -909,6 +948,8 @@
"text",
"hex",
"x509-fingerprint-sha1",
"x509-fingerprint-md5",
"x509-fingerprint-sha256",
"mobile-application-id",
"other"
],
@ -930,6 +971,8 @@
"hostname",
"domain",
"domain|ip",
"mac-address",
"mac-eui-64",
"email-dst",
"url",
"uri",
@ -938,6 +981,7 @@
"AS",
"snort",
"pattern-in-file",
"stix2-pattern",
"pattern-in-traffic",
"attachment",
"comment",
@ -959,12 +1003,16 @@
"whois-registrant-phone",
"whois-registrant-email",
"whois-registrant-name",
"whois-registrant-org",
"whois-registrar",
"whois-creation-date",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
"x509-fingerprint-md5",
"x509-fingerprint-sha256",
"other",
"dns-soa-email"
],
"External analysis": [
"md5",
@ -978,6 +1026,8 @@
"ip-dst",
"ip-dst|port",
"ip-src|port",
"mac-address",
"mac-eui-64",
"hostname",
"domain",
"domain|ip",
@ -997,6 +1047,8 @@
"comment",
"text",
"x509-fingerprint-sha1",
"x509-fingerprint-md5",
"x509-fingerprint-sha256",
"github-repository",
"other",
"cortex"

@ -1 +1 @@
Subproject commit bbf3e45649af5af50c98ad90a86916cf75e8c74d
Subproject commit 21e58b3ddf1737028b556b93b20d848f86a71cd0

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
@ -40,6 +39,11 @@ class InvalidMISPObject(MISPObjectException):
"""Exception raised when an object doesn't respect the contrains in the definition"""
pass
class UnknownMISPObjectTemplate(MISPObjectException):
"""Exception raised when the template is unknown"""
pass
class PyMISPInvalidFormat(PyMISPError):
pass

File diff suppressed because it is too large Load Diff

View File

@ -6,3 +6,5 @@ from .elfobject import ELFObject, ELFSectionObject # noqa
from .machoobject import MachOObject, MachOSectionObject # noqa
from .create_misp_object import make_binary_objects # noqa
from .abstractgenerator import AbstractMISPObjectGenerator # noqa
from .genericgenerator import GenericObjectGenerator # noqa
from .openioc import load_openioc, load_openioc_file # noqa

View File

@ -1,9 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import six
from . import FileObject, PEObject, ELFObject, MachOObject
from ..exceptions import MISPObjectException
import warnings
import logging
logger = logging.getLogger('pymisp')
try:
import lief
@ -18,8 +22,8 @@ class FileTypeNotImplemented(MISPObjectException):
pass
def make_pe_objects(lief_parsed, misp_file):
pe_object = PEObject(parsed=lief_parsed)
def make_pe_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}):
pe_object = PEObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters)
misp_file.add_reference(pe_object.uuid, 'included-in', 'PE indicators')
pe_sections = []
for s in pe_object.sections:
@ -27,8 +31,8 @@ def make_pe_objects(lief_parsed, misp_file):
return misp_file, pe_object, pe_sections
def make_elf_objects(lief_parsed, misp_file):
elf_object = ELFObject(parsed=lief_parsed)
def make_elf_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}):
elf_object = ELFObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters)
misp_file.add_reference(elf_object.uuid, 'included-in', 'ELF indicators')
elf_sections = []
for s in elf_object.sections:
@ -36,8 +40,8 @@ def make_elf_objects(lief_parsed, misp_file):
return misp_file, elf_object, elf_sections
def make_macho_objects(lief_parsed, misp_file):
macho_object = MachOObject(parsed=lief_parsed)
def make_macho_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}):
macho_object = MachOObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters)
misp_file.add_reference(macho_object.uuid, 'included-in', 'MachO indicators')
macho_sections = []
for s in macho_object.sections:
@ -45,27 +49,45 @@ def make_macho_objects(lief_parsed, misp_file):
return misp_file, macho_object, macho_sections
def make_binary_objects(filepath=None, pseudofile=None, filename=None):
misp_file = FileObject(filepath=filepath, pseudofile=pseudofile, filename=filename)
if HAS_LIEF and filepath:
def make_binary_objects(filepath=None, pseudofile=None, filename=None, standalone=True, default_attributes_parameters={}):
misp_file = FileObject(filepath=filepath, pseudofile=pseudofile, filename=filename,
standalone=standalone, default_attributes_parameters=default_attributes_parameters)
if HAS_LIEF and filepath or (pseudofile and filename):
try:
lief_parsed = lief.parse(filepath)
if filepath:
lief_parsed = lief.parse(filepath=filepath)
else:
if six.PY2:
logger.critical('Pseudofile is not supported in python2. Just update.')
lief_parsed = None
else:
lief_parsed = lief.parse(raw=pseudofile.getvalue(), name=filename)
if isinstance(lief_parsed, lief.PE.Binary):
return make_pe_objects(lief_parsed, misp_file)
return make_pe_objects(lief_parsed, misp_file, standalone, default_attributes_parameters)
elif isinstance(lief_parsed, lief.ELF.Binary):
return make_elf_objects(lief_parsed, misp_file)
return make_elf_objects(lief_parsed, misp_file, standalone, default_attributes_parameters)
elif isinstance(lief_parsed, lief.MachO.Binary):
return make_macho_objects(lief_parsed, misp_file)
return make_macho_objects(lief_parsed, misp_file, standalone, default_attributes_parameters)
except lief.bad_format as e:
warnings.warn('\tBad format: {}'.format(e))
logger.warning('Bad format: {}'.format(e))
except lief.bad_file as e:
warnings.warn('\tBad file: {}'.format(e))
logger.warning('Bad file: {}'.format(e))
except lief.conversion_error as e:
logger.warning('Conversion file: {}'.format(e))
except lief.builder_error as e:
logger.warning('Builder file: {}'.format(e))
except lief.parser_error as e:
warnings.warn('\tParser error: {}'.format(e))
except FileTypeNotImplemented as e: # noqa
warnings.warn(e)
logger.warning('Parser error: {}'.format(e))
except lief.integrity_error as e:
logger.warning('Integrity error: {}'.format(e))
except lief.pe_error as e:
logger.warning('PE error: {}'.format(e))
except lief.type_error as e:
logger.warning('Type error: {}'.format(e))
except lief.exception as e:
logger.warning('Lief exception: {}'.format(e))
except FileTypeNotImplemented as e:
logger.warning(e)
if not HAS_LIEF:
warnings.warn('Please install lief, documentation here: https://github.com/lief-project/LIEF')
if not filepath:
warnings.warn('LIEF currently requires a filepath and not a pseudo file')
logger.warning('Please install lief, documentation here: https://github.com/lief-project/LIEF')
return misp_file, None, None

View File

@ -5,8 +5,9 @@ from .abstractgenerator import AbstractMISPObjectGenerator
from ..exceptions import InvalidMISPObject
from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
import warnings
import logging
logger = logging.getLogger('pymisp')
try:
import lief
@ -23,9 +24,9 @@ except ImportError:
class ELFObject(AbstractMISPObjectGenerator):
def __init__(self, parsed=None, filepath=None, pseudofile=None):
def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs):
if not HAS_PYDEEP:
warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if not HAS_LIEF:
raise ImportError('Please install lief, documentation here: https://github.com/lief-project/LIEF')
if pseudofile:
@ -43,10 +44,8 @@ class ELFObject(AbstractMISPObjectGenerator):
self.__elf = parsed
else:
raise InvalidMISPObject('Not a lief.ELF.Binary: {}'.format(type(parsed)))
super(ELFObject, self).__init__('elf')
super(ELFObject, self).__init__('elf', standalone=standalone, **kwargs)
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable('ObjectReference')
def generate_attributes(self):
# General information
@ -59,7 +58,7 @@ class ELFObject(AbstractMISPObjectGenerator):
if self.__elf.sections:
pos = 0
for section in self.__elf.sections:
s = ELFSectionObject(section)
s = ELFSectionObject(section, self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'included-in', 'Section {} of ELF'.format(pos))
pos += 1
self.sections.append(s)
@ -68,15 +67,13 @@ class ELFObject(AbstractMISPObjectGenerator):
class ELFSectionObject(AbstractMISPObjectGenerator):
def __init__(self, section):
def __init__(self, section, standalone=True, **kwargs):
# Python3 way
# super().__init__('pe-section')
super(ELFSectionObject, self).__init__('elf-section')
super(ELFSectionObject, self).__init__('elf-section', standalone=standalone, **kwargs)
self.__section = section
self.__data = bytes(self.__section.content)
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable('ObjectReference')
def generate_attributes(self):
self.add_attribute('name', value=self.__section.name)

View File

@ -8,7 +8,10 @@ from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
import math
from collections import Counter
import warnings
import logging
logger = logging.getLogger('pymisp')
try:
import pydeep
@ -25,11 +28,11 @@ except ImportError:
class FileObject(AbstractMISPObjectGenerator):
def __init__(self, filepath=None, pseudofile=None, filename=None):
def __init__(self, filepath=None, pseudofile=None, filename=None, standalone=True, **kwargs):
if not HAS_PYDEEP:
warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if not HAS_MAGIC:
warnings.warn("Please install python-magic: pip install python-magic.")
logger.warning("Please install python-magic: pip install python-magic.")
if filename:
# Useful in case the file is copied with a pre-defined name by a script but we want to keep the original name
self.__filename = filename
@ -48,11 +51,9 @@ class FileObject(AbstractMISPObjectGenerator):
raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')
# PY3 way:
# super().__init__('file')
super(FileObject, self).__init__('file')
super(FileObject, self).__init__('file', standalone=standalone, **kwargs)
self.__data = self.__pseudofile.getvalue()
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable('ObjectReference')
def generate_attributes(self):
self.add_attribute('filename', value=self.__filename)

View File

@ -0,0 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .abstractgenerator import AbstractMISPObjectGenerator
class GenericObjectGenerator(AbstractMISPObjectGenerator):
def generate_attributes(self, attributes):
for attribute in attributes:
for object_relation, value in attribute.items():
if isinstance(value, dict):
self.add_attribute(object_relation, **value)
else:
# In this case, we need a valid template, as all the other parameters will be pre-set.
self.add_attribute(object_relation, value=value)

View File

@ -5,7 +5,9 @@ from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
import warnings
import logging
logger = logging.getLogger('pymisp')
try:
@ -23,9 +25,9 @@ except ImportError:
class MachOObject(AbstractMISPObjectGenerator):
def __init__(self, parsed=None, filepath=None, pseudofile=None):
def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs):
if not HAS_PYDEEP:
warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if not HAS_LIEF:
raise ImportError('Please install lief, documentation here: https://github.com/lief-project/LIEF')
if pseudofile:
@ -45,10 +47,8 @@ class MachOObject(AbstractMISPObjectGenerator):
raise InvalidMISPObject('Not a lief.MachO.Binary: {}'.format(type(parsed)))
# Python3 way
# super().__init__('elf')
super(MachOObject, self).__init__('macho')
super(MachOObject, self).__init__('macho', standalone=standalone, **kwargs)
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable(['ObjectReference'])
def generate_attributes(self):
self.add_attribute('type', value=str(self.__macho.header.file_type).split('.')[1])
@ -61,7 +61,7 @@ class MachOObject(AbstractMISPObjectGenerator):
if self.__macho.sections:
pos = 0
for section in self.__macho.sections:
s = MachOSectionObject(section)
s = MachOSectionObject(section, self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'included-in', 'Section {} of MachO'.format(pos))
pos += 1
self.sections.append(s)
@ -70,15 +70,13 @@ class MachOObject(AbstractMISPObjectGenerator):
class MachOSectionObject(AbstractMISPObjectGenerator):
def __init__(self, section):
def __init__(self, section, standalone=True, **kwargs):
# Python3 way
# super().__init__('pe-section')
super(MachOSectionObject, self).__init__('macho-section')
super(MachOSectionObject, self).__init__('macho-section', standalone=standalone, **kwargs)
self.__section = section
self.__data = bytes(self.__section.content)
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable(['ObjectReference'])
def generate_attributes(self):
self.add_attribute('name', value=self.__section.name)

View File

@ -278,17 +278,3 @@ def set_all_attributes(openioc, misp_event):
misp_event.add_attribute(**attribute_values)
return misp_event
if __name__ == '__main__':
import requests
# test file for composite
url = 'https://raw.githubusercontent.com/fireeye/iocs/master/BlogPosts/9cee306d-5441-4cd3-932d-f3119752634c.ioc'
# ~ url = 'https://raw.githubusercontent.com/MISP/misp-modules/master/tests/openioc.xml'
x = requests.get(url)
mispEvent = load_openioc(x.text)
print(mispEvent)
# ~ from pymisp import PyMISP
# ~ misp = PyMISP('http://misp.local', 'xxxxx')
# ~ r = misp.add_event(mispEvent)
# ~ print(r)

View File

@ -6,8 +6,9 @@ from .abstractgenerator import AbstractMISPObjectGenerator
from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
from datetime import datetime
import warnings
import logging
logger = logging.getLogger('pymisp')
try:
import lief
@ -24,9 +25,9 @@ except ImportError:
class PEObject(AbstractMISPObjectGenerator):
def __init__(self, parsed=None, filepath=None, pseudofile=None):
def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs):
if not HAS_PYDEEP:
warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if not HAS_LIEF:
raise ImportError('Please install lief, documentation here: https://github.com/lief-project/LIEF')
if pseudofile:
@ -46,10 +47,8 @@ class PEObject(AbstractMISPObjectGenerator):
raise InvalidMISPObject('Not a lief.PE.Binary: {}'.format(type(parsed)))
# Python3 way
# super().__init__('pe')
super(PEObject, self).__init__('pe')
super(PEObject, self).__init__('pe', standalone=standalone, **kwargs)
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable('ObjectReference')
def _is_exe(self):
if not self._is_dll() and not self._is_driver():
@ -105,7 +104,7 @@ class PEObject(AbstractMISPObjectGenerator):
if self.__pe.sections:
pos = 0
for section in self.__pe.sections:
s = PESectionObject(section)
s = PESectionObject(section, self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'included-in', 'Section {} of PE'.format(pos))
if ((self.__pe.entrypoint >= section.virtual_address) and
(self.__pe.entrypoint < (section.virtual_address + section.virtual_size))):
@ -118,15 +117,13 @@ class PEObject(AbstractMISPObjectGenerator):
class PESectionObject(AbstractMISPObjectGenerator):
def __init__(self, section):
def __init__(self, section, standalone=True, **kwargs):
# Python3 way
# super().__init__('pe-section')
super(PESectionObject, self).__init__('pe-section')
super(PESectionObject, self).__init__('pe-section', standalone=standalone, **kwargs)
self.__section = section
self.__data = bytes(self.__section.content)
self.generate_attributes()
# Mark as non_jsonable because we need to add them manually
self.update_not_jsonable('ObjectReference')
def generate_attributes(self):
self.add_attribute('name', value=self.__section.name)

View File

@ -4,7 +4,12 @@
import re
import requests
import validators
try:
import validators
has_validators = True
except ImportError:
has_validators = False
from .abstractgenerator import AbstractMISPObjectGenerator
from .. import InvalidMISPObject
@ -18,20 +23,22 @@ class VTReportObject(AbstractMISPObjectGenerator):
:indicator: IOC to search VirusTotal for
'''
def __init__(self, apikey, indicator):
def __init__(self, apikey, indicator, vt_proxies=None, standalone=True, **kwargs):
# PY3 way:
# super().__init__("virustotal-report")
super(VTReportObject, self).__init__("virustotal-report")
super(VTReportObject, self).__init__("virustotal-report", standalone=standalone, **kwargs)
indicator = indicator.strip()
self._resource_type = self.__validate_resource(indicator)
if self._resource_type:
self._proxies = vt_proxies
self._report = self.__query_virustotal(apikey, indicator)
self.generate_attributes()
else:
error_msg = "A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{}' instead".format(indicator)
raise InvalidMISPObject(error_msg)
# Mark as non_jsonable because we need to add the references manually after the object(s) have been created
self.update_not_jsonable('ObjectReference')
def get_report(self):
return self._report
def generate_attributes(self):
''' Parse the VirusTotal report for relevant attributes '''
@ -48,6 +55,8 @@ class VTReportObject(AbstractMISPObjectGenerator):
:ioc: Indicator to search VirusTotal for
'''
if not has_validators:
raise Exception('You need to install validators: pip install validators')
if validators.url(ioc):
return "url"
elif re.match(r"\b([a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})\b", ioc):
@ -65,7 +74,10 @@ class VTReportObject(AbstractMISPObjectGenerator):
url = "https://www.virustotal.com/vtapi/v2/{}/report".format(self._resource_type)
params = {"apikey": apikey, "resource": resource}
# for now assume we're using a public API key - we'll figure out private keys later
report = requests.get(url, params=params)
if self._proxies:
report = requests.get(url, params=params, proxies=self._proxies)
else:
report = requests.get(url, params=params)
report = report.json()
if report["response_code"] == 1:
return report

View File

@ -30,7 +30,8 @@ setup(
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema', 'setuptools>=36.4'],
extras_require={'fileobjects': ['lief>=0.8', 'python-magic'],
'neo': ['py2neo'],
'openioc': ['beautifulsoup4']},
'openioc': ['beautifulsoup4'],
'virustotal': ['validators']},
tests_require=[
'jsonschema',
'python-dateutil',

View File

@ -0,0 +1,23 @@
{
"Event": {
"Attribute": [
{
"Tag": [
{
"name": "osint"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "bar.exe"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,25 @@
{
"Event": {
"Attribute": [
{
"Tag": [
{
"name": "osint"
}
],
"category": "Payload delivery",
"deleted": true,
"disable_correlation": false,
"id": "42",
"to_ids": true,
"type": "filename",
"value": "bar.exe"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,55 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"category": "Attribution",
"disable_correlation": false,
"object_relation": "registrar",
"to_ids": false,
"type": "whois-registrar",
"value": "registar.example.com"
},
{
"category": "Network activity",
"disable_correlation": false,
"object_relation": "domain",
"to_ids": true,
"type": "domain",
"value": "domain.example.com"
},
{
"category": "Network activity",
"disable_correlation": true,
"object_relation": "nameserver",
"to_ids": false,
"type": "hostname",
"value": "ns1.example.com"
},
{
"category": "External analysis",
"disable_correlation": false,
"object_relation": "nameserver",
"to_ids": true,
"type": "hostname",
"value": "ns2.example.com"
}
],
"description": "Whois records information for a domain name.",
"distribution": 5,
"meta-category": "network",
"name": "whois",
"sharing_group_id": 0,
"template_uuid": "429faea1-34ff-47af-8a00-7c62d3be5a6a",
"template_version": 7,
"uuid": "a"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,10 @@
{
"Event": {
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"published": true,
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,59 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"Tag": [
{
"name": "blah"
}
],
"category": "Payload delivery",
"disable_correlation": true,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "bar"
}
],
"ObjectReference": [
{
"comment": "foo",
"object_uuid": "a",
"referenced_uuid": "b",
"relationship_type": "baz"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 9,
"uuid": "a"
},
{
"Attribute": [
{
"category": "External analysis",
"disable_correlation": false,
"object_relation": "url",
"to_ids": true,
"type": "url",
"value": "https://www.circl.lu"
}
],
"description": "url object describes an url along with its normalized field (like extracted using faup parsing library) and its metadata.",
"distribution": 5,
"meta-category": "network",
"name": "url",
"sharing_group_id": 0,
"template_uuid": "60efb77b-40b5-4c46-871b-ed1ed999fce5",
"template_version": 5,
"uuid": "b"
}
]
}
}

View File

@ -0,0 +1,56 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"Tag": [
{
"name": "blah"
}
],
"category": "Payload delivery",
"disable_correlation": true,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "bar"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 9,
"uuid": "a"
},
{
"Attribute": [
{
"Tag": [
{
"name": "blah"
}
],
"category": "Payload delivery",
"disable_correlation": true,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "baz"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 9,
"uuid": "b"
}
]
}
}

View File

@ -0,0 +1,31 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"category": "Payload delivery",
"disable_correlation": false,
"object_relation": "filename",
"to_ids": true,
"type": "filename",
"value": "bar"
}
],
"Tag": [
{
"name": "osint"
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 9,
"uuid": "a"
}
]
}
}

View File

@ -0,0 +1,20 @@
{
"Event": {
"Tag": [
{
"name": "bar"
},
{
"name": "baz"
},
{
"name": "foo"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,21 @@
{
"Event": {
"Attribute": [
{
"category": "Payload delivery",
"data": "ewogICJFdmVudCI6IHsKICB9Cn0K",
"disable_correlation": false,
"encrypt": true,
"malware_filename": "bar.exe",
"to_ids": true,
"type": "malware-sample",
"value": "bar.exe|7637beddacbeac59d44469b2b120b9e6"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,165 @@
{"response":[{
"Event": {
"id": "6719",
"orgc_id": "1",
"org_id": "1",
"date": "2018-01-04",
"threat_level_id": "1",
"info": "Test existing malware PyMISP",
"published": false,
"uuid": "5a4e4fdd-1eb4-4ff3-9e87-43fa950d210f",
"attribute_count": "6",
"analysis": "0",
"timestamp": "1515081727",
"distribution": "0",
"proposal_email_lock": false,
"locked": false,
"publish_timestamp": "0",
"sharing_group_id": "0",
"disable_correlation": false,
"event_creator_email": "raphael.vinot@circl.lu",
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"Orgc": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"Attribute": [],
"ShadowAttribute": [],
"RelatedEvent": [],
"Galaxy": [],
"Object": [
{
"id": "2279",
"name": "file",
"meta-category": "file",
"description": "File object describing a file with meta-information",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "7",
"event_id": "6719",
"uuid": "5a4e4ffe-4cb8-48b1-bd5c-48fb950d210f",
"timestamp": "1515081726",
"distribution": "5",
"sharing_group_id": "0",
"comment": "",
"deleted": false,
"ObjectReference": [],
"Attribute": [
{
"id": "814967",
"type": "malware-sample",
"category": "Payload delivery",
"to_ids": true,
"uuid": "5a4e4fff-407c-40ff-9de5-43dc950d210f",
"event_id": "6719",
"distribution": "5",
"timestamp": "1515081727",
"comment": "",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": false,
"object_id": "2279",
"object_relation": "malware-sample",
"value": "simple.json|7637beddacbeac59d44469b2b120b9e6",
"data": "UEsDBAoACQAAAEOAJEyjHboUIQAAABUAAAAgABwANzYzN2JlZGRhY2JlYWM1OWQ0NDQ2OWIyYjEyMGI5ZTZVVAkAA\/5PTlr+T05adXgLAAEEIQAAAAQhAAAATvzonhGOj12MyB1QeGLJ5iZhOjD+zymV4FU2+kjD4oTYUEsHCKMduhQhAAAAFQAAAFBLAwQKAAkAAABDgCRMg45UABcAAAALAAAALQAcADc2MzdiZWRkYWNiZWFjNTlkNDQ0NjliMmIxMjBiOWU2LmZpbGVuYW1lLnR4dFVUCQAD\/k9OWv5PTlp1eAsAAQQhAAAABCEAAADDgZOh6307Bduy829xtRjpivO\/xFI3KVBLBwiDjlQAFwAAAAsAAABQSwECHgMKAAkAAABDgCRMox26FCEAAAAVAAAAIAAYAAAAAAABAAAApIEAAAAANzYzN2JlZGRhY2JlYWM1OWQ0NDQ2OWIyYjEyMGI5ZTZVVAUAA\/5PTlp1eAsAAQQhAAAABCEAAABQSwECHgMKAAkAAABDgCRMg45UABcAAAALAAAALQAYAAAAAAABAAAApIGLAAAANzYzN2JlZGRhY2JlYWM1OWQ0NDQ2OWIyYjEyMGI5ZTYuZmlsZW5hbWUudHh0VVQFAAP+T05adXgLAAEEIQAAAAQhAAAAUEsFBgAAAAACAAIA2QAAABkBAAAAAA==",
"ShadowAttribute": []
},
{
"id": "814968",
"type": "filename",
"category": "Payload delivery",
"to_ids": false,
"uuid": "5a4e4fff-9ec0-4822-a405-4e29950d210f",
"event_id": "6719",
"distribution": "5",
"timestamp": "1515081727",
"comment": "",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": false,
"object_id": "2279",
"object_relation": "filename",
"value": "simple.json",
"ShadowAttribute": []
},
{
"id": "814969",
"type": "md5",
"category": "Payload delivery",
"to_ids": true,
"uuid": "5a4e4fff-8000-49f9-8c3e-4598950d210f",
"event_id": "6719",
"distribution": "5",
"timestamp": "1515081727",
"comment": "",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": false,
"object_id": "2279",
"object_relation": "md5",
"value": "7637beddacbeac59d44469b2b120b9e6",
"ShadowAttribute": []
},
{
"id": "814970",
"type": "sha1",
"category": "Payload delivery",
"to_ids": true,
"uuid": "5a4e4fff-dae0-4aa4-81ea-4899950d210f",
"event_id": "6719",
"distribution": "5",
"timestamp": "1515081727",
"comment": "",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": false,
"object_id": "2279",
"object_relation": "sha1",
"value": "023853a4331db8d67e44553004cf338ec1b7440e",
"ShadowAttribute": []
},
{
"id": "814971",
"type": "sha256",
"category": "Payload delivery",
"to_ids": true,
"uuid": "5a4e4fff-03ec-4e88-b5f4-472b950d210f",
"event_id": "6719",
"distribution": "5",
"timestamp": "1515081727",
"comment": "",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": false,
"object_id": "2279",
"object_relation": "sha256",
"value": "6ae8b0f1c7d6f3238d1fc14038018c3b4704c8cc23dac1c2bfd2c81b5a278eef",
"ShadowAttribute": []
},
{
"id": "814972",
"type": "size-in-bytes",
"category": "Other",
"to_ids": false,
"uuid": "5a4e4fff-b6f4-41ba-a6eb-446c950d210f",
"event_id": "6719",
"distribution": "5",
"timestamp": "1515081727",
"comment": "",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": true,
"object_id": "2279",
"object_relation": "size-in-bytes",
"value": "21",
"ShadowAttribute": []
}
]
}
]
}
}]}

View File

@ -0,0 +1,40 @@
{
"Event": {
"Object": [
{
"Attribute": [
{
"category": "Other",
"disable_correlation": false,
"object_relation": "member3",
"to_ids": false,
"type": "text",
"value": "foo"
},
{
"category": "Other",
"disable_correlation": false,
"object_relation": "member1",
"to_ids": false,
"type": "text",
"value": "bar"
}
],
"description": "TestTemplate.",
"distribution": 5,
"meta-category": "file",
"misp_objects_path_custom": "tests/mispevent_testfiles",
"name": "test_object_template",
"sharing_group_id": 0,
"template_uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589",
"template_version": 1,
"uuid": "a"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,36 @@
{
"Event": {
"Attribute": [
{
"ShadowAttribute": [
{
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "bar.pdf"
}
],
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "bar.exe"
}
],
"ShadowAttribute": [
{
"category": "Payload delivery",
"disable_correlation": false,
"to_ids": true,
"type": "filename",
"value": "baz.jpg"
}
],
"analysis": "1",
"date": "2017-12-31",
"distribution": "1",
"info": "This is a test",
"threat_level_id": "1"
}
}

View File

@ -0,0 +1,149 @@
{
"Event": {
"Attribute": [
{
"ShadowAttribute": [
{
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"category": "Artifacts dropped",
"comment": "",
"disable_correlation": false,
"event_id": "6676",
"event_uuid": "5a4cb19a-f550-437f-bd29-48ed950d210f",
"id": "3770",
"old_id": "811578",
"org_id": "1",
"proposal_to_delete": false,
"timestamp": "1514975846",
"to_ids": true,
"type": "filename",
"uuid": "5a4cb1c7-fa84-45fa-8d27-4822950d210f",
"value": "blah.exe.jpg"
}
],
"category": "Artifacts dropped",
"comment": "",
"deleted": false,
"disable_correlation": false,
"distribution": "5",
"event_id": "6676",
"id": "811578",
"object_id": "0",
"sharing_group_id": "0",
"timestamp": "1514975687",
"to_ids": false,
"type": "filename",
"uuid": "5a4cb1c7-fa84-45fa-8d27-4822950d210f",
"value": "blah.exe"
}
],
"Object": [
{
"Attribute": [
{
"ShadowAttribute": [
{
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"category": "Payload delivery",
"comment": "",
"disable_correlation": false,
"event_id": "6676",
"event_uuid": "5a4cb19a-f550-437f-bd29-48ed950d210f",
"id": "3771",
"old_id": "811579",
"org_id": "1",
"proposal_to_delete": false,
"timestamp": "1514976196",
"to_ids": true,
"type": "filename",
"uuid": "5a4cb2b8-4748-4c72-96e6-4588950d210f",
"value": "baz.png.exe"
}
],
"category": "Payload delivery",
"comment": "",
"deleted": false,
"disable_correlation": false,
"distribution": "5",
"event_id": "6676",
"id": "811579",
"object_id": "2278",
"object_relation": "filename",
"sharing_group_id": "0",
"timestamp": "1514975928",
"to_ids": true,
"type": "filename",
"uuid": "5a4cb2b8-4748-4c72-96e6-4588950d210f",
"value": "baz.png"
},
{
"category": "Other",
"comment": "",
"deleted": false,
"disable_correlation": true,
"distribution": "5",
"event_id": "6676",
"id": "811580",
"object_id": "2278",
"object_relation": "state",
"sharing_group_id": "0",
"timestamp": "1514975928",
"to_ids": false,
"type": "text",
"uuid": "5a4cb2b9-92b4-4d3a-82df-4e86950d210f",
"value": "Malicious"
}
],
"comment": "",
"deleted": false,
"description": "File object describing a file with meta-information",
"distribution": "5",
"event_id": "6676",
"id": "2278",
"meta-category": "file",
"name": "file",
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "7",
"timestamp": "1514975928",
"uuid": "5a4cb2b8-7958-4323-852c-4d2a950d210f"
}
],
"Org": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"Orgc": {
"id": "1",
"name": "CIRCL",
"uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"
},
"analysis": "2",
"attribute_count": "3",
"date": "2018-01-03",
"disable_correlation": false,
"distribution": "0",
"event_creator_email": "raphael.vinot@circl.lu",
"id": "6676",
"info": "Test proposals / ShadowAttributes",
"locked": false,
"org_id": "1",
"orgc_id": "1",
"proposal_email_lock": true,
"publish_timestamp": "0",
"published": false,
"sharing_group_id": "0",
"threat_level_id": "1",
"timestamp": "1514975929",
"uuid": "5a4cb19a-f550-437f-bd29-48ed950d210f"
}
}

View File

@ -0,0 +1,5 @@
{
"timestamp": 11111111,
"type": "bar",
"value": "1"
}

View File

@ -0,0 +1,4 @@
{
"Event": {
}
}

View File

@ -0,0 +1,29 @@
{
"requiredOneOf": [
"member1",
"member2"
],
"required": [
"member3"
],
"attributes": {
"member1": {
"description": "FirstMember",
"misp-attribute": "text"
},
"member2": {
"description": "SecondMember",
"misp-attribute": "text",
"multiple": true
},
"member3": {
"description": "Thirdmember",
"misp-attribute": "text"
}
},
"version": 1,
"description": "TestTemplate.",
"meta-category": "file",
"uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589",
"name": "test_object_template"
}

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from pymisp import PyMISP, __version__
from keys import url, key
import time
@ -12,6 +12,7 @@ class TestBasic(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.misp = PyMISP(url, key, True, 'json')
self.live_describe_types = self.misp.get_live_describe_types()
def _clean_event(self, event):
event['Event'].pop('orgc_id', None)
@ -53,7 +54,17 @@ class TestBasic(unittest.TestCase):
def add_hashes(self, eventid):
r = self.misp.get_event(eventid)
event = r.json()
event = self.misp.add_hashes(event, 'Payload installation', 'dll_installer.dll', '0a209ac0de4ac033f31d6ba9191a8f7a', '1f0ae54ac3f10d533013f74f48849de4e65817a7', '003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9', 'Fanny modules', False, 2)
event = self.misp.add_hashes(event,
category='Payload installation',
filename='dll_installer.dll',
md5='0a209ac0de4ac033f31d6ba9191a8f7a',
sha1='1f0ae54ac3f10d533013f74f48849de4e65817a7',
sha256='003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
ssdeep=None,
comment='Fanny modules',
to_ids=False,
distribution=2,
proposal=False)
self._clean_event(event)
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
u'attribute_count': u'3', u'analysis': u'0',
@ -253,5 +264,47 @@ class TestBasic(unittest.TestCase):
def test_create_organisation(self):
self.add_organisation()
def test_describeTypes_sane_default(self):
sane_default = self.live_describe_types['sane_defaults']
self.assertEqual(sorted(sane_default.keys()), sorted(self.live_describe_types['types']))
def test_describeTypes_categories(self):
category_type_mappings = self.live_describe_types['category_type_mappings']
self.assertEqual(sorted(category_type_mappings.keys()), sorted(self.live_describe_types['categories']))
def test_describeTypes_types_in_categories(self):
category_type_mappings = self.live_describe_types['category_type_mappings']
for category, types in category_type_mappings.items():
existing_types = [t for t in types if t in self.live_describe_types['types']]
self.assertEqual(sorted(existing_types), sorted(types))
def test_describeTypes_types_have_category(self):
category_type_mappings = self.live_describe_types['category_type_mappings']
all_types = set()
for category, types in category_type_mappings.items():
all_types.update(types)
self.assertEqual(sorted(list(all_types)), sorted(self.live_describe_types['types']))
def test_describeTypes_sane_default_valid_category(self):
sane_default = self.live_describe_types['sane_defaults']
categories = self.live_describe_types['categories']
for t, sd in sane_default.items():
self.assertTrue(sd['to_ids'] in [0, 1])
self.assertTrue(sd['default_category'] in categories)
def test_describeTypes_uptodate(self):
self.assertEqual(self.live_describe_types, self.misp.get_local_describe_types())
def test_live_acl(self):
query_acl = self.misp.get_live_query_acl()
self.assertEqual(query_acl['response'], [])
def test_recommended_pymisp_version(self):
response = self.misp.get_recommended_api_version()
recommended_version_tup = tuple(int(x) for x in response['version'].split('.'))
pymisp_version_tup = tuple(int(x) for x in __version__.split('.'))[:3]
self.assertEqual(recommended_version_tup, pymisp_version_tup)
if __name__ == '__main__':
unittest.main()

271
tests/test_mispevent.py Normal file
View File

@ -0,0 +1,271 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import unittest
import json
import sys
from io import BytesIO
from pymisp import MISPEvent, MISPSighting, MISPTag
from pymisp.exceptions import InvalidMISPObject
class TestMISPEvent(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.mispevent = MISPEvent()
def init_event(self):
self.mispevent.info = 'This is a test'
self.mispevent.distribution = 1
self.mispevent.threat_level_id = 1
self.mispevent.analysis = 1
self.mispevent.set_date("2017-12-31") # test the set date method
def test_simple(self):
with open('tests/mispevent_testfiles/simple.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event(self):
self.init_event()
self.mispevent.publish()
with open('tests/mispevent_testfiles/event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_loadfile(self):
self.mispevent.load_file('tests/mispevent_testfiles/event.json')
with open('tests/mispevent_testfiles/event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event_tag(self):
self.init_event()
self.mispevent.add_tag('bar')
self.mispevent.add_tag(name='baz')
new_tag = MISPTag()
new_tag.from_dict(name='foo')
self.mispevent.add_tag(new_tag)
with open('tests/mispevent_testfiles/event_tags.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_attribute(self):
self.init_event()
self.mispevent.add_attribute('filename', 'bar.exe')
self.mispevent.add_attribute_tag('osint', 'bar.exe')
attr_tags = self.mispevent.get_attribute_tag('bar.exe')
self.assertEqual(self.mispevent.attributes[0].tags[0].name, 'osint')
self.assertEqual(attr_tags[0].name, 'osint')
with open('tests/mispevent_testfiles/attribute.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
# Fake setting an attribute ID for testing
self.mispevent.attributes[0].id = 42
self.mispevent.delete_attribute(42)
with open('tests/mispevent_testfiles/attribute_del.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_object_tag(self):
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}])
self.assertEqual(self.mispevent.objects[0].attributes[0].tags[0].name, 'blah')
self.assertTrue(self.mispevent.objects[0].has_attributes_by_relation(['filename']))
self.assertEqual(len(self.mispevent.objects[0].get_attributes_by_relation('filename')), 1)
self.mispevent.add_object(name='url', strict=True)
self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu')
self.mispevent.objects[0].uuid = 'a'
self.mispevent.objects[1].uuid = 'b'
self.mispevent.objects[0].add_reference('b', 'baz', comment='foo')
self.assertEqual(self.mispevent.objects[0].references[0].relationship_type, 'baz')
with open('tests/mispevent_testfiles/event_obj_attr_tag.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
@unittest.skip("Not supported on MISP: https://github.com/MISP/MISP/issues/2638 - https://github.com/MISP/PyMISP/issues/168")
def test_object_level_tag(self):
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar')
self.mispevent.objects[0].add_tag('osint')
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/event_obj_tag.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_malware(self):
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
pseudofile = BytesIO(f.read())
self.init_event()
self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile)
attribute = self.mispevent.attributes[0]
self.assertEqual(attribute.malware_binary, pseudofile)
with open('tests/mispevent_testfiles/malware.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_malware(self):
self.mispevent.load_file('tests/mispevent_testfiles/malware_exist.json')
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
pseudofile = BytesIO(f.read())
self.assertEqual(
self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary.read(),
pseudofile.read())
def test_sighting(self):
sighting = MISPSighting()
sighting.from_dict(value='1', type='bar', timestamp=11111111)
with open('tests/mispevent_testfiles/sighting.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(sighting.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_event(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
with open('tests/mispevent_testfiles/existing_event.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_shadow_attributes_existing(self):
self.mispevent.load_file('tests/mispevent_testfiles/shadow.json')
with open('tests/mispevent_testfiles/shadow.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_shadow_attributes(self):
self.init_event()
self.mispevent.add_proposal(type='filename', value='baz.jpg')
self.mispevent.add_attribute('filename', 'bar.exe')
self.mispevent.attributes[0].add_proposal(type='filename', value='bar.pdf')
with open('tests/mispevent_testfiles/proposals.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_default_attributes(self):
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}])
self.mispevent.add_object(name='file', strict=False, default_attributes_parameters=self.mispevent.objects[0].attributes[0])
self.mispevent.objects[1].add_attribute('filename', value='baz')
self.mispevent.objects[0].uuid = 'a'
self.mispevent.objects[1].uuid = 'b'
with open('tests/mispevent_testfiles/event_obj_def_param.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_default_values(self):
self.init_event()
self.mispevent.add_object(name='whois', strict=True)
self.mispevent.objects[0].add_attribute('registrar', value='registar.example.com')
self.mispevent.objects[0].add_attribute('domain', value='domain.example.com')
self.mispevent.objects[0].add_attribute('nameserver', value='ns1.example.com')
self.mispevent.objects[0].add_attribute('nameserver', value='ns2.example.com', disable_correlation=False, to_ids=True, category='External analysis')
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/def_param.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event_not_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
def test_event_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.mispevent.info = 'blah'
self.assertTrue(self.mispevent.edited)
def test_event_tag_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.add_tag('foo')
self.assertTrue(self.mispevent.edited)
def test_event_attribute_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.mispevent.attributes[0].value = 'blah'
self.assertTrue(self.mispevent.attributes[0].edited)
self.assertFalse(self.mispevent.attributes[1].edited)
self.assertTrue(self.mispevent.edited)
def test_event_attribute_tag_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.attributes[0].tags[0].name = 'blah'
self.assertTrue(self.mispevent.attributes[0].tags[0].edited)
self.assertFalse(self.mispevent.attributes[0].tags[1].edited)
self.assertTrue(self.mispevent.attributes[0].edited)
self.assertTrue(self.mispevent.edited)
def test_event_attribute_tag_edited_second(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.attributes[0].add_tag(name='blah')
self.assertTrue(self.mispevent.attributes[0].edited)
self.assertTrue(self.mispevent.edited)
def test_event_object_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.objects[0].comment = 'blah'
self.assertTrue(self.mispevent.objects[0].edited)
self.assertFalse(self.mispevent.objects[1].edited)
self.assertTrue(self.mispevent.edited)
def test_event_object_attribute_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.objects[0].attributes[0].comment = 'blah'
self.assertTrue(self.mispevent.objects[0].attributes[0].edited)
self.assertTrue(self.mispevent.objects[0].edited)
self.assertTrue(self.mispevent.edited)
def test_event_object_attribute_edited_tag(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.objects[0].attributes[0].add_tag('blah')
self.assertTrue(self.mispevent.objects[0].attributes[0].edited)
self.assertTrue(self.mispevent.objects[0].edited)
self.assertTrue(self.mispevent.edited)
with open('tests/mispevent_testfiles/existing_event_edited.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_by_id(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
misp_obj = self.mispevent.get_object_by_id(1556)
self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f')
def test_userdefined_object(self):
self.init_event()
self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles')
with self.assertRaises(InvalidMISPObject) as e:
# Fail on required
self.mispevent.to_json()
if sys.version_info >= (3, ):
self.assertEqual(e.exception.message, '{\'member3\'} are required.')
else:
# Python2 bullshit
self.assertEqual(e.exception.message, 'set([u\'member3\']) are required.')
self.mispevent.objects[0].add_attribute('member3', value='foo')
with self.assertRaises(InvalidMISPObject) as e:
# Fail on requiredOneOf
self.mispevent.to_json()
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
self.mispevent.objects[0].add_attribute('member1', value='bar')
self.mispevent.objects[0].add_attribute('member1', value='baz')
with self.assertRaises(InvalidMISPObject) as e:
# member1 is not a multiple
self.mispevent.to_json()
self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed')
self.mispevent.objects[0].attributes = self.mispevent.objects[0].attributes[:2]
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/misp_custom_obj.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(), json.dumps(ref_json, sort_keys=True, indent=2))
if __name__ == '__main__':
unittest.main()

View File

@ -5,6 +5,8 @@ import unittest
import requests_mock
import json
import os
import six
from io import BytesIO
import pymisp as pm
from pymisp import PyMISP
@ -49,6 +51,13 @@ class TestOffline(unittest.TestCase):
m.register_uri('DELETE', self.domain + 'events/3', json={'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})
m.register_uri('GET', self.domain + 'attributes/delete/2', json={'message': 'Attribute deleted.'})
m.register_uri('POST', self.domain + 'events/index', json=self.search_index_result)
m.register_uri('POST', self.domain + 'attributes/edit/' + self.key, json={})
m.register_uri('GET', self.domain + 'shadow_attributes/view/None', json={})
m.register_uri('GET', self.domain + 'shadow_attributes/view/1', json={})
m.register_uri('POST', self.domain + 'events/freeTextImport/1', json={})
m.register_uri('POST', self.domain + 'attributes/restSearch', json={})
m.register_uri('POST', self.domain + 'attributes/downloadSample', json={})
m.register_uri('GET', self.domain + 'tags', json={'Tag': 'foo'})
def test_getEvent(self, m):
self.initURI(m)
@ -210,9 +219,12 @@ class TestOffline(unittest.TestCase):
p.add_internal_other(evt, 'foobar')
p.add_attachment(evt, "testFile")
def make_objects(self, path):
def make_objects(self, path=None, pseudofile=None, filename=None):
to_return = {'objects': [], 'references': []}
fo, peo, seos = make_binary_objects(path)
if path:
fo, peo, seos = make_binary_objects(path)
else:
fo, peo, seos = make_binary_objects(pseudofile=pseudofile, filename=filename)
if seos:
for s in seos:
@ -229,8 +241,31 @@ class TestOffline(unittest.TestCase):
to_return['objects'].append(fo)
if fo.ObjectReference:
to_return['references'] += fo.ObjectReference
# Remove UUIDs for comparing the objects.
for o in to_return['objects']:
o.pop('uuid')
for o in to_return['references']:
o.pop('referenced_uuid')
o.pop('object_uuid')
return json.dumps(to_return, cls=MISPEncode)
def test_objects_pseudofile(self, m):
if six.PY2:
return unittest.SkipTest()
paths = ['cmd.exe', 'tmux', 'MachO-OSX-x64-ls']
try:
for path in paths:
with open(os.path.join('tests', 'viper-test-files', 'test_files', path), 'rb') as f:
pseudo = BytesIO(f.read())
json_blob = self.make_objects(pseudofile=pseudo, filename=path)
# Compare pseudo file / path
filepath_blob = self.make_objects(os.path.join('tests', 'viper-test-files', 'test_files', path))
self.assertEqual(json_blob, filepath_blob)
except IOError: # Can be replaced with FileNotFoundError when support for python 2 is dropped
return unittest.SkipTest()
print(json_blob)
def test_objects(self, m):
paths = ['cmd.exe', 'tmux', 'MachO-OSX-x64-ls']
try:
@ -241,5 +276,125 @@ class TestOffline(unittest.TestCase):
return unittest.SkipTest()
print(json_blob)
def test_describeTypes_sane_default(self, m):
sane_default = self.types['result']['sane_defaults']
self.assertEqual(sorted(sane_default.keys()), sorted(self.types['result']['types']))
def test_describeTypes_categories(self, m):
category_type_mappings = self.types['result']['category_type_mappings']
self.assertEqual(sorted(category_type_mappings.keys()), sorted(self.types['result']['categories']))
def test_describeTypes_types_in_categories(self, m):
category_type_mappings = self.types['result']['category_type_mappings']
for category, types in category_type_mappings.items():
existing_types = [t for t in types if t in self.types['result']['types']]
self.assertEqual(sorted(existing_types), sorted(types))
def test_describeTypes_types_have_category(self, m):
category_type_mappings = self.types['result']['category_type_mappings']
all_types = set()
for category, types in category_type_mappings.items():
all_types.update(types)
self.assertEqual(sorted(list(all_types)), sorted(self.types['result']['types']))
def test_describeTypes_sane_default_valid_category(self, m):
sane_default = self.types['result']['sane_defaults']
categories = self.types['result']['categories']
for t, sd in sane_default.items():
self.assertTrue(sd['to_ids'] in [0, 1])
self.assertTrue(sd['default_category'] in categories)
def test_flatten_error_messages_singular(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
error = pymisp.get(1)
response = self.auth_error_msg
response['error'] = ['foo', 'bar', 'baz']
messages = pymisp.flatten_error_messages(response)
self.assertEqual(["foo", "bar", "baz"], messages)
def test_flatten_error_messages_plural(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
error = pymisp.get(1)
response = self.auth_error_msg
response['errors'] = {'foo': 42, 'bar': False, 'baz': ['oo', 'ka']}
messages = pymisp.flatten_error_messages(response)
self.assertEqual(set(['42 (foo)', 'False (bar)', 'oo', 'ka']), set(messages))
def test_flatten_error_messages_nested(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
error = pymisp.get(1)
response = self.auth_error_msg
response['errors'] = {
'fo': {'o': 42}, 'ba': {'r': True}, 'b': {'a': ['z']}, 'd': {'e': {'e': ['p']}}}
messages = pymisp.flatten_error_messages(response)
self.assertEqual(set(['Error in o: 42', 'Error in r: True', 'Error in a: z', "Error in e: {'e': ['p']}"]), set(messages))
def test_test_connection(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertTrue(pymisp.test_connection())
def test_change_toids(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual({}, pymisp.change_toids(self.key, 1))
def test_change_toids_invalid(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
try:
_ = pymisp.change_toids(self.key, 42)
self.assertFalse('Exception required for off domain value')
except Exception:
pass
def test_proposal_view_default(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual({}, pymisp.proposal_view())
def test_proposal_view_event_1(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual({}, pymisp.proposal_view(event_id=1))
def test_proposal_view_event_overdetermined(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertTrue(pymisp.proposal_view(event_id=1, proposal_id=42).get('error') is not None)
def test_freetext(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual({}, pymisp.freetext(1, 'foo', adhereToWarninglists=True, distribution=42))
def test_freetext_offdomain(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
try:
_ = pymisp.freetext(1, None, adhereToWarninglists='hard')
self.assertFalse('Exception required for off domain value')
except Exception:
pass
def test_get_yara(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual((False, None), pymisp.get_yara(1))
def test_download_samples(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual((False, None), pymisp.download_samples())
def test_get_all_tags(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
self.assertEqual({'Tag': 'foo'}, pymisp.get_all_tags())
if __name__ == '__main__':
unittest.main()