chg: [tests] Add new test cases

Raphaël Vinot 2019-08-01 13:19:21 +02:00
parent 1f6c238370
commit 1b85f73d89
4 changed files with 206 additions and 99 deletions

Pipfile.lock generated
View File

@ -140,13 +140,13 @@
"pymispwarninglists": {
"editable": true,
"git": "",
"ref": "1901e2e54db829fb3c50dd034f2632874aa779db"
"ref": "52b0a0f93045861330c134385f88441f212f6421"
"pyrsistent": {
"hashes": [
"version": "==0.15.3"
"version": "==0.15.4"
"python-dateutil": {
"hashes": [
@ -311,47 +311,48 @@
"coverage": {
"hashes": [
"version": "==4.5.3"
"version": "==4.5.4"
"coveralls": {
"hashes": [
"index": "pypi",
"version": "==1.8.1"
"version": "==1.8.2"
"decorator": {
"hashes": [
@ -488,10 +489,10 @@
"packaging": {
"hashes": [
"version": "==19.0"
"version": "==19.1"
"pillow": {
"hashes": [
@ -563,16 +564,16 @@
"pyparsing": {
"hashes": [
"version": "==2.4.1"
"version": "==2.4.2"
"pyrsistent": {
"hashes": [
"version": "==0.15.3"
"version": "==0.15.4"
"python-dateutil": {
"hashes": [
@ -590,10 +591,10 @@
"pytz": {
"hashes": [
"version": "==2019.1"
"version": "==2019.2"
"recommonmark": {
"hashes": [
@ -679,10 +680,10 @@
"sphinx-autodoc-typehints": {
"hashes": [
"version": "==1.6.0"
"version": "==1.7.0"
"sphinxcontrib-applehelp": {
"hashes": [

View File

@ -12,6 +12,8 @@ import requests
from requests.auth import AuthBase
import re
from uuid import UUID
import warnings
import sys
from . import __version__
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey
@ -78,6 +80,9 @@ class ExpandedPyMISP(PyMISP):
elif pymisp_version_tup[:3] < recommended_version_tup:
logger.warning(f"The version of PyMISP recommended by the MI)SP instance ({response['version']}) is newer than the one you're using now ({__version__}). Please upgrade PyMISP.")
misp_version = self.misp_instance_version
if 'version' in misp_version:
self._misp_version = tuple(int(v) for v in misp_version['version'].split('.'))
except Exception as e:
raise PyMISPError(f'Unable to connect to MISP ({self.root_url}). Please make sure the API key and the URL are correct (http/https is required): {e}')
@ -149,8 +154,31 @@ class ExpandedPyMISP(PyMISP):
def toggle_global_pythonify(self):
self.global_pythonify = not self.global_pythonify
def _old_misp(self, minimal_version_required: tuple, removal_date: Union[str, date, datetime], method: str=None, message: str=None):
if self._misp_version >= minimal_version_required:
return False
if isinstance(removal_date, (datetime, date)):
removal_date = removal_date.isoformat()
to_print = f'The instance of MISP you are using is outdated. Unless you update your MISP instance, {method} will stop working after {removal_date}.'
if message:
to_print += f' {message}'
warnings.warn(to_print, DeprecationWarning)
return True
# ## BEGIN Event ##
def events(self, pythonify: bool=False):
events = self._prepare_request('GET', 'events')
events = self._check_response(events, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in events:
return events
to_return = []
for event in events:
e = MISPEvent()
return to_return
def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=False):
'''Get an event from a MISP instance'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
@ -240,7 +268,6 @@ class ExpandedPyMISP(PyMISP):
def delete_object(self, misp_object: Union[MISPObject, int, str, UUID]):
'''Delete an object from a MISP instance'''
# FIXME: MISP doesn't support DELETE on this endpoint
object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object)
response = self._prepare_request('POST', f'objects/delete/{object_id}')
return self._check_response(response, expect_json=True)
@ -296,6 +323,18 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Attribute ###
def attributes(self, pythonify: bool=False):
attributes = self._prepare_request('GET', f'attributes/index')
attributes = self._check_response(attributes, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in attributes:
return attributes
to_return = []
for attribute in attributes:
a = MISPAttribute()
return to_return
def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False):
'''Get an attribute from a MISP instance'''
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
@ -358,6 +397,22 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Attribute Proposal ###
def attribute_proposals(self, event: Union[MISPEvent, int, str, UUID]=None, pythonify: bool=False):
if event:
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
attribute_proposals = self._prepare_request('GET', f'shadow_attributes/index/{event_id}')
attribute_proposals = self._prepare_request('GET', f'shadow_attributes')
attribute_proposals = self._check_response(attribute_proposals, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposals:
return attribute_proposals
to_return = []
for attribute_proposal in attribute_proposals:
a = MISPShadowAttribute()
return to_return
def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False):
proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal)
attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}')
@ -373,7 +428,6 @@ class ExpandedPyMISP(PyMISP):
def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False):
'''Propose a new attribute in an event'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
# FIXME: attribute needs to be a complete MISPAttribute:
new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute)
new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in new_attribute_proposal:
@ -384,8 +438,10 @@ class ExpandedPyMISP(PyMISP):
def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False):
'''Propose a change for an attribute'''
# FIXME: inconsistency in MISP:
initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute)
if self._old_misp((2, 4, 112), '2020-01-01', sys._getframe().f_code.co_name):
# Inconsistency in MISP:
# Fix:
attribute = {'ShadowAttribute': attribute}
update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute)
update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True)
@ -421,19 +477,28 @@ class ExpandedPyMISP(PyMISP):
def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False):
"""Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)"""
if isinstance(misp_entity, MISPEvent):
scope = 'event'
context = 'event'
elif isinstance(misp_entity, MISPAttribute):
scope = 'attribute'
context = 'attribute'
raise PyMISPError('misp_entity can only be a MISPEvent or a MISPAttribute')
if org is not None:
org_id = self.__get_uuid_or_id_from_abstract_misp(org)
url = f'sightings/listSightings/{}/{scope}/{org_id}'
url = f'sightings/listSightings/{}/{scope}'
org_id = None
if self._old_misp((2, 4, 112), '2020-01-01', sys._getframe().f_code.co_name):
url = f'sightings/listSightings/{}/{context}'
if org_id:
url = f'{url}/{org_id}'
sightings = self._prepare_request('POST', url)
to_post = {'id':, 'context': context}
if org_id:
to_post['org_id'] = org_id
sightings = self._prepare_request('POST', 'sightings/listSightings', data=to_post)
sightings = self._check_response(sightings, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in sightings:
return sightings
@ -926,7 +991,6 @@ class ExpandedPyMISP(PyMISP):
def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None):
'''Initialize a pull from a sync server'''
server_id = self.__get_uuid_or_id_from_abstract_misp(server)
# FIXME: POST & data
if event:
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
url = f'servers/pull/{server_id}/{event_id}'
@ -939,7 +1003,6 @@ class ExpandedPyMISP(PyMISP):
def server_push(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None):
'''Initialize a push to a sync server'''
server_id = self.__get_uuid_or_id_from_abstract_misp(server)
# FIXME: POST & data
if event:
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
url = f'servers/push/{server_id}/{event_id}'
@ -970,7 +1033,9 @@ class ExpandedPyMISP(PyMISP):
"""Add a new sharing group"""
sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group)
sharing_group = self._check_response(sharing_group, expect_json=True)
if self._old_misp((2, 4, 112), '2020-01-01', sys._getframe().f_code.co_name) and isinstance(sharing_group, list):
sharing_group = sharing_group[0]
if not (self.global_pythonify or pythonify) or 'errors' in sharing_group:
return sharing_group
@ -1539,16 +1604,16 @@ class ExpandedPyMISP(PyMISP):
response = self._prepare_request('POST', f'events/pushEventToZMQ/{event_id}.json')
return self._check_response(response, expect_json=True)
def direct_call(self, url: str, data: dict=None, params: dict={}):
def direct_call(self, url: str, data: dict=None, params: dict={}, kw_params: dict={}):
'''Very lightweight call that posts a data blob (python dictionary or json string) on the URL'''
if data is None:
response = self._prepare_request('GET', url, params=params)
response = self._prepare_request('GET', url, params=params, kw_params=kw_params)
response = self._prepare_request('POST', url, data=data, params=params)
response = self._prepare_request('POST', url, data=data, params=params, kw_params=kw_params)
return self._check_response(response, lenient_response_type=True)
def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False,
distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False):
distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False, **kwargs):
"""Pass a text to the freetext importer"""
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
query = {"value": string}
@ -1561,7 +1626,7 @@ class ExpandedPyMISP(PyMISP):
query['distribution'] = distribution
if returnMetaAttributes:
query['returnMetaAttributes'] = returnMetaAttributes
attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query)
attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query, **kwargs)
attributes = self._check_response(attributes, expect_json=True)
if returnMetaAttributes or not (self.global_pythonify or pythonify) or 'errors' in attributes:
return attributes
@ -1756,7 +1821,8 @@ class ExpandedPyMISP(PyMISP):
def __repr__(self):
return f'<{self.__class__.__name__}(url={self.root_url})'
def _prepare_request(self, request_type: str, url: str, data: dict={}, params: dict={}, output_type: str='json'):
def _prepare_request(self, request_type: str, url: str, data: dict={}, params: dict={},
kw_params: dict={}, output_type: str='json'):
'''Prepare a request for python-requests'''
url = urljoin(self.root_url, url)
if logger.isEnabledFor(logging.DEBUG):
@ -1770,6 +1836,10 @@ class ExpandedPyMISP(PyMISP):
data = {k: v for k, v in data.items() if v is not None}
data = json.dumps(data, cls=MISPEncode)
if kw_params:
# CakePHP params in URL
to_append_url = '/'.join([f'{k}:{v}' for k, v in kw_params.items()])
url = f'{url}/{to_append_url}'
req = requests.Request(request_type, url, data=data, params=params)
with requests.Session() as s:
user_agent = 'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}'

View File

@ -616,9 +616,9 @@ class MISPEvent(AbstractMISP):
return misp_shadow_attribute
def get_attribute_tag(self, attribute_identifier):
'''Return the tags associated to an attribute or an object attribute.
"""Return the tags associated to an attribute or an object attribute.
:attribute_identifier: can be an ID, UUID, or the value.
tags = []
for a in self.attributes + [attribute for o in self.objects for attribute in o.attributes]:
if ((hasattr(a, 'id') and == attribute_identifier)
@ -629,10 +629,10 @@ class MISPEvent(AbstractMISP):
return tags
def add_attribute_tag(self, tag, attribute_identifier):
'''Add a tag to an existing attribute, raise an Exception if the attribute doesn't exists.
"""Add a tag to an existing attribute, raise an Exception if the attribute doesn't exists.
:tag: Tag name as a string, MISPTag instance, or dictionary
:attribute_identifier: can be an ID, UUID, or the value.
attributes = []
for a in self.attributes + [attribute for o in self.objects for attribute in o.attributes]:
if ((hasattr(a, 'id') and == attribute_identifier)

View File

@ -1152,8 +1152,7 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.enable_taxonomy(
self.assertEqual(r['message'], 'Taxonomy enabled')
r = self.admin_misp_connector.enable_taxonomy_tags(
# self.assertEqual(r, [])
self.assertEqual(r['name'], 'The tag(s) has been saved.')
r = self.admin_misp_connector.disable_taxonomy(
self.assertEqual(r['message'], 'Taxonomy disabled')
@ -1292,6 +1291,7 @@ class TestComprehensive(unittest.TestCase):
second.distribution = Distribution.all_communities
first = self.user_misp_connector.add_event(first)
second = self.admin_misp_connector.add_event(second, pythonify=True)
# Get attribute
attribute = self.user_misp_connector.get_attribute(first.attributes[0].id)
self.assertEqual(first.attributes[0].uuid, attribute.uuid)
@ -1321,6 +1321,20 @@ class TestComprehensive(unittest.TestCase):
# Get attribute proposal
temp_new_proposal = self.user_misp_connector.get_attribute_proposal(
self.assertEqual(temp_new_proposal.uuid, new_proposal.uuid)
# Get attribute proposal*S*
proposals = self.user_misp_connector.attribute_proposals()
self.assertTrue(isinstance(proposals, list))
self.assertEqual(len(proposals), 3)
self.assertEqual(proposals[0].value, '')
# Get proposals on a specific event
self.admin_misp_connector.add_attribute_proposal(, {'type': 'ip-src', 'value': ''})
proposals = self.admin_misp_connector.attribute_proposals(pythonify=True)
self.assertTrue(isinstance(proposals, list))
self.assertEqual(len(proposals), 4)
proposals = self.admin_misp_connector.attribute_proposals(second, pythonify=True)
self.assertTrue(isinstance(proposals, list))
self.assertEqual(len(proposals), 1)
self.assertEqual(proposals[0].value, '')
# Accept attribute proposal - New attribute
first = self.user_misp_connector.get_event(
@ -1338,13 +1352,14 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(attribute.to_ids, False)
# Test fallback to proposal if the user doesn't own the event
second = self.admin_misp_connector.add_event(second, pythonify=True)
# FIXME: attribute needs to be a complete MISPAttribute:
prop_attr = MISPAttribute()
prop_attr.from_dict(**{'type': 'ip-dst', 'value': ''})
# Add attribute on event owned by someone else
attribute = self.user_misp_connector.add_attribute(, prop_attr)
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
# Test if add proposal without category works -
attribute = self.user_misp_connector.add_attribute(, {'type': 'ip-dst', 'value': ''})
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
# Add attribute with the same value as an existing proposal
prop_attr.uuid = str(uuid4())
attribute = self.admin_misp_connector.add_attribute(, prop_attr, pythonify=True)
@ -1362,6 +1377,17 @@ class TestComprehensive(unittest.TestCase):
# Delete attribute owned by user
response = self.admin_misp_connector.delete_attribute(second.attributes[1].id)
self.assertEqual(response['message'], 'Attribute deleted.')
# Test attribute*S*
attributes = self.admin_misp_connector.attributes()
self.assertEqual(len(attributes), 5)
# attributes = self.user_misp_connector.attributes()
# self.assertEqual(len(attributes), 5)
# Test event*S*
events =
self.assertEqual(len(events), 2)
events =
self.assertEqual(len(events), 2)
# Delete event
@ -1445,11 +1471,9 @@ class TestComprehensive(unittest.TestCase):
users_stats = self.admin_misp_connector.users_statistics(context='tags')
self.assertEqual(list(users_stats.keys()), ['flatData', 'treemap'])
# users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram')
users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram')
self.assertTrue(isinstance(users_stats, dict))
# NOTE Not supported yet
# self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
users_stats = self.user_misp_connector.users_statistics(context='sightings')
self.assertEqual(list(users_stats.keys()), ['toplist', 'eventids'])
@ -1480,23 +1504,36 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%', force_enable=True)
first = self.user_misp_connector.add_event(first)
# disable_background_processing => returns the parsed data, before insertion
r = self.user_misp_connector.freetext(, '', adhereToWarninglists=False,
distribution=2, returnMetaAttributes=False, pythonify=True)
distribution=2, returnMetaAttributes=False, pythonify=True,
kw_params={'disable_background_processing': 1})
self.assertTrue(isinstance(r, list))
self.assertEqual(r[0].value, '')
# r_wl = self.user_misp_connector.freetext(, '', adhereToWarninglists=True,
# distribution=2, returnMetaAttributes=False)
# print(r_wl)
r = self.user_misp_connector.freetext(, '', adhereToWarninglists='soft',
distribution=2, returnMetaAttributes=False, pythonify=True,
kw_params={'disable_background_processing': 1})
self.assertTrue(isinstance(r, list))
self.assertEqual(r[0].value, '')
event = self.user_misp_connector.get_event(, pythonify=True)
self.assertEqual(event.attributes[3].value, '')
# keep disable_background_processing enabled => returns the same ???? FIXME
r_wl = self.user_misp_connector.freetext(, '', adhereToWarninglists=True,
distribution=2, returnMetaAttributes=False,
kw_params={'disable_background_processing': 0})
self.assertEqual(r_wl[0].value, '')
event = self.user_misp_connector.get_event(, pythonify=True)
for attribute in event.attributes:
self.assertFalse(attribute.value == '')
r = self.user_misp_connector.freetext(, '', adhereToWarninglists=True,
distribution=2, returnMetaAttributes=True)
self.assertTrue(isinstance(r, list))
self.assertTrue(isinstance(r[0]['types'], dict))
# NOTE: required, or the attributes are inserted *after* the event is deleted
# Delete event
@ -1509,15 +1546,15 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(, 'Testcases SG')
self.assertEqual(sharing_group.releasability, 'Testing')
# add org
# r = self.admin_misp_connector.add_org_to_sharing_group(,
#, extend=True)
r = self.admin_misp_connector.add_org_to_sharing_group(,, extend=True)
self.assertEqual(r['name'], 'Organisation added to the sharing group.')
# delete org
# r = self.admin_misp_connector.remove_org_from_sharing_group(,
# self.assertEqual(r['name'], 'Organisation deleted from the sharing group.', r)
# Get list
sharing_groups = self.admin_misp_connector.sharing_groups(pythonify=True)
self.assertTrue(isinstance(sharing_groups, list))
@ -1568,7 +1605,6 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(botvrij.url, "")
# Enable
feed = self.admin_misp_connector.enable_feed(feeds[0].id, pythonify=True)
feed = self.admin_misp_connector.enable_feed_cache(feeds[0].id, pythonify=True)
@ -1622,8 +1658,8 @@ class TestComprehensive(unittest.TestCase):
servers = self.admin_misp_connector.servers(pythonify=True)
self.assertEqual(servers[0].name, 'Updated name')
# Delete
server = self.admin_misp_connector.delete_server(
r = self.admin_misp_connector.delete_server(
self.assertEqual(r['name'], 'Server deleted')
@unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6')
def test_expansion(self):