Merge remote-tracking branch 'upstream/master'

pull/312/head
Steve Clement 2018-10-08 09:34:09 +09:00
commit 04e089b3e6
27 changed files with 3271 additions and 1178 deletions

View File

@ -11,7 +11,6 @@ addons:
python:
- "2.7"
- "3.5"
- "3.5-dev"
- "3.6"
- "3.6-dev"

View File

@ -2,6 +2,97 @@ Changelog
=========
v2.4.95 (2018-09-06)
--------------------
New
~~~
- Add helpers for new server related APIs. [Raphaël Vinot]
Fix #266
- [test] Attribute modification. [Raphaël Vinot]
- More test cases, bug fixes. [Raphaël Vinot]
- Reworking the REST API (WiP) [Raphaël Vinot]
- Add Jupyter for search. [Raphaël Vinot]
Changes
~~~~~~~
- Bump misp-objects. [Raphaël Vinot]
- Version bump. [Raphaël Vinot]
- [data-model] updated describeTypes file. [Alexandre Dulaunoy]
- Fix testing. [Raphaël Vinot]
- More testing improvments. [Raphaël Vinot]
- Finish rewrite testing. [Raphaël Vinot]
- Rework test cases. [Raphaël Vinot]
- Add more test cases. [Raphaël Vinot]
- Make it possible to run the tests manually. [Raphaël Vinot]
- Print error message. [Raphaël Vinot]
- Remove tests on python 3.5. [Raphaël Vinot]
- Added email-header attribute. [Tom King]
- Updated types/categories mapping. [Christophe Vandeplas]
- Open all json files as bytes before loading in json. [Raphaël Vinot]
- [MISP] update to the latest version of the describeTypes. [Alexandre
Dulaunoy]
- Bump misp-objects. [Raphaël Vinot]
- Add travis tests on python 3.7. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Add comments. [Raphaël Vinot]
Fix #242
- Bump misp-objects. [Raphaël Vinot]
- [PyMISP] describeTypes.json updated to add XMR type. [Alexandre
Dulaunoy]
Fix
~~~
- Normalizing the outputs. [Raphaël Vinot]
- Jerry rig support for old python. [Raphaël Vinot]
- Format of the describeTypes. [Alexandre Dulaunoy]
- [search.py] more example of query type added. [Alexandre Dulaunoy]
- Tests are passing fine now. [Raphaël Vinot]
- Properly validate the last-type search query. [Raphaël Vinot]
- Live test failing on list order. [Raphaël Vinot]
- Add dependency. [Raphaël Vinot]
- Py3.5 compat, take 2. [Raphaël Vinot]
- Py3.5 compat. [Raphaël Vinot]
- Opening the json blobs as bytes was buggy. [Raphaël Vinot]
- One more failing test. [Raphaël Vinot]
- Tests were failing. [Raphaël Vinot]
- Allow boolean parameters in search_index. [Raphaël Vinot]
- Typo in OpenIOC script. [Raphaël Vinot]
Fix #237
- Bad URL in get_attachment. [Raphaël Vinot]
Fix #240
- Improve error message in case the object template is unknown. [Raphaël
Vinot]
Other
~~~~~
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #271 from SHSauler/patch-4. [Raphaël Vinot]
Fix #270 uniquely identifying sample
- Fix #270 uniquely identifying sample. [Steffen Sauler]
- Fix print. [Deborah Servili]
- Merge branch 'master' of github.com:MISP/PyMISP. [Alexandre Dulaunoy]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #251 from tomking2/master. [Alexandre Dulaunoy]
chg: Added email-header attribute
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Revert "chg: Add travis tests on python 3.7" [Raphaël Vinot]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #252 from cvandeplas/master. [Christophe Vandeplas]
yara_dump - fixed private rules causing issues
- Yara_dump - fixed private rules causing issues. [Christophe Vandeplas]
v2.4.93 (2018-07-01)
--------------------
@ -17,6 +108,7 @@ New
Changes
~~~~~~~
- Bump changelog, again. [Raphaël Vinot]
- Bump changelog & version. [Raphaël Vinot]
- Moar jupyter. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]

365
docs/tutorial/Search.ipynb Normal file
View File

@ -0,0 +1,365 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'https://<URL>/'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = '<KEY>'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pymisp import PyMISP\n",
"\n",
"misp = PyMISP(misp_url, misp_key, misp_verifycert, debug=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Index Search (fast, only returns events metadata)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search unpublished events\n",
"\n",
"**WARNING**: By default, the search query will only return all the events listed on teh index page"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(published=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Get the meta data of events"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(eventid=[17217, 1717, 1721, 17218])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search Tag & mix with other parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tag='TODO:VT-ENRICHMENT')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tag='TODO:VT-ENRICHMENT', published=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tag=['!TODO:VT-ENRICHMENT', 'tlp:white'], published=False) # ! means \"not this tag\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Full text search on event info field"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(eventinfo='circl')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search in the values of each attributes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(attribute='8.8.8.8')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search by org"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(org='CIRCL')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search updated events"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(timestamp='1h')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Search full events (Slower, returns full events)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Getting timestamps"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime, date, timedelta\n",
"from dateutil.parser import parse\n",
"\n",
"int(datetime.now().timestamp())\n",
"\n",
"d = parse('2018-03-24')\n",
"int(d.timestamp())\n",
"\n",
"today = int(datetime.today().timestamp())\n",
"yesterday = int((datetime.today() - timedelta(days=1)).timestamp())\n",
"\n",
"print(today, yesterday)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(not_values='8.8.8.8')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(category='Payload delivery')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', metadata=True) # no attributes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(timestamp=['2h', '1h'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', enforceWarninglist=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', deleted=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', publish_timestamp=1521846000) # everything published since that timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', last='1d') # everything published in the last <interval>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', timestamp=[yesterday, today]) # everything updated since that timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(values='8.8.8.8', withAttachments=True) # Return attachments"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Search for attributes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(controller='attributes', values='8.8.8.8')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(controller='attributes', values='wrapper.no', event_timestamp='5d') # only consider events updated since this timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -22,11 +22,10 @@ def search(m, quiet, url, controller, out=None, **kwargs):
else:
with open(out, 'w') as f:
f.write(json.dumps(result['response']))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get all the events matching a value for a given param.')
parser.add_argument("-p", "--param", required=True, help="Parameter to search (e.g. category, org, etc.)")
parser.add_argument("-p", "--param", required=True, help="Parameter to search (e.g. category, org, values, type_attribute, etc.)")
parser.add_argument("-s", "--search", required=True, help="String to search.")
parser.add_argument("-a", "--attributes", action='store_true', help="Search attributes instead of events")
parser.add_argument("-q", "--quiet", action='store_true', help="Only display URLs to MISP")

View File

@ -21,5 +21,5 @@ if __name__ == '__main__':
misp = init(misp_url, misp_key)
sharing_groups = misp.get_sharing_groups()
print sharing_groups
print (sharing_groups)

View File

@ -17,7 +17,8 @@ def dirty_cleanup(value):
('', '"'),
('', '"'),
('`', "'"),
('\r', '')
('\r', ''),
('Rule ', 'rule ') # some people write this with the wrong case
# ('$ ', '$'), # this breaks rules
# ('\t\t', '\n'), # this breaks rules
)
@ -49,6 +50,10 @@ if 'response' in result and 'Attribute' in result['response']:
attr_cnt_changed += 1
if 'global rule' in value: # refuse any global rules as they might disable everything
continue
if 'private rule' in value: # private rules need some more rewriting
priv_rules = re.findall('private rule (\w+)', value, flags=re.MULTILINE)
for priv_rule in priv_rules:
value = re.sub(priv_rule, 'misp_e{}_{}'.format(event_id, priv_rule), value, flags=re.MULTILINE)
# compile the yara rule to confirm it's validity
# if valid, ignore duplicate rules

View File

@ -1,7 +1,8 @@
__version__ = '2.4.93'
__version__ = '2.4.95'
import logging
import functools
import warnings
import sys
FORMAT = "%(levelname)s [%(filename)s:%(lineno)s - %(funcName)s() ] %(message)s"
formatter = logging.Formatter(FORMAT)
@ -31,9 +32,9 @@ def deprecated(func):
try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat # noqa
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse # noqa
from .api import PyMISP # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
@ -41,6 +42,8 @@ try:
from .tools import openioc # noqa
from .tools import load_warninglists # noqa
from .tools import ext_lookups # noqa
if sys.version_info >= (3, 6):
from .aping import ExpandedPyMISP # noqa
logger.debug('pymisp loaded properly')
except ImportError as e:
logger.warning('Unable to load pymisp properly: {}'.format(e))

View File

@ -9,6 +9,7 @@ from json import JSONEncoder
import collections
import six # Remove that import when discarding python2 support.
import logging
from enum import Enum
from .exceptions import PyMISPInvalidFormat
@ -16,7 +17,7 @@ from .exceptions import PyMISPInvalidFormat
logger = logging.getLogger('pymisp')
if six.PY2:
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
logger.warning("You're using python 2, it is strongly recommended to use python >=3.6")
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
@ -34,6 +35,36 @@ if six.PY2:
return timedelta(0)
class Distribution(Enum):
your_organisation_only = 0
this_community_only = 1
connected_communities = 2
all_communities = 3
sharing_group = 4
inherit = 5
class ThreatLevel(Enum):
high = 1
medium = 2
low = 3
undefined = 4
class Analysis(Enum):
initial = 0
ongoing = 1
completed = 2
def _int_to_str(d):
# transform all integer back to string
for k, v in d.items():
if isinstance(v, (int, float)) and not isinstance(v, bool):
d[k] = str(v)
return d
class MISPEncode(JSONEncoder):
def default(self, obj):
@ -41,6 +72,8 @@ class MISPEncode(JSONEncoder):
return obj.jsonable()
elif isinstance(obj, datetime.datetime):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
return JSONEncoder.default(self, obj)
@ -54,6 +87,12 @@ class AbstractMISP(collections.MutableMapping):
super(AbstractMISP, self).__init__()
self.__edited = True # As we create a new object, we assume it is edited
if kwargs.get('force_timestamps') is not None:
# Ignore the edited objects and keep the timestamps.
self.__force_timestamps = True
else:
self.__force_timestamps = False
# List of classes having tags
from .mispevent import MISPAttribute, MISPEvent
self.__has_tags = (MISPAttribute, MISPEvent)
@ -111,7 +150,7 @@ class AbstractMISP(collections.MutableMapping):
elif isinstance(val, list) and len(val) == 0:
continue
if attribute == 'timestamp':
if self.edited:
if not self.__force_timestamps and self.edited:
# In order to be accepted by MISP, the timestamp of an object
# needs to be either newer, or None.
# If the current object is marked as edited, the easiest is to
@ -120,6 +159,7 @@ class AbstractMISP(collections.MutableMapping):
else:
val = self._datetime_to_timestamp(val)
to_return[attribute] = val
to_return = _int_to_str(to_return)
return to_return
def jsonable(self):
@ -183,7 +223,7 @@ class AbstractMISP(collections.MutableMapping):
"""Convert a datetime.datetime object to a timestamp (int)"""
if isinstance(d, (int, str)) or (sys.version_info < (3, 0) and isinstance(d, unicode)):
# Assume we already have a timestamp
return d
return int(d)
if sys.version_info >= (3, 3):
return int(d.timestamp())
else:
@ -204,8 +244,9 @@ class AbstractMISP(collections.MutableMapping):
misp_tag.from_dict(**kwargs)
else:
raise PyMISPInvalidFormat("The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {}".format(tag))
self.Tag.append(misp_tag)
self.edited = True
if misp_tag not in self.tags:
self.Tag.append(misp_tag)
self.edited = True
def __get_tags(self):
"""Returns a lost of tags associated to this Attribute"""
@ -218,6 +259,14 @@ class AbstractMISP(collections.MutableMapping):
else:
raise PyMISPInvalidFormat('All the attributes have to be of type MISPTag.')
def __eq__(self, other):
if isinstance(other, AbstractMISP):
return self.to_dict() == other.to_dict()
elif isinstance(other, dict):
return self.to_dict() == other
else:
return False
class MISPTag(AbstractMISP):
def __init__(self):

View File

@ -36,6 +36,10 @@ try:
except ImportError:
HAVE_REQUESTS = False
if (3, 0) <= sys.version_info < (3, 6):
OLD_PY3 = True
else:
OLD_PY3 = False
try:
from requests_futures.sessions import FuturesSession
@ -122,16 +126,19 @@ class PyMISP(object):
def get_live_query_acl(self):
"""This should return an empty list, unless the ACL is outdated."""
response = self.__prepare_request('GET', urljoin(self.root_url, 'events/queryACL.json'))
response = self._prepare_request('GET', urljoin(self.root_url, 'events/queryACL.json'))
return self._check_response(response)
def get_local_describe_types(self):
with open(os.path.join(self.resources_path, 'describeTypes.json'), 'r') as f:
describe_types = json.load(f)
with open(os.path.join(self.resources_path, 'describeTypes.json'), 'rb') as f:
if OLD_PY3:
describe_types = json.loads(f.read().decode())
else:
describe_types = json.load(f)
return describe_types['result']
def get_live_describe_types(self):
response = self.__prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json'))
response = self._prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json'))
describe_types = self._check_response(response)
if describe_types.get('error'):
for e in describe_types.get('error'):
@ -141,8 +148,8 @@ class PyMISP(object):
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
return describe_types
def __prepare_request(self, request_type, url, data=None,
background_callback=None, output_type='json'):
def _prepare_request(self, request_type, url, data=None,
background_callback=None, output_type='json'):
if logger.isEnabledFor(logging.DEBUG):
logger.debug('{} - {}'.format(request_type, url))
if data is not None:
@ -152,21 +159,22 @@ class PyMISP(object):
else:
req = requests.Request(request_type, url, data=data)
if self.asynch and background_callback is not None:
s = FuturesSession()
local_session = FuturesSession
else:
s = requests.Session()
prepped = s.prepare_request(req)
prepped.headers.update(
{'Authorization': self.key,
'Accept': 'application/{}'.format(output_type),
'content-type': 'application/{}'.format(output_type),
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
if logger.isEnabledFor(logging.DEBUG):
logger.debug(prepped.headers)
if self.asynch and background_callback is not None:
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback)
else:
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert)
local_session = requests.Session
with local_session() as s:
prepped = s.prepare_request(req)
prepped.headers.update(
{'Authorization': self.key,
'Accept': 'application/{}'.format(output_type),
'content-type': 'application/{}'.format(output_type),
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
if logger.isEnabledFor(logging.DEBUG):
logger.debug(prepped.headers)
if self.asynch and background_callback is not None:
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback)
else:
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert)
# #####################
# ### Core helpers ####
@ -218,7 +226,7 @@ class PyMISP(object):
try:
json_response = response.json()
except ValueError:
# It the server didn't return a JSON blob, we've a problem.
# If the server didn't return a JSON blob, we've a problem.
raise PyMISPError(everything_broken.format(response.request.headers, response.request.body, response.text))
errors = []
@ -314,9 +322,9 @@ class PyMISP(object):
"""
url = urljoin(self.root_url, 'events/index')
if filters is None:
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
else:
response = self.__prepare_request('POST', url, json.dumps(filters))
response = self._prepare_request('POST', url, json.dumps(filters))
return self._check_response(response)
def get_event(self, event_id):
@ -325,7 +333,7 @@ class PyMISP(object):
:param event_id: Event id to get
"""
url = urljoin(self.root_url, 'events/{}'.format(event_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def add_event(self, event):
@ -338,7 +346,7 @@ class PyMISP(object):
event = event.to_json()
elif not isinstance(event, basestring):
event = json.dumps(event)
response = self.__prepare_request('POST', url, event)
response = self._prepare_request('POST', url, event)
return self._check_response(response)
def update_attribute(self, attribute_id, attribute):
@ -352,7 +360,7 @@ class PyMISP(object):
attribute = attribute.to_json()
elif not isinstance(attribute, basestring):
attribute = json.dumps(attribute)
response = self.__prepare_request('POST', url, attribute)
response = self._prepare_request('POST', url, attribute)
return self._check_response(response)
def update_event(self, event_id, event):
@ -366,7 +374,7 @@ class PyMISP(object):
event = event.to_json()
elif not isinstance(event, basestring):
event = json.dumps(event)
response = self.__prepare_request('POST', url, event)
response = self._prepare_request('POST', url, event)
return self._check_response(response)
def delete_event(self, event_id):
@ -375,7 +383,7 @@ class PyMISP(object):
:param event_id: Event id to delete
"""
url = urljoin(self.root_url, 'events/{}'.format(event_id))
response = self.__prepare_request('DELETE', url)
response = self._prepare_request('DELETE', url)
return self._check_response(response)
def delete_attribute(self, attribute_id, hard_delete=False):
@ -384,13 +392,18 @@ class PyMISP(object):
url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id))
else:
url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def pushEventToZMQ(self, event_id):
"""Force push an event on ZMQ"""
url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id))
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def direct_call(self, url, data):
'''Very lightweight call that posts a data blob (python dictionary) on the URL'''
response = self._prepare_request('POST', url, data)
return self._check_response(response)
# ##############################################
@ -419,7 +432,7 @@ class PyMISP(object):
url = urljoin(self.root_url, 'events/publish/{}'.format(event_id))
else:
url = urljoin(self.root_url, 'events/alert/{}'.format(event_id))
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def publish(self, event, alert=True):
@ -467,7 +480,7 @@ class PyMISP(object):
raise PyMISPError('Invalid UUID')
url = urljoin(self.root_url, 'tags/attachTagToObject')
to_post = {'uuid': uuid, 'tag': tag}
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response)
def untag(self, uuid, tag):
@ -476,7 +489,7 @@ class PyMISP(object):
raise PyMISPError('Invalid UUID')
url = urljoin(self.root_url, 'tags/removeTagFromObject')
to_post = {'uuid': uuid, 'tag': tag}
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response)
# ##### File attributes #####
@ -519,8 +532,8 @@ class PyMISP(object):
data = attributes[0].to_json()
else:
data = attributes.to_json()
# __prepare_request(...) returns a requests.Response Object
resp = self.__prepare_request('POST', url, json.dumps(data, cls=MISPEncode))
# _prepare_request(...) returns a requests.Response Object
resp = self._prepare_request('POST', url, json.dumps(data, cls=MISPEncode))
try:
responses.append(resp.json())
except Exception:
@ -766,6 +779,10 @@ class PyMISP(object):
"""Add an email atachment"""
return self.add_named_attribute(event, 'email-attachment', email, category, to_ids, comment, distribution, proposal, **kwargs)
def add_email_header(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False, **kwargs):
"""Add an email header"""
return self.add_named_attribute(event, 'email-header', email, category, to_ids, comment, distribution, proposal, **kwargs)
# ##### Target attributes #####
def add_target_email(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False, **kwargs):
@ -835,7 +852,7 @@ class PyMISP(object):
# ##################################################
def _prepare_upload(self, event_id, distribution, to_ids, category, comment, info,
analysis, threat_level_id):
analysis, threat_level_id, advanced_extraction):
"""Helper to prepare a sample to upload"""
to_post = {'request': {}}
@ -865,6 +882,7 @@ class PyMISP(object):
to_post['request']['category'] = category
to_post['request']['comment'] = comment
to_post['request']['advanced'] = 1 if advanced_extraction else 0
return to_post, event_id
def _encode_file_to_upload(self, filepath_or_bytes):
@ -881,19 +899,21 @@ class PyMISP(object):
def upload_sample(self, filename, filepath_or_bytes, event_id, distribution=None,
to_ids=True, category=None, comment=None, info=None,
analysis=None, threat_level_id=None):
analysis=None, threat_level_id=None, advanced_extraction=False):
"""Upload a sample"""
to_post, event_id = self._prepare_upload(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
comment, info, analysis, threat_level_id,
advanced_extraction)
to_post['request']['files'] = [{'filename': filename, 'data': self._encode_file_to_upload(filepath_or_bytes)}]
return self._upload_sample(to_post, event_id)
def upload_samplelist(self, filepaths, event_id, distribution=None,
to_ids=True, category=None, comment=None, info=None,
analysis=None, threat_level_id=None):
analysis=None, threat_level_id=None, advanced_extraction=False):
"""Upload a list of samples"""
to_post, event_id = self._prepare_upload(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
comment, info, analysis, threat_level_id,
advanced_extraction)
files = []
for path in filepaths:
if not os.path.isfile(path):
@ -908,7 +928,7 @@ class PyMISP(object):
url = urljoin(self.root_url, 'events/upload_sample')
else:
url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id))
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response)
# ############################
@ -920,11 +940,11 @@ class PyMISP(object):
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
if path in ['add', 'edit']:
query = {'request': {'ShadowAttribute': attribute}}
response = self.__prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
response = self._prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
elif path == 'view':
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
else: # accept or discard
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def proposal_view(self, event_id=None, proposal_id=None):
@ -995,14 +1015,14 @@ class PyMISP(object):
"""Helper to prepare a search query"""
if query.get('error') is not None:
return query
if controller not in ['events', 'attributes']:
raise Exception('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes'])))
if controller not in ['events', 'attributes', 'objects']:
raise ValueError('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes', 'objects'])))
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
if ASYNC_OK and async_callback:
response = self.__prepare_request('POST', url, json.dumps(query), async_callback)
response = self._prepare_request('POST', url, json.dumps(query), async_callback)
else:
response = self.__prepare_request('POST', url, json.dumps(query))
response = self._prepare_request('POST', url, json.dumps(query))
return self._check_response(response)
def search_index(self, published=None, eventid=None, tag=None, datefrom=None,
@ -1043,9 +1063,11 @@ class PyMISP(object):
if allowed.get(rule) is None:
continue
param = allowed[rule]
if isinstance(param, bool):
param = int(param)
if not isinstance(param, list):
param = [param]
param = [x for x in map(str, param)]
# param = [x for x in map(str, param)]
if rule in rule_levels:
if not set(param).issubset(rule_levels[rule]):
raise SearchError('Values in your {} are invalid, has to be in {}'.format(rule, ', '.join(str(x) for x in rule_levels[rule])))
@ -1053,9 +1075,9 @@ class PyMISP(object):
url = urljoin(self.root_url, buildup_url)
if self.asynch and async_callback:
response = self.__prepare_request('POST', url, json.dumps(to_post), async_callback)
response = self._prepare_request('POST', url, json.dumps(to_post), async_callback)
else:
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
res = self._check_response(response)
if normalize:
to_return = {'response': []}
@ -1185,8 +1207,8 @@ class PyMISP(object):
:param attribute_id: Attribute ID to fetched
"""
url = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(attribute_id))
response = self.__prepare_request('GET', url)
url = urljoin(self.root_url, 'attributes/download/{}'.format(attribute_id))
response = self._prepare_request('GET', url)
try:
response.json()
# The query fails, response contains a json blob
@ -1199,7 +1221,7 @@ class PyMISP(object):
"""Get the yara rules from an event"""
url = urljoin(self.root_url, 'attributes/restSearch')
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
response = self.__prepare_request('POST', url, data=json.dumps(to_post))
response = self._prepare_request('POST', url, data=json.dumps(to_post))
result = self._check_response(response)
if result.get('error') is not None:
return False, result.get('error')
@ -1209,10 +1231,18 @@ class PyMISP(object):
return True, rules
def download_samples(self, sample_hash=None, event_id=None, all_samples=False, unzip=True):
"""Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch"""
"""Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch
:param sample_hash: hash of sample
:param event_id: ID of event
:param all_samples: download all samples
:param unzip: whether to unzip or keep zipped
:return: A tuple with (success, [[event_id, sample_hash, sample_as_bytesio], [event_id,...]])
In case of legacy sample, the sample_hash will be replaced by the zip's filename
"""
url = urljoin(self.root_url, 'attributes/downloadSample')
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
response = self.__prepare_request('POST', url, data=json.dumps(to_post))
response = self._prepare_request('POST', url, data=json.dumps(to_post))
result = self._check_response(response)
if result.get('error') is not None:
return False, result.get('error')
@ -1228,10 +1258,11 @@ class PyMISP(object):
if f.get('md5') and f['md5'] in archive.namelist():
# New format
unzipped = BytesIO(archive.open(f['md5'], pwd=b'infected').read())
details.append([f['event_id'], f['md5'], unzipped])
else:
# Old format
unzipped = BytesIO(archive.open(f['filename'], pwd=b'infected').read())
details.append([f['event_id'], f['filename'], unzipped])
details.append([f['event_id'], f['filename'], unzipped])
except zipfile.BadZipfile:
# In case the sample isn't zipped
details.append([f['event_id'], f['filename'], zipped])
@ -1279,7 +1310,7 @@ class PyMISP(object):
def get_all_tags(self, quiet=False):
"""Get all the tags used on the instance"""
url = urljoin(self.root_url, 'tags')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
r = self._check_response(response)
if not quiet or r.get('errors'):
return r
@ -1293,7 +1324,7 @@ class PyMISP(object):
"""Create a new tag"""
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable, 'hide_tag': hide_tag}}
url = urljoin(self.root_url, 'tags/add')
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response)
# ########## Version ##########
@ -1314,13 +1345,13 @@ class PyMISP(object):
def get_recommended_api_version(self):
"""Returns the recommended API version from the server"""
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_version(self):
"""Returns the version of the instance."""
url = urljoin(self.root_url, 'servers/getVersion.json')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_version_master(self):
@ -1342,7 +1373,7 @@ class PyMISP(object):
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage))
else:
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_tags_statistics(self, percentage=None, name_sort=None):
@ -1356,7 +1387,7 @@ class PyMISP(object):
else:
name_sort = 'false'
url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
# ############## Sightings ##################
@ -1364,13 +1395,13 @@ class PyMISP(object):
def sighting_per_id(self, attribute_id):
"""Add a sighting to an attribute (by attribute ID)"""
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id))
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def sighting_per_uuid(self, attribute_uuid):
"""Add a sighting to an attribute (by attribute UUID)"""
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid))
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def set_sightings(self, sightings):
@ -1383,12 +1414,12 @@ class PyMISP(object):
elif isinstance(sighting, dict):
to_post = json.dumps(sighting)
url = urljoin(self.root_url, 'sightings/add/')
response = self.__prepare_request('POST', url, to_post)
response = self._prepare_request('POST', url, to_post)
return self._check_response(response)
def sighting_per_json(self, json_file):
"""Push a sighting (JSON file)"""
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
return self.set_sightings(jdata)
@ -1433,7 +1464,7 @@ class PyMISP(object):
org_id = ""
uri = 'sightings/listSightings/{}/{}/{}'.format(element_id, scope, org_id)
url = urljoin(self.root_url, uri)
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
# ############## Sharing Groups ##################
@ -1441,7 +1472,7 @@ class PyMISP(object):
def get_sharing_groups(self):
"""Get the existing sharing groups"""
url = urljoin(self.root_url, 'sharing_groups.json')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)['response']
# ############## Users ##################
@ -1458,7 +1489,7 @@ class PyMISP(object):
return self._rest_add('admin/users', new_user)
def add_user_json(self, json_file):
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
new_user = MISPUser()
new_user.from_dict(**jdata)
@ -1473,7 +1504,7 @@ class PyMISP(object):
return self._rest_edit('admin/users', edit_user, user_id)
def edit_user_json(self, json_file, user_id):
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
new_user = MISPUser()
new_user.from_dict(**jdata)
@ -1503,7 +1534,7 @@ class PyMISP(object):
return self._rest_add('admin/organisations', new_org)
def add_organisation_json(self, json_file):
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
new_org = MISPOrganisation()
new_org.from_dict(**jdata)
@ -1518,7 +1549,7 @@ class PyMISP(object):
return self._rest_edit('admin/organisations', edit_org, org_id)
def edit_organisation_json(self, json_file, org_id):
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
edit_org = MISPOrganisation()
edit_org.from_dict(**jdata)
@ -1588,14 +1619,14 @@ class PyMISP(object):
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
submitted_client_cert, None, None)
url = urljoin(self.root_url, 'servers/add')
response = self.__prepare_request('POST', url, json.dumps(new_server))
response = self._prepare_request('POST', url, json.dumps(new_server))
return self._check_response(response)
def add_server_json(self, json_file):
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
url = urljoin(self.root_url, 'servers/add')
response = self.__prepare_request('POST', url, json.dumps(jdata))
response = self._prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response)
def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False,
@ -1605,14 +1636,33 @@ class PyMISP(object):
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
submitted_client_cert, delete_cert, delete_client_cert)
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
response = self.__prepare_request('POST', url, json.dumps(new_server))
response = self._prepare_request('POST', url, json.dumps(new_server))
return self._check_response(response)
def edit_server_json(self, json_file, server_id):
with open(json_file, 'r') as f:
with open(json_file, 'rb') as f:
jdata = json.load(f)
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
response = self.__prepare_request('POST', url, json.dumps(jdata))
response = self._prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response)
def server_pull(self, server_id, event_id=None):
url = urljoin(self.root_url, 'servers/pull/{}'.format(server_id))
if event_id is not None:
url += '/{}'.format(event_id)
response = self._prepare_request('GET', url)
return self._check_response(response)
def server_push(self, server_id, event_id=None):
url = urljoin(self.root_url, 'servers/push/{}'.format(server_id))
if event_id is not None:
url += '/{}'.format(event_id)
response = self._prepare_request('GET', url)
return self._check_response(response)
def servers_index(self):
url = urljoin(self.root_url, 'servers/index')
response = self._prepare_request('GET', url)
return self._check_response(response)
# ############## Roles ##################
@ -1620,51 +1670,228 @@ class PyMISP(object):
def get_roles_list(self):
"""Get the list of existing roles"""
url = urljoin(self.root_url, '/roles')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)['response']
# ############## Tags ##################
def get_tags_list(self):
"""Get the list of existing tags"""
"""Get the list of existing tags."""
url = urljoin(self.root_url, '/tags')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)['Tag']
def get_tag(self, tag_id):
"""Get a tag by id."""
url = urljoin(self.root_url, '/tags/view/{}'.format(tag_id))
response = self._prepare_request('GET', url)
return self._check_response(response)
def _set_tag_parameters(self, name, colour, exportable, hide_tag, org_id, count, user_id, numerical_value,
attribute_count, old_tag):
tag = old_tag
if name is not None:
tag['name'] = name
if colour is not None:
tag['colour'] = colour
if exportable is not None:
tag['exportable'] = exportable
if hide_tag is not None:
tag['hide_tag'] = hide_tag
if org_id is not None:
tag['org_id'] = org_id
if count is not None:
tag['count'] = count
if user_id is not None:
tag['user_id'] = user_id
if numerical_value is not None:
tag['numerical_value'] = numerical_value
if attribute_count is not None:
tag['attribute_count'] = attribute_count
return {'Tag': tag}
def edit_tag(self, tag_id, name=None, colour=None, exportable=None, hide_tag=None, org_id=None, count=None,
user_id=None, numerical_value=None, attribute_count=None):
"""Edit only the provided parameters of a tag."""
old_tag = self.get_tag(tag_id)
new_tag = self._set_tag_parameters(name, colour, exportable, hide_tag, org_id, count, user_id,
numerical_value, attribute_count, old_tag)
url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id))
response = self._prepare_request('POST', url, json.dumps(new_tag))
return self._check_response(response)
def edit_tag_json(self, json_file, tag_id):
"""Edit the tag using a json file."""
with open(json_file, 'rb') as f:
jdata = json.load(f)
url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id))
response = self._prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response)
def enable_tag(self, tag_id):
"""Enable a tag by id."""
response = self.edit_tag(tag_id, hide_tag=False)
return response
def disable_tag(self, tag_id):
"""Disable a tag by id."""
response = self.edit_tag(tag_id, hide_tag=True)
return response
# ############## Taxonomies ##################
def get_taxonomies_list(self):
"""Get all the taxonomies."""
url = urljoin(self.root_url, '/taxonomies')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_taxonomy(self, taxonomy_id):
"""Get a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def update_taxonomies(self):
"""Update all the taxonomies."""
url = urljoin(self.root_url, '/taxonomies/update')
response = self._prepare_request('POST', url)
return self._check_response(response)
def enable_taxonomy(self, taxonomy_id):
"""Enable a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/enable/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)
def disable_taxonomy(self, taxonomy_id):
"""Disable a taxonomy by id."""
self.disable_taxonomy_tags(taxonomy_id)
url = urljoin(self.root_url, '/taxonomies/disable/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)
def get_taxonomy_tags_list(self, taxonomy_id):
"""Get all the tags of a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id))
response = self._prepare_request('GET', url)
return self._check_response(response)["entries"]
def enable_taxonomy_tags(self, taxonomy_id):
"""Enable all the tags of a taxonomy by id."""
enabled = self.get_taxonomy(taxonomy_id)['Taxonomy']['enabled']
if enabled:
url = urljoin(self.root_url, '/taxonomies/addTag/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)
def disable_taxonomy_tags(self, taxonomy_id):
"""Disable all the tags of a taxonomy by id."""
url = urljoin(self.root_url, '/taxonomies/disableTag/{}'.format(taxonomy_id))
response = self._prepare_request('POST', url)
return self._check_response(response)
# ############## WarningLists ##################
def get_warninglists(self):
"""Get all the warninglists."""
url = urljoin(self.root_url, '/warninglists')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_warninglist(self, warninglist_id):
"""Get a warninglist by id."""
url = urljoin(self.root_url, '/warninglists/view/{}'.format(warninglist_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def update_warninglists(self):
"""Update all the warninglists."""
url = urljoin(self.root_url, '/warninglists/update')
response = self._prepare_request('POST', url)
return self._check_response(response)
def toggle_warninglist(self, warninglist_id=None, warninglist_name=None, force_enable=None):
'''Toggle (enable/disable) the status of a warninglist by ID.
:param warninglist_id: ID of the WarningList
:param force_enable: Force the warning list in the enabled state (does nothing if already enabled)
'''
if warninglist_id is None and warninglist_name is None:
raise Exception('Either warninglist_id or warninglist_name is required.')
query = {}
if warninglist_id is not None:
if not isinstance(warninglist_id, list):
warninglist_id = [warninglist_id]
query['id'] = warninglist_id
if warninglist_name is not None:
if not isinstance(warninglist_name, list):
warninglist_name = [warninglist_name]
query['name'] = warninglist_name
if force_enable is not None:
query['enabled'] = force_enable
url = urljoin(self.root_url, '/warninglists/toggleEnable')
response = self._prepare_request('POST', url, json.dumps(query))
return self._check_response(response)
def enable_warninglist(self, warninglist_id):
"""Enable a warninglist by id."""
return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=True)
def disable_warninglist(self, warninglist_id):
"""Disable a warninglist by id."""
return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False)
# ############## NoticeLists ##################
def get_noticelists(self):
"""Get all the noticelists."""
url = urljoin(self.root_url, '/noticelists')
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_noticelist(self, noticelist_id):
"""Get a noticelist by id."""
url = urljoin(self.root_url, '/noticelists/view/{}'.format(noticelist_id))
response = self._prepare_request('GET', url)
return self._check_response(response)
def update_noticelists(self):
"""Update all the noticelists."""
url = urljoin(self.root_url, '/noticelists/update')
response = self._prepare_request('POST', url)
return self._check_response(response)
def enable_noticelist(self, noticelist_id):
"""Enable a noticelist by id."""
url = urljoin(self.root_url, '/noticelists/enableNoticelist/{}/true'.format(noticelist_id))
response = self._prepare_request('POST', url)
return self._check_response(response)
def disable_noticelist(self, noticelist_id):
"""Disable a noticelist by id."""
url = urljoin(self.root_url, '/noticelists/enableNoticelist/{}'.format(noticelist_id))
response = self._prepare_request('POST', url)
return self._check_response(response)
# ############## Galaxies/Clusters ##################
def get_galaxies(self):
"""Get all the galaxies."""
url = urljoin(self.root_url, '/galaxies')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_galaxy(self, galaxy_id):
"""Get a galaxy by id."""
url = urljoin(self.root_url, '/galaxies/view/{}'.format(galaxy_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def update_galaxies(self):
"""Update all the galaxies."""
url = urljoin(self.root_url, '/galaxies/update')
response = self._prepare_request('POST', url)
return self._check_response(response)
# ##############################################
@ -1676,7 +1903,7 @@ class PyMISP(object):
def download_all_suricata(self):
"""Download all suricata rules events."""
url = urljoin(self.root_url, 'events/nids/suricata/download')
response = self.__prepare_request('GET', url, output_type='rules')
response = self._prepare_request('GET', url, output_type='rules')
return response
def download_suricata_rule_event(self, event_id):
@ -1685,7 +1912,7 @@ class PyMISP(object):
:param event_id: ID of the event to download (same as get)
"""
url = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
response = self.__prepare_request('GET', url, output_type='rules')
response = self._prepare_request('GET', url, output_type='rules')
return response
# ############## Text ###############
@ -1693,7 +1920,7 @@ class PyMISP(object):
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
response = self.__prepare_request('GET', url, output_type='txt')
response = self._prepare_request('GET', url, output_type='txt')
return response
# ############## STIX ##############
@ -1706,7 +1933,7 @@ class PyMISP(object):
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
event_id, with_attachments, tags, from_date, to_date))
logger.debug("Getting STIX event from %s", url)
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def get_stix(self, **kwargs):
@ -1741,9 +1968,9 @@ class PyMISP(object):
if last:
to_post['last'] = last
if to_post:
response = self.__prepare_request('POST', url, json.dumps(to_post), output_type='json')
response = self._prepare_request('POST', url, json.dumps(to_post), output_type='json')
else:
response = self.__prepare_request('POST', url, output_type='json')
response = self._prepare_request('POST', url, output_type='json')
return response.text
# #######################################
@ -1752,32 +1979,32 @@ class PyMISP(object):
def _rest_list(self, urlpath):
url = urljoin(self.root_url, urlpath)
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def _rest_get_parameters(self, urlpath):
url = urljoin(self.root_url, '{}/add'.format(urlpath))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def _rest_view(self, urlpath, rest_id):
url = urljoin(self.root_url, '{}/view/{}'.format(urlpath, rest_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def _rest_add(self, urlpath, obj):
url = urljoin(self.root_url, '{}/add'.format(urlpath))
response = self.__prepare_request('POST', url, obj.to_json())
response = self._prepare_request('POST', url, obj.to_json())
return self._check_response(response)
def _rest_edit(self, urlpath, obj, rest_id):
url = urljoin(self.root_url, '{}/edit/{}'.format(urlpath, rest_id))
response = self.__prepare_request('POST', url, obj.to_json())
response = self._prepare_request('POST', url, obj.to_json())
return self._check_response(response)
def _rest_delete(self, urlpath, rest_id):
url = urljoin(self.root_url, '{}/delete/{}'.format(urlpath, rest_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
# ###########################
@ -1815,37 +2042,37 @@ class PyMISP(object):
def fetch_feed(self, feed_id):
"""Fetch one single feed"""
url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def cache_feeds_all(self):
""" Cache all the feeds"""
url = urljoin(self.root_url, 'feeds/cacheFeeds/all')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def cache_feed(self, feed_id):
"""Cache a specific feed"""
url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id))
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def cache_feeds_freetext(self):
"""Cache all the freetext feeds"""
url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def cache_feeds_misp(self):
"""Cache all the MISP feeds"""
url = urljoin(self.root_url, 'feeds/cacheFeeds/misp')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
def compare_feeds(self):
"""Generate the comparison matrix for all the MISP feeds"""
url = urljoin(self.root_url, 'feeds/compareFeeds')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)
@deprecated
@ -1875,7 +2102,7 @@ class PyMISP(object):
'''
to_jsonify = {'sg_id': sharing_group, 'org_id': organisation, 'extend': extend}
url = urljoin(self.root_url, '/sharingGroups/addOrg')
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
return self._check_response(response)
def sharing_group_org_remove(self, sharing_group, organisation):
@ -1885,7 +2112,7 @@ class PyMISP(object):
'''
to_jsonify = {'sg_id': sharing_group, 'org_id': organisation}
url = urljoin(self.root_url, '/sharingGroups/removeOrg')
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
return self._check_response(response)
def sharing_group_server_add(self, sharing_group, server, all_orgs=False):
@ -1896,7 +2123,7 @@ class PyMISP(object):
'''
to_jsonify = {'sg_id': sharing_group, 'server_id': server, 'all_orgs': all_orgs}
url = urljoin(self.root_url, '/sharingGroups/addServer')
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
return self._check_response(response)
def sharing_group_server_remove(self, sharing_group, server):
@ -1906,7 +2133,7 @@ class PyMISP(object):
'''
to_jsonify = {'sg_id': sharing_group, 'server_id': server}
url = urljoin(self.root_url, '/sharingGroups/removeServer')
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
return self._check_response(response)
# ###################
@ -1933,7 +2160,7 @@ class PyMISP(object):
url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id))
else:
url = urljoin(self.root_url, 'objects/add/{}'.format(event_id))
response = self.__prepare_request('POST', url, misp_object.to_json())
response = self._prepare_request('POST', url, misp_object.to_json())
return self._check_response(response)
def edit_object(self, misp_object, object_id=None):
@ -1947,31 +2174,31 @@ class PyMISP(object):
else:
raise PyMISPError('In order to update an object, you have to provide an object ID (either in the misp_object, or as a parameter)')
url = urljoin(self.root_url, 'objects/edit/{}'.format(param))
response = self.__prepare_request('POST', url, misp_object.to_json())
response = self._prepare_request('POST', url, misp_object.to_json())
return self._check_response(response)
def delete_object(self, id):
"""Deletes an object"""
url = urljoin(self.root_url, 'objects/delete/{}'.format(id))
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def add_object_reference(self, misp_object_reference):
"""Add a reference to an object"""
url = urljoin(self.root_url, 'object_references/add')
response = self.__prepare_request('POST', url, misp_object_reference.to_json())
response = self._prepare_request('POST', url, misp_object_reference.to_json())
return self._check_response(response)
def delete_object_reference(self, id):
"""Deletes a reference to an object"""
url = urljoin(self.root_url, 'object_references/delete/{}'.format(id))
response = self.__prepare_request('POST', url)
response = self._prepare_request('POST', url)
return self._check_response(response)
def get_object_templates_list(self):
"""Returns the list of Object templates available on the MISP instance"""
url = urljoin(self.root_url, 'objectTemplates')
response = self.__prepare_request('GET', url)
response = self._prepare_request('GET', url)
return self._check_response(response)['response']
def get_object_template_id(self, object_uuid):
@ -1982,6 +2209,11 @@ class PyMISP(object):
return t['ObjectTemplate']['id']
raise Exception('Unable to find template uuid {} on the MISP instance'.format(object_uuid))
def update_object_templates(self):
url = urljoin(self.root_url, '/objectTemplates/update')
response = self._prepare_request('POST', url)
return self._check_response(response)
# ###########################
# ####### Deprecated ########
# ###########################
@ -1998,7 +2230,7 @@ class PyMISP(object):
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
path = 'events/addTag'
url = urljoin(self.root_url, path)
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response)
@deprecated
@ -2010,5 +2242,5 @@ class PyMISP(object):
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
path = 'events/removeTag'
url = urljoin(self.root_url, path)
response = self.__prepare_request('POST', url, json.dumps(to_post))
response = self._prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response)

373
pymisp/aping.py Normal file
View File

@ -0,0 +1,373 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse
from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute
from typing import TypeVar, Optional, Tuple, List, Dict
from datetime import date, datetime
import json
import csv
import logging
from urllib.parse import urljoin
SearchType = TypeVar('SearchType', str, int)
# str: string to search / list: values to search (OR) / dict: {'OR': [list], 'NOT': [list], 'AND': [list]}
SearchParameterTypes = TypeVar('SearchParameterTypes', str, List[SearchType], Dict[str, SearchType])
DateTypes = TypeVar('DateTypes', datetime, date, SearchType, float)
DateInterval = TypeVar('DateInterval', DateTypes, Tuple[DateTypes, DateTypes])
logger = logging.getLogger('pymisp')
class ExpandedPyMISP(PyMISP):
def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None,
and_parameters: Optional[List[SearchType]]=None,
not_parameters: Optional[List[SearchType]]=None):
to_return = {}
if and_parameters:
to_return['AND'] = and_parameters
if not_parameters:
to_return['NOT'] = not_parameters
if or_parameters:
to_return['OR'] = or_parameters
return to_return
def toggle_warninglist(self, warninglist_id: List[int]=None, warninglist_name: List[str]=None, force_enable: bool=None):
'''Toggle (enable/disable) the status of a warninglist by ID.
:param warninglist_id: ID of the WarningList
:param force_enable: Force the warning list in the enabled state (does nothing is already enabled)
'''
return super().toggle_warninglist(warninglist_id, warninglist_name, force_enable)
def make_timestamp(self, value: DateTypes):
if isinstance(value, datetime):
return datetime.timestamp()
elif isinstance(value, date):
return datetime.combine(value, datetime.max.time()).timestamp()
elif isinstance(value, str):
if value.isdigit():
return value
else:
try:
float(value)
return value
except ValueError:
# The value can also be '1d', '10h', ...
return value
else:
return value
def _check_response(self, response):
"""Check if the response from the server is not an unexpected error"""
if response.status_code >= 500:
logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text))
raise MISPServerError('Error code 500:\n{}'.format(response.text))
elif 400 <= response.status_code < 500:
# The server returns a json message with the error details
error_message = response.json()
logger.error(f'Something went wrong ({response.status_code}): {error_message}')
return {'errors': [(response.status_code, error_message)]}
# At this point, we had no error.
try:
response = response.json()
if logger.isEnabledFor(logging.DEBUG):
logger.debug(response)
if isinstance(response, dict) and response.get('response') is not None:
# Cleanup.
return response.get('response')
return response
except Exception:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(response.text)
return response.text
def get_event(self, event_id: int):
event = super().get_event(event_id)
e = MISPEvent()
e.load(event)
return e
def add_event(self, event: MISPEvent):
created_event = super().add_event(event)
if isinstance(created_event, str):
raise NewEventError(f'Unexpected response from server: {created_event}')
e = MISPEvent()
e.load(created_event)
return e
def update_event(self, event: MISPEvent):
updated_event = super().update_event(event.uuid, event)
if isinstance(updated_event, str):
raise UpdateEventError(f'Unexpected response from server: {updated_event}')
e = MISPEvent()
e.load(updated_event)
return e
def update_attribute(self, attribute: MISPAttribute):
updated_attribute = super().update_attribute(attribute.uuid, attribute)
if isinstance(updated_attribute, str):
raise UpdateAttributeError(f'Unexpected response from server: {updated_attribute}')
a = MISPAttribute()
a.from_dict(**updated_attribute)
return a
# TODO: Make that thing async & test it.
def search(self, controller: str='events', return_format: str='json',
value: Optional[SearchParameterTypes]=None,
type_attribute: Optional[SearchParameterTypes]=None,
category: Optional[SearchParameterTypes]=None,
org: Optional[SearchParameterTypes]=None,
tags: Optional[SearchParameterTypes]=None,
quickfilter: Optional[bool]=None,
date_from: Optional[DateTypes]=None,
date_to: Optional[DateTypes]=None,
eventid: Optional[SearchType]=None,
with_attachments: Optional[bool]=None, withAttachments: Optional[bool]=None,
metadata: Optional[bool]=None,
uuid: Optional[str]=None,
publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None,
timestamp: Optional[DateInterval]=None,
published: Optional[bool]=None,
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
to_ids: Optional[str]=None,
deleted: Optional[str]=None,
include_event_uuid: Optional[str]=None, includeEventUuid: Optional[str]=None,
event_timestamp: Optional[DateTypes]=None,
sg_reference_only: Optional[bool]=None,
eventinfo: Optional[str]=None,
searchall: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
Search in the MISP instance
:param returnFormat: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload.
:param value: Search for the given value in the attributes' value field.
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
:param category: The attribute category, any valid MISP attribute category is accepted.
:param org: Search by the creator organisation by supplying the organisation identifier.
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param quickfilter: If set it makes the search ignore all of the other arguments, except for the auth key and value. MISP will return all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value fields, or in the attribute comment.
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param eventid: The events that should be included / excluded from the search
:param with_attachments: If set, encodes the attachments / zipped malware samples as base64 in the data field within each attribute
:param metadata: Only the metadata (event, tags, relations) is returned, attributes and proposals are omitted.
:param uuid: Restrict the results by uuid.
:param publish_timestamp: Restrict the results by the last publish timestamp (newer than).
:param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup.
:param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both.
:param enforce_warninglist: Remove any attributes from the result that would cause a hit on a warninglist entry.
:param to_ids: By default (0) all attributes are returned that match the other filter parameters, irregardless of their to_ids setting. To restrict the returned data set to to_ids only attributes set this parameter to 1. You can only use the special "exclude" setting to only return attributes that have the to_ids flag disabled.
:param deleted: If this parameter is set to 1, it will return soft-deleted attributes along with active ones. By using "only" as a parameter it will limit the returned data set to soft-deleted data only.
:param include_event_uuid: Instead of just including the event ID, also include the event UUID in each of the attributes.
:param event_timestamp: Only return attributes from events that have received a modification after the given timestamp.
:param sg_reference_only: If this flag is set, sharing group objects will not be included, instead only the sharing group ID is set.
:param eventinfo: Filter on the event's info field.
:param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields.
:param pythonify: Returns a list of PyMISP Objects the the plain json output. Warning: it might use a lot of RAM
Deprecated:
:param withAttachments: synonym for with_attachments
:param last: synonym for publish_timestamp
:param enforceWarninglist: synonym for enforce_warninglist
:param includeEventUuid: synonym for include_event_uuid
'''
if controller not in ['events', 'attributes', 'objects']:
raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects'])))
# Deprecated stuff / synonyms
if withAttachments is not None:
with_attachments = withAttachments
if last is not None:
publish_timestamp = last
if enforceWarninglist is not None:
enforce_warninglist = enforceWarninglist
if includeEventUuid is not None:
include_event_uuid = includeEventUuid
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
query = kwargs
if return_format is not None:
if return_format not in ['json', 'xml', 'openioc', 'suricata', 'snort']:
raise ValueError('return_format has to be in {}'.format(', '.join(['json', 'xml', 'openioc', 'suricata', 'snort'])))
query['returnFormat'] = return_format
if value is not None:
query['value'] = value
if type_attribute is not None:
query['type'] = type_attribute
if category is not None:
query['category'] = category
if org is not None:
query['org'] = org
if tags is not None:
query['tags'] = tags
if quickfilter is not None:
query['quickfilter'] = quickfilter
if date_from is not None:
query['from'] = self.make_timestamp(date_from)
if date_to is not None:
query['to'] = self.make_timestamp(date_to)
if eventid is not None:
query['eventid'] = eventid
if with_attachments is not None:
query['withAttachments'] = with_attachments
if metadata is not None:
query['metadata'] = metadata
if uuid is not None:
query['uuid'] = uuid
if publish_timestamp is not None:
if isinstance(publish_timestamp, (list, tuple)):
query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
else:
query['publish_timestamp'] = self.make_timestamp(publish_timestamp)
if timestamp is not None:
if isinstance(timestamp, (list, tuple)):
query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1]))
else:
query['timestamp'] = self.make_timestamp(timestamp)
if published is not None:
query['published'] = published
if enforce_warninglist is not None:
query['enforceWarninglist'] = enforce_warninglist
if to_ids is not None:
if str(to_ids) not in ['0', '1', 'exclude']:
raise ValueError('to_ids has to be in {}'.format(', '.join(['0', '1', 'exclude'])))
query['to_ids'] = to_ids
if deleted is not None:
query['deleted'] = deleted
if include_event_uuid is not None:
query['includeEventUuid'] = include_event_uuid
if event_timestamp is not None:
if isinstance(event_timestamp, (list, tuple)):
query['event_timestamp'] = (self.make_timestamp(event_timestamp[0]), self.make_timestamp(event_timestamp[1]))
else:
query['event_timestamp'] = self.make_timestamp(event_timestamp)
if sg_reference_only is not None:
query['sgReferenceOnly'] = sg_reference_only
if eventinfo is not None:
query['eventinfo'] = eventinfo
if searchall is not None:
query['searchall'] = searchall
url = urljoin(self.root_url, f'{controller}/restSearch')
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
return normalized_response
elif return_format == 'json' and pythonify:
# The response is in json, we can convert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
for e in normalized_response:
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
to_return.append(ma)
elif controller == 'objects':
raise PyMISPNotImplementedYet('Not implemented yet')
return to_return
else:
return normalized_response
def get_csv(self,
eventid: Optional[SearchType]=None,
ignore: Optional[bool]=None,
tags: Optional[SearchParameterTypes]=None,
category: Optional[SearchParameterTypes]=None,
type_attribute: Optional[SearchParameterTypes]=None,
include_context: Optional[bool]=None, includeContext: Optional[bool]=None,
date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None,
publish_timestamp: Optional[DateInterval]=None, # converted internally to last (consistent with search)
headerless: Optional[bool]=None,
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
Get MISP data in CSV format.
:param eventid: Restrict the download to a single event
:param ignore: If true, the response includes attributes without the to_ids flag
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param category: The attribute category, any valid MISP attribute category is accepted.
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
:param include_context: Include the event data with each attribute.
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param publish_timestamp: Events published within the last x amount of time. This filter will use the published timestamp of the event.
:param headerless: The CSV created when this setting is set to true will not contain the header row.
:param enforceWarninglist: All attributes that have a hit on a warninglist will be excluded.
:param pythonify: Returns a list of dictionaries instead of the plain CSV
'''
# Deprecated stuff / synonyms
if includeContext is not None:
include_context = includeContext
if enforceWarninglist is not None:
enforce_warninglist = enforceWarninglist
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
query = kwargs
if eventid is not None:
query['eventid'] = eventid
if ignore is not None:
query['ignore'] = ignore
if tags is not None:
query['tags'] = tags
if category is not None:
query['category'] = category
if type_attribute is not None:
query['type'] = type_attribute
if include_context is not None:
query['includeContext'] = include_context
if date_from is not None:
query['from'] = self.make_timestamp(date_from)
if date_to is not None:
query['to'] = self.make_timestamp(date_to)
if publish_timestamp is not None:
if isinstance(publish_timestamp, (list, tuple)):
query['last'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
else:
query['last'] = self.make_timestamp(publish_timestamp)
if headerless is not None:
query['headerless'] = headerless
if enforce_warninglist is not None:
query['enforceWarninglist'] = enforce_warninglist
url = urljoin(self.root_url, '/events/csv/download/')
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str):
if pythonify and not headerless:
# Make it a list of dict
fieldnames, lines = normalized_response.split('\n', 1)
fieldnames = fieldnames.split(',')
to_return = []
for line in csv.reader(lines.split('\n')):
if line:
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
return to_return
return normalized_response
elif isinstance(normalized_response, dict):
# The server returned a dictionary, it contains the error message.
logger.critical(f'The server should have returned a CSV file as text. instead it returned an error message:\n{normalized_response}')
return normalized_response
else:
# Should not happen...
raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}')

File diff suppressed because it is too large Load Diff

@ -1 +1 @@
Subproject commit e9fd65cecb028dbc8ed54894c469de7d352469bb
Subproject commit 38071f4bd9e3de1138a096cbbf66089f5105d798

View File

@ -11,10 +11,18 @@ class NewEventError(PyMISPError):
pass
class UpdateEventError(PyMISPError):
pass
class NewAttributeError(PyMISPError):
pass
class UpdateAttributeError(PyMISPError):
pass
class SearchError(PyMISPError):
pass
@ -47,3 +55,15 @@ class UnknownMISPObjectTemplate(MISPObjectException):
class PyMISPInvalidFormat(PyMISPError):
pass
class MISPServerError(PyMISPError):
pass
class PyMISPNotImplementedYet(PyMISPError):
pass
class PyMISPUnexpectedResponse(PyMISPError):
pass

View File

@ -7,7 +7,6 @@ import os
import base64
from io import BytesIO
from zipfile import ZipFile
import hashlib
import sys
import uuid
from collections import defaultdict
@ -23,7 +22,7 @@ logger = logging.getLogger('pymisp')
if six.PY2:
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
logger.warning("You're using python 2, it is strongly recommended to use python >=3.6")
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
@ -40,6 +39,13 @@ if six.PY2:
def dst(self, dt):
return timedelta(0)
if (3, 0) <= sys.version_info < (3, 6):
OLD_PY3 = True
else:
OLD_PY3 = False
try:
from dateutil.parser import parse
except ImportError:
@ -93,8 +99,11 @@ class MISPAttribute(AbstractMISP):
super(MISPAttribute, self).__init__()
if not describe_types:
ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
with open(os.path.join(ressources_path, 'describeTypes.json'), 'r') as f:
t = json.load(f)
with open(os.path.join(ressources_path, 'describeTypes.json'), 'rb') as f:
if OLD_PY3:
t = json.loads(f.read().decode())
else:
t = json.load(f)
describe_types = t['result']
self.__categories = describe_types['categories']
self._types = describe_types['types']
@ -153,6 +162,8 @@ class MISPAttribute(AbstractMISP):
return misp_shadow_attribute
def from_dict(self, **kwargs):
if kwargs.get('Attribute'):
kwargs = kwargs.get('Attribute')
if kwargs.get('type') and kwargs.get('category'):
if kwargs['type'] not in self.__category_type_mapping[kwargs['category']]:
if self.__strict:
@ -235,7 +246,6 @@ class MISPAttribute(AbstractMISP):
to_return = super(MISPAttribute, self).to_dict()
if to_return.get('data'):
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
to_return = _int_to_str(to_return)
return to_return
def _prepare_new_malware_sample(self):
@ -245,11 +255,11 @@ class MISPAttribute(AbstractMISP):
else:
# Assuming the user only passed the filename
self.malware_filename = self.value
m = hashlib.md5()
m.update(self.data.getvalue())
# m = hashlib.md5()
# m.update(self.data.getvalue())
self.value = self.malware_filename
md5 = m.hexdigest()
self.value = '{}|{}'.format(self.malware_filename, md5)
# md5 = m.hexdigest()
# self.value = '{}|{}'.format(self.malware_filename, md5)
self._malware_binary = self.data
self.encrypt = True
@ -347,18 +357,27 @@ class MISPAttribute(AbstractMISP):
class MISPEvent(AbstractMISP):
def __init__(self, describe_types=None, strict_validation=False):
super(MISPEvent, self).__init__()
def __init__(self, describe_types=None, strict_validation=False, **kwargs):
super(MISPEvent, self).__init__(**kwargs)
ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
if strict_validation:
with open(os.path.join(ressources_path, 'schema.json'), 'r') as f:
self.__json_schema = json.load(f)
with open(os.path.join(ressources_path, 'schema.json'), 'rb') as f:
if OLD_PY3:
self.__json_schema = json.loads(f.read().decode())
else:
self.__json_schema = json.load(f)
else:
with open(os.path.join(ressources_path, 'schema-lax.json'), 'r') as f:
self.__json_schema = json.load(f)
with open(os.path.join(ressources_path, 'schema-lax.json'), 'rb') as f:
if OLD_PY3:
self.__json_schema = json.loads(f.read().decode())
else:
self.__json_schema = json.load(f)
if not describe_types:
with open(os.path.join(ressources_path, 'describeTypes.json'), 'r') as f:
t = json.load(f)
with open(os.path.join(ressources_path, 'describeTypes.json'), 'rb') as f:
if OLD_PY3:
t = json.loads(f.read().decode())
else:
t = json.load(f)
describe_types = t['result']
self._types = describe_types['types']
@ -427,15 +446,17 @@ class MISPEvent(AbstractMISP):
"""Load a JSON dump from a file on the disk"""
if not os.path.exists(event_path):
raise PyMISPError('Invalid path, unable to load the event.')
with open(event_path, 'r') as f:
with open(event_path, 'rb') as f:
self.load(f)
def load(self, json_event):
def load(self, json_event, validate=False):
"""Load a JSON dump from a pseudo file or a JSON string"""
if hasattr(json_event, 'read'):
# python2 and python3 compatible to find if we have a file
json_event = json_event.read()
if isinstance(json_event, basestring):
if isinstance(json_event, (basestring, bytes)):
if OLD_PY3 and isinstance(json_event, bytes):
json_event = json_event.decode()
json_event = json.loads(json_event)
if json_event.get('response'):
event = json_event.get('response')[0]
@ -448,9 +469,10 @@ class MISPEvent(AbstractMISP):
'attribute_count' in event.get('Event') and
event.get('Event').get('attribute_count') is None):
event['Event']['attribute_count'] = '0'
jsonschema.validate(event, self.__json_schema)
e = event.get('Event')
self.from_dict(**e)
if validate:
jsonschema.validate(json.loads(self.to_json()), self.__json_schema)
def set_date(self, date, ignore_invalid=False):
"""Set a date for the event (string, datetime, or date object)"""
@ -550,9 +572,7 @@ class MISPEvent(AbstractMISP):
if to_return.get('publish_timestamp'):
to_return['publish_timestamp'] = self._datetime_to_timestamp(self.publish_timestamp)
to_return = _int_to_str(to_return)
to_return = {'Event': to_return}
return to_return
return {'Event': _int_to_str(to_return)}
def add_proposal(self, shadow_attribute=None, **kwargs):
"""Alias for add_shadow_attribute"""
@ -777,12 +797,22 @@ class MISPUser(AbstractMISP):
def __init__(self):
super(MISPUser, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('User'):
kwargs = kwargs.get('User')
super(MISPUser, self).from_dict(**kwargs)
class MISPOrganisation(AbstractMISP):
def __init__(self):
super(MISPOrganisation, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Organisation'):
kwargs = kwargs.get('Organisation')
super(MISPOrganisation, self).from_dict(**kwargs)
class MISPFeed(AbstractMISP):
@ -897,8 +927,11 @@ class MISPObject(AbstractMISP):
else:
self._known_template = False
if self._known_template:
with open(template_path, 'r') as f:
self._definition = json.load(f)
with open(template_path, 'rb') as f:
if OLD_PY3:
self._definition = json.loads(f.read().decode())
else:
self._definition = json.load(f)
setattr(self, 'meta-category', self._definition['meta-category'])
self.template_uuid = self._definition['uuid']
self.description = self._definition['description']
@ -975,6 +1008,11 @@ class MISPObject(AbstractMISP):
else:
self._known_template = False
if kwargs.get('timestamp'):
if sys.version_info >= (3, 3):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc)
else:
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), UTC())
if kwargs.get('Attribute'):
for a in kwargs.pop('Attribute'):
self.add_attribute(**a)

View File

@ -7,6 +7,21 @@ from .abstractgenerator import AbstractMISPObjectGenerator
class GenericObjectGenerator(AbstractMISPObjectGenerator):
def generate_attributes(self, attributes):
"""Generates MISPObjectAttributes from a list of dictionaries.
Each entry if the list must be in one of the two following formats:
* {<object_relation>: <value>}
* {<object_relation>: {'value'=<value>, 'type'=<type>, <and any other key/value accepted by a MISPAttribute>]}
Note: Any missing parameter will default to the pre-defined value from the Object template.
If the object template isn't known by PyMISP, you *must* pass a type key/value, or it will fail.
Example:
[{'analysis_submitted_at': '2018-06-15T06:40:27'},
{'threat_score': {value=95, to_ids=False}},
{'permalink': 'https://panacea.threatgrid.com/mask/samples/2e445ef5389d8b'},
{'heuristic_raw_score': 7.8385159793597}, {'heuristic_score': 96},
{'original_filename': 'juice.exe'}, {'id': '2e445ef5389d8b'}]
"""
for attribute in attributes:
for object_relation, value in attribute.items():
if isinstance(value, dict):

View File

@ -138,7 +138,7 @@ iocMispCompositeMapping = {
'FileItem/FileName|FileItem/Sha1sum': {'type': 'filename|sha1'},
'FileItem/FileName|FileItem/Sha256sum': {'type': 'filename|sha256'},
'Network/DNS|PortItem/remoteIP': {'type': 'domain|ip'},
'PortItem/remoteIP|PortItem/remotePort': {'comment': 'ip-dst|port'},
'PortItem/remoteIP|PortItem/remotePort': {'type': 'ip-dst|port'},
'RegistryItem/Path|RegistryItem/Value': {'type': 'regkey|value'},
'RegistryItem/KeyPath|RegistryItem/Value': {'type': 'regkey|value'},
'RegistryItem/Path|RegistryItem/Text': {'type': 'regkey|value'}

View File

@ -1,8 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from os import path
from setuptools import setup
import pymisp
this_directory = path.abspath(path.dirname(__file__))
with open(path.join(this_directory, 'README.md'), 'r') as f:
long_description = f.read()
setup(
name='pymisp',
@ -11,7 +17,14 @@ setup(
author_email='raphael.vinot@circl.lu',
maintainer='Raphaël Vinot',
url='https://github.com/MISP/PyMISP',
project_urls={
'Documentation': 'http://pymisp.readthedocs.io',
'Source': 'https://github.com/MISP/PyMISP',
'Tracker': 'https://github.com/MISP/PyMISP/issues',
},
description='Python API for MISP.',
long_description=long_description,
long_description_content_type='text/markdown',
packages=['pymisp', 'pymisp.tools'],
classifiers=[
'License :: OSI Approved :: BSD License',
@ -26,8 +39,7 @@ setup(
'Topic :: Security',
'Topic :: Internet',
],
test_suite="tests.test_offline",
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema', 'setuptools>=36.4'],
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema', 'setuptools>=36.4', 'python-dateutil', 'enum34;python_version<"3.4"'],
extras_require={'fileobjects': ['lief>=0.8', 'python-magic'],
'neo': ['py2neo'],
'openioc': ['beautifulsoup4'],
@ -35,11 +47,11 @@ setup(
'warninglists': ['pymispwarninglists']},
tests_require=[
'jsonschema',
'python-dateutil',
'python-magic',
'requests-mock',
'six'
],
test_suite="tests.test_offline",
include_package_data=True,
package_data={'pymisp': ['data/*.json',
'data/misp-objects/schema_objects.json',

View File

@ -37,12 +37,12 @@
}
],
"description": "Whois records information for a domain name or an IP address.",
"distribution": 5,
"distribution": "5",
"meta-category": "network",
"name": "whois",
"sharing_group_id": 0,
"sharing_group_id": "0",
"template_uuid": "429faea1-34ff-47af-8a00-7c62d3be5a6a",
"template_version": 10,
"template_version": "10",
"uuid": "a"
}
],

View File

@ -26,12 +26,12 @@
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"distribution": "5",
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 11,
"template_version": "13",
"uuid": "a"
},
{
@ -46,12 +46,12 @@
}
],
"description": "url object describes an url along with its normalized field (like extracted using faup parsing library) and its metadata.",
"distribution": 5,
"distribution": "5",
"meta-category": "network",
"name": "url",
"sharing_group_id": 0,
"sharing_group_id": "0",
"template_uuid": "60efb77b-40b5-4c46-871b-ed1ed999fce5",
"template_version": 6,
"template_version": "6",
"uuid": "b"
}
]

View File

@ -18,12 +18,12 @@
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"distribution": "5",
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 11,
"template_version": "13",
"uuid": "a"
},
{
@ -43,12 +43,12 @@
}
],
"description": "File object describing a file with meta-information",
"distribution": 5,
"distribution": "5",
"meta-category": "file",
"name": "file",
"sharing_group_id": 0,
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": 11,
"template_version": "13",
"uuid": "b"
}
]

View File

@ -9,7 +9,7 @@
"malware_filename": "bar.exe",
"to_ids": true,
"type": "malware-sample",
"value": "bar.exe|7637beddacbeac59d44469b2b120b9e6"
"value": "bar.exe"
}
],
"analysis": "1",

View File

@ -21,13 +21,13 @@
}
],
"description": "TestTemplate.",
"distribution": 5,
"distribution": "5",
"meta-category": "file",
"misp_objects_path_custom": "tests/mispevent_testfiles",
"name": "test_object_template",
"sharing_group_id": 0,
"sharing_group_id": "0",
"template_uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589",
"template_version": 1,
"template_version": "1",
"uuid": "a"
}
],

View File

@ -1,5 +1,5 @@
{
"timestamp": 11111111,
"timestamp": "11111111",
"type": "bar",
"value": "1"
}

View File

@ -21,7 +21,7 @@
"misp-attribute": "text"
}
},
"version": 1,
"version": "1",
"description": "TestTemplate.",
"meta-category": "file",
"uuid": "4ec55cc6-9e49-4c64-b794-03c25c1a6589",

View File

@ -1,7 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import PyMISP, __version__
from keys import url, key
try:
from keys import url, key
except ImportError as e:
print(e)
url = 'http://localhost:8080'
key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'
import time
import unittest
@ -294,7 +300,12 @@ class TestBasic(unittest.TestCase):
self.assertTrue(sd['default_category'] in categories)
def test_describeTypes_uptodate(self):
self.assertEqual(self.live_describe_types, self.misp.get_local_describe_types())
local_describe = self.misp.get_local_describe_types()
for temp_key in local_describe.keys():
if isinstance(local_describe[temp_key], list):
self.assertEqual(sorted(self.live_describe_types[temp_key]), sorted(local_describe[temp_key]))
else:
self.assertEqual(self.live_describe_types[temp_key], local_describe[temp_key])
def test_live_acl(self):
query_acl = self.misp.get_live_query_acl()

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import unittest
@ -166,18 +166,18 @@ class TestOffline(unittest.TestCase):
Regression tests for #174
"""
hashes_fname = mock.add_hashes(event,
md5='68b329da9893e34099c7d8ad5cb9c940',
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b',
filename='foobar.exe')
md5='68b329da9893e34099c7d8ad5cb9c940',
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b',
filename='foobar.exe')
self.assertEqual(3, len(hashes_fname))
for attr in hashes_fname:
self.assertTrue(isinstance(attr, pm.mispevent.MISPAttribute))
self.assertIn("filename|", attr["type"])
hashes_only = mock.add_hashes(event, md5='68b329da9893e34099c7d8ad5cb9c940',
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b')
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b')
self.assertEqual(3, len(hashes_only))
for attr in hashes_only:
self.assertTrue(isinstance(attr, pm.mispevent.MISPAttribute))
@ -204,7 +204,6 @@ class TestOffline(unittest.TestCase):
self.assertEqual(key[0]["type"], "regkey|value")
self.assertIn("foobar|foobar", key[0]["value"])
def test_addAttributes(self, m):
self.initURI(m)
p = MockPyMISP(self.domain, self.key)
@ -364,7 +363,7 @@ class TestOffline(unittest.TestCase):
def test_flatten_error_messages_singular(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
error = pymisp.get(1)
pymisp.get(1)
response = self.auth_error_msg
response['error'] = ['foo', 'bar', 'baz']
messages = pymisp.flatten_error_messages(response)
@ -405,7 +404,7 @@ class TestOffline(unittest.TestCase):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
try:
_ = pymisp.change_toids(self.key, 42)
pymisp.change_toids(self.key, 42)
self.assertFalse('Exception required for off domain value')
except Exception:
pass
@ -434,7 +433,7 @@ class TestOffline(unittest.TestCase):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
try:
_ = pymisp.freetext(1, None, adhereToWarninglists='hard')
pymisp.freetext(1, None, adhereToWarninglists='hard')
self.assertFalse('Exception required for off domain value')
except Exception:
pass
@ -452,9 +451,9 @@ class TestOffline(unittest.TestCase):
def test_sample_upload(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
upload = pymisp.upload_sample("tmux", "tests/viper-test-files/test_files/tmux", 1)
upload = pymisp.upload_sample("tmux", "non_existing_file", 1)
upload = pymisp.upload_sample("tmux", b"binblob", 1)
pymisp.upload_sample("tmux", "tests/viper-test-files/test_files/tmux", 1)
pymisp.upload_sample("tmux", "non_existing_file", 1)
pymisp.upload_sample("tmux", b"binblob", 1)
def test_get_all_tags(self, m):
self.initURI(m)

View File

@ -0,0 +1,873 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import unittest
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis
from datetime import datetime, timedelta, date
from io import BytesIO
import time
try:
from keys import url, key
except ImportError as e:
print(e)
url = 'http://localhost:8080'
key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'
from uuid import uuid4
travis_run = True
class TestComprehensive(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.maxDiff = None
# Connect as admin
cls.admin_misp_connector = ExpandedPyMISP(url, key, debug=False)
# Creates an org
org = cls.admin_misp_connector.add_organisation(name='Test Org')
cls.test_org = MISPOrganisation()
cls.test_org.from_dict(**org)
# Creates a user
usr = cls.admin_misp_connector.add_user(email='testusr@user.local', org_id=cls.test_org.id, role_id=3)
cls.test_usr = MISPUser()
cls.test_usr.from_dict(**usr)
cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey)
# Creates a publisher
pub = cls.admin_misp_connector.add_user(email='testpub@user.local', org_id=cls.test_org.id, role_id=4)
cls.test_pub = MISPUser()
cls.test_pub.from_dict(**pub)
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey)
@classmethod
def tearDownClass(cls):
# Delete publisher
cls.admin_misp_connector.delete_user(user_id=cls.test_pub.id)
# Delete user
cls.admin_misp_connector.delete_user(user_id=cls.test_usr.id)
# Delete org
cls.admin_misp_connector.delete_organisation(org_id=cls.test_org.id)
def create_simple_event(self, force_timestamps=False):
mispevent = MISPEvent(force_timestamps=force_timestamps)
mispevent.info = 'This is a super simple test'
mispevent.distribution = Distribution.your_organisation_only
mispevent.threat_level_id = ThreatLevel.low
mispevent.analysis = Analysis.completed
mispevent.add_attribute('text', str(uuid4()))
return mispevent
def environment(self):
first_event = MISPEvent()
first_event.info = 'First event - org only - low - completed'
first_event.distribution = Distribution.your_organisation_only
first_event.threat_level_id = ThreatLevel.low
first_event.analysis = Analysis.completed
first_event.set_date("2017-12-31")
first_event.add_attribute('text', str(uuid4()))
first_event.attributes[0].add_tag('admin_only')
first_event.attributes[0].add_tag('tlp:white___test')
first_event.add_attribute('text', str(uuid4()))
first_event.attributes[1].add_tag('unique___test')
second_event = MISPEvent()
second_event.info = 'Second event - org only - medium - ongoing'
second_event.distribution = Distribution.your_organisation_only
second_event.threat_level_id = ThreatLevel.medium
second_event.analysis = Analysis.ongoing
second_event.set_date("Aug 18 2018")
second_event.add_attribute('text', str(uuid4()))
second_event.attributes[0].add_tag('tlp:white___test')
second_event.add_attribute('ip-dst', '1.1.1.1')
# Same value as in first event.
second_event.add_attribute('text', first_event.attributes[0].value)
third_event = MISPEvent()
third_event.info = 'Third event - all orgs - high - initial'
third_event.distribution = Distribution.all_communities
third_event.threat_level_id = ThreatLevel.high
third_event.analysis = Analysis.initial
third_event.set_date("Jun 25 2018")
third_event.add_tag('tlp:white___test')
third_event.add_attribute('text', str(uuid4()))
third_event.attributes[0].add_tag('tlp:amber___test')
third_event.attributes[0].add_tag('foo_double___test')
third_event.add_attribute('ip-src', '8.8.8.8')
third_event.attributes[1].add_tag('tlp:amber___test')
third_event.add_attribute('ip-dst', '9.9.9.9')
# Create first and third event as admin
# usr won't be able to see the first one
first = self.admin_misp_connector.add_event(first_event)
third = self.admin_misp_connector.add_event(third_event)
# Create second event as user
second = self.user_misp_connector.add_event(second_event)
return first, second, third
def test_search_value_event(self):
'''Search a value on the event controller
* Test ACL admin user vs normal user in an other org
* Make sure we have one match
'''
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [first.id, second.id])
# Search as user
events = self.user_misp_connector.search(value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [second.id])
# Non-existing value
events = self.user_misp_connector.search(value=str(uuid4()), pythonify=True)
self.assertEqual(events, [])
finally:
# Delete events
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_value_attribute(self):
'''Search value in attributes controller'''
try:
first, second, third = self.environment()
# Search as admin
attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(attributes), 2)
for a in attributes:
self.assertIn(a.event_id, [first.id, second.id])
# Search as user
attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(attributes), 1)
for a in attributes:
self.assertIn(a.event_id, [second.id])
# Non-existing value
attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()), pythonify=True)
self.assertEqual(attributes, [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_type_event(self):
'''Search multiple events, search events containing attributes with specific types'''
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search, pythonify=True)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [second.id, third.id])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_type_attribute(self):
'''Search multiple attributes, search attributes with specific types'''
try:
first, second, third = self.environment()
# Search as admin
attributes = self.admin_misp_connector.search(controller='attributes',
timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(attributes), 8)
for a in attributes:
self.assertIn(a.event_id, [first.id, second.id, third.id])
# Search as user
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
attributes = self.admin_misp_connector.search(controller='attributes',
timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search, pythonify=True)
self.assertEqual(len(attributes), 3)
for a in attributes:
self.assertIn(a.event_id, [second.id, third.id])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_event(self):
'''Search Tags at events level'''
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(tags='tlp:white___test', pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
events = self.admin_misp_connector.search(tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
events = self.admin_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [first.id])
# Search as user
events = self.user_misp_connector.search(tags='tlp:white___test', pythonify=True)
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [second.id, third.id])
events = self.user_misp_connector.search(tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [third.id])
events = self.user_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(events, [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_attribute(self):
'''Search Tags at attributes level'''
try:
first, second, third = self.environment()
# Search as admin
attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:white___test', pythonify=True)
self.assertEqual(len(attributes), 5)
attributes = self.admin_misp_connector.search(controller='attributes', tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(attributes), 2)
attributes = self.admin_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(len(attributes), 1)
# Search as user
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test', pythonify=True)
self.assertEqual(len(attributes), 4)
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test', pythonify=True)
self.assertEqual(len(attributes), 2)
attributes = self.user_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(attributes, [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_advanced_event(self):
'''Advanced search Tags at events level'''
try:
first, second, third = self.environment()
complex_query = self.admin_misp_connector.build_complex_query(or_parameters=['tlp:white___test'],
not_parameters=['tlp:amber___test',
'foo_double___test'])
events = self.admin_misp_connector.search(tags=complex_query, pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
for a in e.attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
for a in e.attributes:
self.assertEqual([t for t in a.tags if t.name == 'foo_double___test'], [])
complex_query = self.admin_misp_connector.build_complex_query(or_parameters=['unique___test'],
not_parameters=['tlp:white___test'])
events = self.admin_misp_connector.search(tags=complex_query, pythonify=True)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [first.id, second.id])
for a in e.attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:white___test'], [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_tag_advanced_attributes(self):
'''Advanced search Tags at attributes level'''
try:
first, second, third = self.environment()
complex_query = self.admin_misp_connector.build_complex_query(or_parameters=['tlp:white___test'],
not_parameters=['tlp:amber___test',
'foo_double___test'])
attributes = self.admin_misp_connector.search(controller='attributes', tags=complex_query, pythonify=True)
self.assertEqual(len(attributes), 3)
for a in attributes:
self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
for a in attributes:
self.assertEqual([t for t in a.tags if t.name == 'foo_double___test'], [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_timestamp_event(self):
'''Search specific update timestamps at events level'''
# Creating event 1 - timestamp 5 min ago
first = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_first = datetime.now() - timedelta(minutes=5)
first.timestamp = event_creation_timestamp_first
# Creating event 2 - timestamp 2 min ago
second = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_second = datetime.now() - timedelta(minutes=2)
second.timestamp = event_creation_timestamp_second
try:
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
# Search as user
# # Test - last 4 min
events = self.user_misp_connector.search(timestamp='4m', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event
events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp(), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min
events = self.user_misp_connector.search(timestamp=['6m', '4m'], pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_search_timestamp_attribute(self):
'''Search specific update timestamps at attributes level'''
# Creating event 1 - timestamp 5 min ago
first = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_first = datetime.now() - timedelta(minutes=5)
first.timestamp = event_creation_timestamp_first
first.attributes[0].timestamp = event_creation_timestamp_first
# Creating event 2 - timestamp 2 min ago
second = self.create_simple_event(force_timestamps=True)
event_creation_timestamp_second = datetime.now() - timedelta(minutes=2)
second.timestamp = event_creation_timestamp_second
second.attributes[0].timestamp = event_creation_timestamp_second
try:
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
# Search as user
# # Test - last 4 min
attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m', pythonify=True)
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event
attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp(), pythonify=True)
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min
attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'], pythonify=True)
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, first.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_user_perms(self):
'''Test publish rights'''
try:
first = self.create_simple_event()
first.publish()
# Add event as user, no publish rights
first = self.user_misp_connector.add_event(first)
self.assertFalse(first.published)
# Add event as publisher
first.publish()
first = self.pub_misp_connector.update_event(first)
self.assertTrue(first.published)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
# @unittest.skip("Uncomment when adding new tests, it has a 10s sleep")
def test_search_publish_timestamp(self):
'''Search for a specific publication timestamp, an interval, and invalid values.'''
# Creating event 1
first = self.create_simple_event()
first.publish()
# Creating event 2
second = self.create_simple_event()
second.publish()
try:
first = self.pub_misp_connector.add_event(first)
time.sleep(10)
second = self.pub_misp_connector.add_event(second)
# Test invalid query
events = self.pub_misp_connector.search(publish_timestamp='5x', pythonify=True)
self.assertEqual(events, [])
events = self.pub_misp_connector.search(publish_timestamp='ad', pythonify=True)
self.assertEqual(events, [])
events = self.pub_misp_connector.search(publish_timestamp='aaad', pythonify=True)
self.assertEqual(events, [])
# Test - last 4 min
events = self.pub_misp_connector.search(publish_timestamp='5s', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Test 5 sec before timestamp of 2nd event
events = self.pub_misp_connector.search(publish_timestamp=(second.publish_timestamp.timestamp()), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Test interval -6 min -> -4 min
events = self.pub_misp_connector.search(publish_timestamp=[first.publish_timestamp.timestamp() - 5,
second.publish_timestamp.timestamp() - 5], pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_simple_event(self):
'''Search a bunch of parameters:
* Value not existing
* only return metadata
* published yes/no
* event id
* uuid
* creator org
* substring search in value and eventinfo
* quickfilter
* date_from
* date_to
* deleted
* to_ids
* include_event_uuid
warning list
'''
first = self.create_simple_event()
first.info = 'foo bar blah'
# First has one text attribute
second = self.create_simple_event()
second.info = 'foo blah'
second.set_date('2018-09-01')
second.add_attribute('ip-src', '8.8.8.8')
# second has two attributes: text and ip-src
try:
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5]
# Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't
events = self.user_misp_connector.search(timestamp=timeframe, pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(events[0].id, first.id)
self.assertEqual(events[1].id, second.id)
events = self.user_misp_connector.search(timestamp=timeframe, value='nothere', pythonify=True)
self.assertEqual(events, [])
events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=[first.timestamp.timestamp() - 50,
first.timestamp.timestamp() - 10],
value=first.attributes[0].value, pythonify=True)
self.assertEqual(events, [])
# Test return content
events = self.user_misp_connector.search(timestamp=timeframe, metadata=False, pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 1)
self.assertEqual(len(events[1].attributes), 2)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=True, pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 0)
self.assertEqual(len(events[1].attributes), 0)
# other things
events = self.user_misp_connector.search(timestamp=timeframe, published=True, pythonify=True)
self.assertEqual(events, [])
events = self.user_misp_connector.search(timestamp=timeframe, published=False, pythonify=True)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(eventid=first.id, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(uuid=first.uuid, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(org=first.orgc_id, pythonify=True)
self.assertEqual(len(events), 2)
# test like search
events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
# quickfilter
events = self.user_misp_connector.search(timestamp=timeframe, quickfilter='%bar%', pythonify=True)
# FIXME: should return one event
# self.assertEqual(len(events), 1)
# self.assertEqual(events[0].id, second.id)
# date_from / date_to
events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', pythonify=True)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Category
events = self.user_misp_connector.search(timestamp=timeframe, category='Network activity', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# toids
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 1)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='exclude', pythonify=True)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 1)
self.assertEqual(len(events[1].attributes), 1)
# deleted
second.attributes[1].delete()
self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(eventid=second.id, pythonify=True)
self.assertEqual(len(events[0].attributes), 1)
events = self.user_misp_connector.search(eventid=second.id, deleted=True, pythonify=True)
self.assertEqual(len(events[0].attributes), 2)
# include_event_uuid
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, include_event_uuid=True, pythonify=True)
self.assertEqual(attributes[0].event_uuid, second.uuid)
# event_timestamp
second.add_attribute('ip-src', '8.8.8.9')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(events), 1)
# searchall
# FIXME: searchall doesn't seem to do anything
# second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment')
# second = self.user_misp_connector.update_event(second)
# events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True)
# self.assertEqual(len(events), 1)
# events = self.user_misp_connector.search(value='stuff', searchall=True, pythonify=True)
# self.assertEqual(len(events), 1)
# warninglist
self.admin_misp_connector.update_warninglists()
response = self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%', force_enable=True) # enable ipv4 DNS.
self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) enabled'})
second.add_attribute('ip-src', '1.11.71.4')
second.add_attribute('ip-src', '9.9.9.9')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(eventid=second.id, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 4)
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 4)
if not travis_run:
# FIXME: This is fialing on travis for no discernable reason...
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 2)
response = self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%') # disable ipv4 DNS.
self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) toggled'})
time.sleep(1) # make sure the next attribute is added one at least one second later
# attachments
with open('tests/testlive_comprehensive.py', 'rb') as f:
first.add_attribute('malware-sample', value='testfile.py', data=BytesIO(f.read()))
first = self.user_misp_connector.update_event(first)
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=True,
pythonify=True)
self.assertEqual(len(events), 1)
self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)
events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=False,
pythonify=True)
self.assertEqual(len(events), 1)
self.assertIs(events[0].attributes[-1].malware_binary, None)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_edit_attribute(self):
first = self.create_simple_event()
try:
first.attributes[0].comment = 'This is the original comment'
first = self.user_misp_connector.add_event(first)
first.attributes[0].comment = 'This is the modified comment'
attribute = self.user_misp_connector.update_attribute(first.attributes[0])
self.assertEqual(attribute.comment, 'This is the modified comment')
attribute = self.user_misp_connector.change_comment(first.attributes[0].uuid, 'This is the modified comment, again')
self.assertEqual(attribute['Attribute']['comment'], 'This is the modified comment, again')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
def test_get_csv(self):
first = self.create_simple_event()
second = self.create_simple_event()
second.info = 'foo blah'
second.set_date('2018-09-01')
second.add_attribute('ip-src', '8.8.8.8')
try:
first.attributes[0].comment = 'This is the original comment'
first = self.user_misp_connector.add_event(first)
response = self.user_misp_connector.fast_publish(first.id, alert=False)
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
# default search, all attributes with to_ids == False
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
# FIXME: Should not return anything (to_ids is False)
# self.assertEqual(len(csv), 0)
# Also export attributes with to_ids set to false
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, ignore=True, pythonify=True)
self.assertEqual(len(csv), 1)
# Default search, attribute with to_ids == True
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# eventid
csv = self.user_misp_connector.get_csv(eventid=first.id, pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# category
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
self.assertEqual(len(csv), 0)
# type_attribute
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
self.assertEqual(len(csv), 0)
# context
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
self.assertEqual(len(csv), 1)
# print(csv[0])
# FIXME: there is no context.
# date_from date_to
second = self.user_misp_connector.add_event(second)
csv = self.user_misp_connector.get_csv(date_from=date.today().isoformat(), pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
self.assertEqual(len(csv), 2)
# headerless
csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', headerless=True)
# FIXME: The header is here.
# print(csv)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_upload_sample(self):
first = self.create_simple_event()
second = self.create_simple_event()
third = self.create_simple_event()
try:
# Simple, not executable
first = self.user_misp_connector.add_event(first)
with open('tests/testlive_comprehensive.py', 'rb') as f:
response = self.user_misp_connector.upload_sample(filename='testfile.py', filepath_or_bytes=f.read(),
event_id=first.id)
self.assertEqual(response['message'], 'Success, saved all attributes.')
first = self.user_misp_connector.get_event(first.id)
self.assertEqual(len(first.objects), 1)
self.assertEqual(first.objects[0].name, 'file')
# Simple, executable
second = self.user_misp_connector.add_event(second)
with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
response = self.user_misp_connector.upload_sample(filename='whoami.exe', filepath_or_bytes=f.read(),
event_id=second.id)
self.assertEqual(response['message'], 'Success, saved all attributes.')
second = self.user_misp_connector.get_event(second.id)
self.assertEqual(len(second.objects), 1)
self.assertEqual(second.objects[0].name, 'file')
third = self.user_misp_connector.add_event(third)
if not travis_run:
# Advanced, executable
with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
response = self.user_misp_connector.upload_sample(filename='whoami.exe', filepath_or_bytes=f.read(),
event_id=third.id, advanced_extraction=True)
self.assertEqual(response['message'], 'Success, saved all attributes.')
third = self.user_misp_connector.get_event(third.id)
self.assertEqual(len(third.objects), 7)
self.assertEqual(third.objects[0].name, 'pe-section')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_update_modules(self):
# object templates
self.admin_misp_connector.update_object_templates()
r = self.admin_misp_connector.update_object_templates()
self.assertEqual(type(r), list)
def test_tags(self):
# Get list
tags = self.admin_misp_connector.get_tags_list()
self.assertTrue(isinstance(tags, list))
# Get tag
for tag in tags:
if not tag['hide_tag']:
break
tag = self.admin_misp_connector.get_tag(tags[0]['id'])
self.assertTrue('name' in tag)
self.admin_misp_connector.disable_tag(tag['id'])
# FIXME: returns the tag with ID 1
self.admin_misp_connector.enable_tag(tag['id'])
# FIXME: returns the tag with ID 1
def test_taxonomies(self):
# Make sure we're up-to-date
self.admin_misp_connector.update_taxonomies()
r = self.admin_misp_connector.update_taxonomies()
self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.')
# Get list
taxonomies = self.admin_misp_connector.get_taxonomies_list()
self.assertTrue(isinstance(taxonomies, list))
list_name_test = 'tlp'
for tax in taxonomies:
if tax['Taxonomy']['namespace'] == list_name_test:
break
r = self.admin_misp_connector.get_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['Taxonomy']['namespace'], list_name_test)
self.assertTrue('enabled' in r['Taxonomy'])
r = self.admin_misp_connector.enable_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['message'], 'Taxonomy enabled')
r = self.admin_misp_connector.disable_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['message'], 'Taxonomy disabled')
def test_warninglists(self):
# Make sure we're up-to-date
self.admin_misp_connector.update_warninglists()
r = self.admin_misp_connector.update_warninglists()
self.assertEqual(r['name'], 'All warninglists are up to date already.')
# Get list
r = self.admin_misp_connector.get_warninglists()
# FIXME It returns Warninglists object instead of a list of warning lists directly. This is inconsistent.
warninglists = r['Warninglists']
self.assertTrue(isinstance(warninglists, list))
list_name_test = 'List of known hashes with common false-positives (based on Florian Roth input list)'
for wl in warninglists:
if wl['Warninglist']['name'] == list_name_test:
break
testwl = wl['Warninglist']
r = self.admin_misp_connector.get_warninglist(testwl['id'])
self.assertEqual(r['Warninglist']['name'], list_name_test)
self.assertTrue('WarninglistEntry' in r['Warninglist'])
r = self.admin_misp_connector.enable_warninglist(testwl['id'])
self.assertEqual(r['success'], '1 warninglist(s) enabled')
r = self.admin_misp_connector.disable_warninglist(testwl['id'])
self.assertEqual(r['success'], '1 warninglist(s) disabled')
def test_noticelists(self):
# Make sure we're up-to-date
self.admin_misp_connector.update_noticelists()
r = self.admin_misp_connector.update_noticelists()
self.assertEqual(r['name'], 'All noticelists are up to date already.')
# Get list
noticelists = self.admin_misp_connector.get_noticelists()
self.assertTrue(isinstance(noticelists, list))
list_name_test = 'gdpr'
for nl in noticelists:
if nl['Noticelist']['name'] == list_name_test:
break
testnl = nl
r = self.admin_misp_connector.get_noticelist(testnl['Noticelist']['id'])
self.assertEqual(r['Noticelist']['name'], list_name_test)
self.assertTrue('NoticelistEntry' in r['Noticelist'])
r = self.admin_misp_connector.enable_noticelist(testnl['Noticelist']['id'])
self.assertTrue(r['Noticelist']['enabled'])
r = self.admin_misp_connector.disable_noticelist(testnl['Noticelist']['id'])
self.assertFalse(r['Noticelist']['enabled'])
def test_galaxies(self):
if not travis_run:
# Make sure we're up-to-date
self.admin_misp_connector.update_galaxies()
r = self.admin_misp_connector.update_galaxies()
self.assertEqual(r['name'], 'Galaxies updated.')
# Get list
galaxies = self.admin_misp_connector.get_galaxies()
self.assertTrue(isinstance(galaxies, list))
list_name_test = 'Mobile Attack - Attack Pattern'
for galaxy in galaxies:
if galaxy['Galaxy']['name'] == list_name_test:
break
r = self.admin_misp_connector.get_galaxy(galaxy['Galaxy']['id'])
self.assertEqual(r['Galaxy']['name'], list_name_test)
self.assertTrue('GalaxyCluster' in r)
@unittest.skip("Currently failing")
def test_search_type_event_csv(self):
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp())
print(events)
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
print(events)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
if __name__ == '__main__':
unittest.main()