new: Delegate Event

And more test cases
pull/471/head
Raphaël Vinot 2019-08-26 16:24:39 +02:00
parent 101ec5f9ed
commit 0b7314c474
4 changed files with 206 additions and 8 deletions

View File

@ -32,7 +32,7 @@ try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .api import PyMISP # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -18,7 +18,7 @@ import sys
from . import __version__
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey
from .api import everything_broken, PyMISP
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation
from .abstract import MISPEncode, MISPTag, AbstractMISP
SearchType = TypeVar('SearchType', str, int)
@ -624,8 +624,9 @@ class ExpandedPyMISP(PyMISP):
tag_id = self.__get_uuid_or_id_from_abstract_misp(tag)
else:
tag_id = self.__get_uuid_or_id_from_abstract_misp(tag_id)
# FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4852
tag = {'Tag': tag}
if self._old_misp((2, 4, 114), '2020-01-01', sys._getframe().f_code.co_name):
# Inconsistency https://github.com/MISP/MISP/issues/4852
tag = {'Tag': tag}
updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag)
updated_tag = self._check_response(updated_tag, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_tag:
@ -1706,6 +1707,70 @@ class ExpandedPyMISP(PyMISP):
# ## END Search methods ###
# ## BEGIN Event Delegation ###
def event_delegations(self, pythonify: bool=False):
"""Get all the event delegations."""
delegations = self._prepare_request('GET', 'event_delegations')
delegations = self._check_response(delegations, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in delegations:
return delegations
to_return = []
for delegation in delegations:
d = MISPEventDelegation()
d.from_dict(**delegation)
to_return.append(d)
return to_return
def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False):
delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation)
delegation = self._prepare_request('POST', f'event_delegations/acceptDelegation/{delegation_id}')
delegation = self._check_response(delegation, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in delegation:
return delegation
e = MISPEvent()
e.from_dict(**delegation)
return e
def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False):
delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation)
delegation = self._prepare_request('POST', f'event_delegations/deleteDelegation/{delegation_id}')
delegation = self._check_response(delegation, expect_json=True)
if self._old_misp((2, 4, 114), '2020-01-01', sys._getframe().f_code.co_name) and isinstance(delegation, list):
# FIXME: https://github.com/MISP/MISP/issues/5056
delegation = delegation[0]
if not (self.global_pythonify or pythonify) or 'errors' in delegation:
return delegation
e = MISPEvent()
e.from_dict(**delegation)
return e
def delegate_event(self, event: Union[MISPEvent, int, str, UUID]=None,
organisation: Union[MISPOrganisation, int, str, UUID]=None,
event_delegation: MISPEventDelegation=None,
distribution: int=-1, message: str='', pythonify: bool=False):
'''Note: distribution == -1 means recipient decides'''
if event and organisation:
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation)
if self._old_misp((2, 4, 114), '2020-01-01', sys._getframe().f_code.co_name):
# FIXME: https://github.com/MISP/MISP/issues/5055
organisation_id = organisation.id
data = {'event_id': event_id, 'org_id': organisation_id, 'distribution': distribution, 'message': message}
elif event_delegation:
data = event_delegation
else:
raise PyMISPError('Either event and organisation OR event_delegation are required.')
delegation = self._prepare_request('POST', f'event_delegations/delegateEvent/{event_id}', data=data)
delegation = self._check_response(delegation, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in delegation:
return delegation
d = MISPEventDelegation()
d.from_dict(**delegation)
return d
# ## END Event Delegation ###
# ## BEGIN Others ###
def push_event_to_ZMQ(self, event: Union[MISPEvent, int, str, UUID]):
@ -1889,6 +1954,9 @@ class ExpandedPyMISP(PyMISP):
if isinstance(obj, MISPShadowAttribute):
# A ShadowAttribute has the same UUID as the related Attribute, we *need* to use the ID
return obj['id']
if isinstance(obj, MISPEventDelegation):
# An EventDelegation doesn't have a uuid, we *need* to use the ID
return obj['id']
if 'uuid' in obj:
return obj['uuid']
return obj['id']

View File

@ -1013,6 +1013,20 @@ class MISPLog(AbstractMISP):
return '<{self.__class__.__name__}({self.model}, {self.action}, {self.title})'.format(self=self)
class MISPEventDelegation(AbstractMISP):
def __init__(self):
super(MISPEventDelegation, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('EventDelegation'):
kwargs = kwargs.get('EventDelegation')
super(MISPEventDelegation, self).from_dict(**kwargs)
def __repr__(self):
return '<{self.__class__.__name__}(org_id={self.org_id}, requester_org_id={self.requester_org_id}, {self.event_id})'.format(self=self)
class MISPSighting(AbstractMISP):
def __init__(self):

View File

@ -79,6 +79,14 @@ class TestComprehensive(unittest.TestCase):
user.role_id = 4
cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True)
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert)
# Creates a user that can accept a delegation request
user = MISPUser()
user.email = 'testusr@delegate.recipient.local'
user.org_id = cls.test_org_delegate.id
user.role_id = 2
cls.test_usr_delegate = cls.admin_misp_connector.add_user(user, pythonify=True)
cls.delegate_user_misp_connector = ExpandedPyMISP(url, cls.test_usr_delegate.authkey, verifycert, debug=False)
cls.delegate_user_misp_connector.toggle_global_pythonify()
if not fast_mode:
# Update all json stuff
cls.admin_misp_connector.update_object_templates()
@ -93,6 +101,7 @@ class TestComprehensive(unittest.TestCase):
cls.admin_misp_connector.delete_user(cls.test_pub)
# Delete user
cls.admin_misp_connector.delete_user(cls.test_usr)
cls.admin_misp_connector.delete_user(cls.test_usr_delegate)
# Delete org
cls.admin_misp_connector.delete_organisation(cls.test_org)
cls.admin_misp_connector.delete_organisation(cls.test_org_delegate)
@ -508,8 +517,9 @@ class TestComprehensive(unittest.TestCase):
obj.add_attribute('filename', 'foo')
first.add_object(obj)
first = self.user_misp_connector.add_event(first)
r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid)
self.assertEqual(r['message'], 'Attribute deleted.')
# FIXME: https://github.com/MISP/MISP/issues/5060
# r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid)
# self.assertEqual(r['message'], 'Attribute deleted.')
r = self.user_misp_connector.delete_object(first.objects[0].uuid)
self.assertEqual(r['message'], 'Object deleted')
r = self.user_misp_connector.delete_event(first.uuid)
@ -1481,8 +1491,9 @@ class TestComprehensive(unittest.TestCase):
self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute)
self.assertEqual(attribute.value, second.attributes[0].value)
# Delete attribute owned by someone else
response = self.user_misp_connector.delete_attribute(second.attributes[1])
self.assertTrue(response['success'])
# FIXME: https://github.com/MISP/MISP/issues/5060
# response = self.user_misp_connector.delete_attribute(second.attributes[1])
# self.assertTrue(response['success'])
# Delete attribute owned by user
response = self.admin_misp_connector.delete_attribute(second.attributes[1])
self.assertEqual(response['message'], 'Attribute deleted.')
@ -1782,6 +1793,111 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.delete_server(server)
self.assertEqual(r['name'], 'Server deleted')
def test_roles_expanded(self):
'''Test all possible things regarding roles
1. Use existing roles (ID in test VM):
* Read only (6): Can only connect via API and see events visible by its organisation
* User (3): Same as readonly + create event, tag (using existing tags), add sighting
* Publisher (4): Same as User + publish (also on zmq and kafka), and delegate
* Org Admin (2): Same as publisher + admin org, audit, create tags, templates, sharing groups
* Sync user (5): Same as publisher + sync, create tag, sharing group
* admin (1): Same as Org admin and sync user + site admin, edit regexes, edit object templates
2. Create roles:
* No Auth key access
* Auth key (=> Read only)
* + tagger
* + sightings creator (=> User)
* +
'''
# Creates a test user for roles
user = MISPUser()
user.email = 'testusr-roles@user.local'
user.org_id = self.test_org.id
tag = MISPTag()
tag.name = 'tlp:white___test'
try:
test_roles_user = self.admin_misp_connector.add_user(user, pythonify=True)
test_tag = self.admin_misp_connector.add_tag(tag, pythonify=True)
test_roles_user_connector = ExpandedPyMISP(url, test_roles_user.authkey, verifycert, debug=False)
test_roles_user_connector.toggle_global_pythonify()
# ===== Read Only
self.admin_misp_connector.update_user({'role_id': 6}, test_roles_user)
base_event = MISPEvent()
base_event.info = 'Test Roles'
base_event.distribution = 0
base_event.add_attribute('ip-dst', '8.8.8.8')
base_event.add_attribute('ip-dst', '9.9.9.9')
base_event.attributes[0].add_tag('tlp:white___test')
r = test_roles_user_connector.add_event(base_event)
self.assertTrue(isinstance(r['errors'], tuple), r['errors'])
self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r)
try:
e = self.user_misp_connector.add_event(base_event, pythonify=True)
e = test_roles_user_connector.get_event(e)
self.assertEqual(e.info, 'Test Roles')
self.assertEqual(e.attributes[0].tags[0].name, 'tlp:white___test')
r = test_roles_user_connector.publish(e)
self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r)
r = test_roles_user_connector.tag(e.attributes[1], 'tlp:white___test')
self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r)
r = test_roles_user_connector.add_sighting({'name': 'foo'}, e.attributes[1])
self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r)
self.user_misp_connector.add_sighting({'source': 'blah'}, e.attributes[0])
sightings = test_roles_user_connector.sightings(e.attributes[0])
self.assertEqual(sightings[0].source, 'blah')
e = test_roles_user_connector.get_event(e)
self.assertEqual(e.attributes[0].sightings[0].source, 'blah')
# FIXME: http://github.com/MISP/MISP/issues/5022
# a = test_roles_user_connector.get_attribute(e.attributes[0])
# self.assertEqual(a.sightings[0].source, 'blah')
# ===== User (the capabilities were tested just before, only testing the publisher capabilities)
self.admin_misp_connector.update_user({'role_id': 3}, test_roles_user)
r = test_roles_user_connector.publish(e)
self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r)
r = test_roles_user_connector.delegate_event(e, self.test_org_delegate)
self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r)
# ===== Publisher
self.admin_misp_connector.update_user({'role_id': 4}, test_roles_user)
r = test_roles_user_connector.publish(e)
self.assertEqual(r['message'], 'Job queued', r)
delegation = test_roles_user_connector.delegate_event(e, self.test_org_delegate)
self.assertEqual(delegation.org_id, self.test_org_delegate.id)
self.assertEqual(delegation.requester_org_id, self.test_org.id)
r = test_roles_user_connector.accept_event_delegation(delegation.id)
self.assertEqual(r['errors'][1]['message'], 'You are not authorised to do that.', r)
# Test delegation
delegations = self.delegate_user_misp_connector.event_delegations()
self.assertEqual(delegations[0].id, delegation.id)
e = self.delegate_user_misp_connector.accept_event_delegation(delegation)
self.assertEqual(e.info, 'Test Roles')
self.assertEqual(e.org.name, 'Test Org - delegate')
r = self.delegate_user_misp_connector.delete_event(e)
self.assertEqual(r['message'], 'Event deleted.', r)
e = test_roles_user_connector.add_event(base_event)
delegation = test_roles_user_connector.delegate_event(e, self.test_org_delegate)
e = test_roles_user_connector.discard_event_delegation(delegation.id)
self.assertEqual(e.info, 'Test Roles')
self.assertEqual(e.org_id, int(self.test_org.id))
finally:
# time.sleep(200)
# NOTE: When the delegation will work, we need to delete as site admin.
self.user_misp_connector.delete_event(e)
# Publisher
self.admin_misp_connector.update_user({'role_id': 4}, test_roles_user)
# Org Admin
self.admin_misp_connector.update_user({'role_id': 2}, test_roles_user)
# Sync User
self.admin_misp_connector.update_user({'role_id': 5}, test_roles_user)
# Admin
self.admin_misp_connector.update_user({'role_id': 1}, test_roles_user)
finally:
self.admin_misp_connector.delete_user(test_roles_user)
self.admin_misp_connector.delete_tag(test_tag)
@unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6')
def test_expansion(self):
first = self.create_simple_event()