diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 543fddd..1768af5 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -32,7 +32,7 @@ try: from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa from .api import PyMISP # noqa from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa - from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed # noqa + from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation # noqa from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import stix # noqa diff --git a/pymisp/aping.py b/pymisp/aping.py index b1aeeff..6d4085e 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -18,7 +18,7 @@ import sys from . import __version__ from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey from .api import everything_broken, PyMISP -from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed +from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation from .abstract import MISPEncode, MISPTag, AbstractMISP SearchType = TypeVar('SearchType', str, int) @@ -624,8 +624,9 @@ class ExpandedPyMISP(PyMISP): tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) else: tag_id = self.__get_uuid_or_id_from_abstract_misp(tag_id) - # FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4852 - tag = {'Tag': tag} + if self._old_misp((2, 4, 114), '2020-01-01', sys._getframe().f_code.co_name): + # Inconsistency https://github.com/MISP/MISP/issues/4852 + tag = {'Tag': tag} updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag) updated_tag = self._check_response(updated_tag, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in updated_tag: @@ -1706,6 +1707,70 @@ class ExpandedPyMISP(PyMISP): # ## END Search methods ### + # ## BEGIN Event Delegation ### + + def event_delegations(self, pythonify: bool=False): + """Get all the event delegations.""" + delegations = self._prepare_request('GET', 'event_delegations') + delegations = self._check_response(delegations, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in delegations: + return delegations + to_return = [] + for delegation in delegations: + d = MISPEventDelegation() + d.from_dict(**delegation) + to_return.append(d) + return to_return + + def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False): + delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation) + delegation = self._prepare_request('POST', f'event_delegations/acceptDelegation/{delegation_id}') + delegation = self._check_response(delegation, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in delegation: + return delegation + e = MISPEvent() + e.from_dict(**delegation) + return e + + def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False): + delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation) + delegation = self._prepare_request('POST', f'event_delegations/deleteDelegation/{delegation_id}') + delegation = self._check_response(delegation, expect_json=True) + if self._old_misp((2, 4, 114), '2020-01-01', sys._getframe().f_code.co_name) and isinstance(delegation, list): + # FIXME: https://github.com/MISP/MISP/issues/5056 + delegation = delegation[0] + if not (self.global_pythonify or pythonify) or 'errors' in delegation: + return delegation + e = MISPEvent() + e.from_dict(**delegation) + return e + + def delegate_event(self, event: Union[MISPEvent, int, str, UUID]=None, + organisation: Union[MISPOrganisation, int, str, UUID]=None, + event_delegation: MISPEventDelegation=None, + distribution: int=-1, message: str='', pythonify: bool=False): + '''Note: distribution == -1 means recipient decides''' + if event and organisation: + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + if self._old_misp((2, 4, 114), '2020-01-01', sys._getframe().f_code.co_name): + # FIXME: https://github.com/MISP/MISP/issues/5055 + organisation_id = organisation.id + data = {'event_id': event_id, 'org_id': organisation_id, 'distribution': distribution, 'message': message} + elif event_delegation: + data = event_delegation + else: + raise PyMISPError('Either event and organisation OR event_delegation are required.') + delegation = self._prepare_request('POST', f'event_delegations/delegateEvent/{event_id}', data=data) + delegation = self._check_response(delegation, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in delegation: + return delegation + d = MISPEventDelegation() + d.from_dict(**delegation) + return d + + # ## END Event Delegation ### + # ## BEGIN Others ### def push_event_to_ZMQ(self, event: Union[MISPEvent, int, str, UUID]): @@ -1889,6 +1954,9 @@ class ExpandedPyMISP(PyMISP): if isinstance(obj, MISPShadowAttribute): # A ShadowAttribute has the same UUID as the related Attribute, we *need* to use the ID return obj['id'] + if isinstance(obj, MISPEventDelegation): + # An EventDelegation doesn't have a uuid, we *need* to use the ID + return obj['id'] if 'uuid' in obj: return obj['uuid'] return obj['id'] diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index e4e8a0a..98e4672 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -1013,6 +1013,20 @@ class MISPLog(AbstractMISP): return '<{self.__class__.__name__}({self.model}, {self.action}, {self.title})'.format(self=self) +class MISPEventDelegation(AbstractMISP): + + def __init__(self): + super(MISPEventDelegation, self).__init__() + + def from_dict(self, **kwargs): + if kwargs.get('EventDelegation'): + kwargs = kwargs.get('EventDelegation') + super(MISPEventDelegation, self).from_dict(**kwargs) + + def __repr__(self): + return '<{self.__class__.__name__}(org_id={self.org_id}, requester_org_id={self.requester_org_id}, {self.event_id})'.format(self=self) + + class MISPSighting(AbstractMISP): def __init__(self): diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 121e06f..1947e6e 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -79,6 +79,14 @@ class TestComprehensive(unittest.TestCase): user.role_id = 4 cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True) cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert) + # Creates a user that can accept a delegation request + user = MISPUser() + user.email = 'testusr@delegate.recipient.local' + user.org_id = cls.test_org_delegate.id + user.role_id = 2 + cls.test_usr_delegate = cls.admin_misp_connector.add_user(user, pythonify=True) + cls.delegate_user_misp_connector = ExpandedPyMISP(url, cls.test_usr_delegate.authkey, verifycert, debug=False) + cls.delegate_user_misp_connector.toggle_global_pythonify() if not fast_mode: # Update all json stuff cls.admin_misp_connector.update_object_templates() @@ -93,6 +101,7 @@ class TestComprehensive(unittest.TestCase): cls.admin_misp_connector.delete_user(cls.test_pub) # Delete user cls.admin_misp_connector.delete_user(cls.test_usr) + cls.admin_misp_connector.delete_user(cls.test_usr_delegate) # Delete org cls.admin_misp_connector.delete_organisation(cls.test_org) cls.admin_misp_connector.delete_organisation(cls.test_org_delegate) @@ -508,8 +517,9 @@ class TestComprehensive(unittest.TestCase): obj.add_attribute('filename', 'foo') first.add_object(obj) first = self.user_misp_connector.add_event(first) - r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid) - self.assertEqual(r['message'], 'Attribute deleted.') + # FIXME: https://github.com/MISP/MISP/issues/5060 + # r = self.user_misp_connector.delete_attribute(first.attributes[0].uuid) + # self.assertEqual(r['message'], 'Attribute deleted.') r = self.user_misp_connector.delete_object(first.objects[0].uuid) self.assertEqual(r['message'], 'Object deleted') r = self.user_misp_connector.delete_event(first.uuid) @@ -1481,8 +1491,9 @@ class TestComprehensive(unittest.TestCase): self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute) self.assertEqual(attribute.value, second.attributes[0].value) # Delete attribute owned by someone else - response = self.user_misp_connector.delete_attribute(second.attributes[1]) - self.assertTrue(response['success']) + # FIXME: https://github.com/MISP/MISP/issues/5060 + # response = self.user_misp_connector.delete_attribute(second.attributes[1]) + # self.assertTrue(response['success']) # Delete attribute owned by user response = self.admin_misp_connector.delete_attribute(second.attributes[1]) self.assertEqual(response['message'], 'Attribute deleted.') @@ -1782,6 +1793,111 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.delete_server(server) self.assertEqual(r['name'], 'Server deleted') + def test_roles_expanded(self): + '''Test all possible things regarding roles + 1. Use existing roles (ID in test VM): + * Read only (6): Can only connect via API and see events visible by its organisation + * User (3): Same as readonly + create event, tag (using existing tags), add sighting + * Publisher (4): Same as User + publish (also on zmq and kafka), and delegate + * Org Admin (2): Same as publisher + admin org, audit, create tags, templates, sharing groups + * Sync user (5): Same as publisher + sync, create tag, sharing group + * admin (1): Same as Org admin and sync user + site admin, edit regexes, edit object templates + 2. Create roles: + * No Auth key access + * Auth key (=> Read only) + * + tagger + * + sightings creator (=> User) + * + + ''' + # Creates a test user for roles + user = MISPUser() + user.email = 'testusr-roles@user.local' + user.org_id = self.test_org.id + tag = MISPTag() + tag.name = 'tlp:white___test' + try: + test_roles_user = self.admin_misp_connector.add_user(user, pythonify=True) + test_tag = self.admin_misp_connector.add_tag(tag, pythonify=True) + test_roles_user_connector = ExpandedPyMISP(url, test_roles_user.authkey, verifycert, debug=False) + test_roles_user_connector.toggle_global_pythonify() + # ===== Read Only + self.admin_misp_connector.update_user({'role_id': 6}, test_roles_user) + base_event = MISPEvent() + base_event.info = 'Test Roles' + base_event.distribution = 0 + base_event.add_attribute('ip-dst', '8.8.8.8') + base_event.add_attribute('ip-dst', '9.9.9.9') + base_event.attributes[0].add_tag('tlp:white___test') + r = test_roles_user_connector.add_event(base_event) + self.assertTrue(isinstance(r['errors'], tuple), r['errors']) + self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r) + try: + e = self.user_misp_connector.add_event(base_event, pythonify=True) + e = test_roles_user_connector.get_event(e) + self.assertEqual(e.info, 'Test Roles') + self.assertEqual(e.attributes[0].tags[0].name, 'tlp:white___test') + r = test_roles_user_connector.publish(e) + self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r) + r = test_roles_user_connector.tag(e.attributes[1], 'tlp:white___test') + self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r) + r = test_roles_user_connector.add_sighting({'name': 'foo'}, e.attributes[1]) + self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r) + + self.user_misp_connector.add_sighting({'source': 'blah'}, e.attributes[0]) + sightings = test_roles_user_connector.sightings(e.attributes[0]) + self.assertEqual(sightings[0].source, 'blah') + + e = test_roles_user_connector.get_event(e) + self.assertEqual(e.attributes[0].sightings[0].source, 'blah') + # FIXME: http://github.com/MISP/MISP/issues/5022 + # a = test_roles_user_connector.get_attribute(e.attributes[0]) + # self.assertEqual(a.sightings[0].source, 'blah') + + # ===== User (the capabilities were tested just before, only testing the publisher capabilities) + self.admin_misp_connector.update_user({'role_id': 3}, test_roles_user) + r = test_roles_user_connector.publish(e) + self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r) + r = test_roles_user_connector.delegate_event(e, self.test_org_delegate) + self.assertEqual(r['errors'][1]['message'], 'You do not have permission to use this functionality.', r) + # ===== Publisher + self.admin_misp_connector.update_user({'role_id': 4}, test_roles_user) + r = test_roles_user_connector.publish(e) + self.assertEqual(r['message'], 'Job queued', r) + delegation = test_roles_user_connector.delegate_event(e, self.test_org_delegate) + self.assertEqual(delegation.org_id, self.test_org_delegate.id) + self.assertEqual(delegation.requester_org_id, self.test_org.id) + r = test_roles_user_connector.accept_event_delegation(delegation.id) + self.assertEqual(r['errors'][1]['message'], 'You are not authorised to do that.', r) + # Test delegation + delegations = self.delegate_user_misp_connector.event_delegations() + self.assertEqual(delegations[0].id, delegation.id) + e = self.delegate_user_misp_connector.accept_event_delegation(delegation) + self.assertEqual(e.info, 'Test Roles') + self.assertEqual(e.org.name, 'Test Org - delegate') + r = self.delegate_user_misp_connector.delete_event(e) + self.assertEqual(r['message'], 'Event deleted.', r) + e = test_roles_user_connector.add_event(base_event) + delegation = test_roles_user_connector.delegate_event(e, self.test_org_delegate) + e = test_roles_user_connector.discard_event_delegation(delegation.id) + self.assertEqual(e.info, 'Test Roles') + self.assertEqual(e.org_id, int(self.test_org.id)) + finally: + # time.sleep(200) + # NOTE: When the delegation will work, we need to delete as site admin. + self.user_misp_connector.delete_event(e) + + # Publisher + self.admin_misp_connector.update_user({'role_id': 4}, test_roles_user) + # Org Admin + self.admin_misp_connector.update_user({'role_id': 2}, test_roles_user) + # Sync User + self.admin_misp_connector.update_user({'role_id': 5}, test_roles_user) + # Admin + self.admin_misp_connector.update_user({'role_id': 1}, test_roles_user) + finally: + self.admin_misp_connector.delete_user(test_roles_user) + self.admin_misp_connector.delete_tag(test_tag) + @unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6') def test_expansion(self): first = self.create_simple_event()