diff --git a/pymisp/abstract.py b/pymisp/abstract.py index a7ccb8e..88e881d 100644 --- a/pymisp/abstract.py +++ b/pymisp/abstract.py @@ -147,6 +147,19 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): # type: i treatment are processed. Note: This method is used when you initialize an object with existing data so by default, the class is flaged as not edited.""" + # Recursively loads more analyst data + from pymisp.mispevent import AnalystDataBehaviorMixin, MISPNote, MISPOpinion, MISPRelationship + if isinstance(self, AnalystDataBehaviorMixin): + for analystType in ['Note', 'Opinion', 'Relationship']: + if kwargs.get(analystType): + analystDataList = kwargs.pop(analystType) + for analystDataDict in analystDataList: + analystData = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(analystType, MISPNote)() + analystDataDict['object_uuid'] = self.uuid if 'object_uuid' not in analystDataDict else analystDataDict['object_uuid'] + analystDataDict['object_type'] = self.classObjectType + analystData.from_dict(**analystDataDict) + {'Note': self.notes, 'Opinion': self.opinions, 'Relationship': self.relationships}.get(analystType, 'Note').append(analystData) + for prop, value in kwargs.items(): if value is None: continue diff --git a/pymisp/api.py b/pymisp/api.py index 0803561..e10c04f 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -31,7 +31,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \ - MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel + MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \ + MISPNote, MISPOpinion, MISPRelationship from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types @@ -587,6 +588,186 @@ class PyMISP: # ## END Event Report ### + # ## BEGIN Analyst Data ### + def get_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID, + pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: + """Get an analyst data from a MISP instance + + :param analyst_data: analyst data to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + type = analyst_data.classObjectType + analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) + r = self._prepare_request('GET', f'analyst_data/view/{type}/{analyst_data_id}') + analyst_data_r = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r: + return analyst_data_r + er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)() + er.from_dict(**analyst_data_r) + return er + + def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, + pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: + """Add an analyst data to an existing MISP element + + :param analyst_data: analyst_data to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + type = analyst_data.classObjectType + object_uuid = analyst_data.object_uuid + object_type = analyst_data.object_type + r = self._prepare_request('POST', f'analyst_data/add/{type}/{object_uuid}/{object_type}', data=analyst_data) + new_analyst_data = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data: + return new_analyst_data + er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)() + er.from_dict(**new_analyst_data) + return er + + def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None, + pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: + """Update an analyst data on a MISP instance + + :param analyst_data: analyst data to update + :param analyst_data_id: analyst data ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + type = analyst_data.classObjectType + if analyst_data_id is None: + adid = get_uuid_or_id_from_abstract_misp(analyst_data) + else: + adid = get_uuid_or_id_from_abstract_misp(analyst_data_id) + r = self._prepare_request('POST', f'analyst_data/edit/{type}/{adid}', data=analyst_data) + updated_analyst_data = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data: + return updated_analyst_data + er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)() + er.from_dict(**updated_analyst_data) + return er + + def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete an analyst data from a MISP instance + + :param analyst_data: analyst data to delete + """ + type = analyst_data.classObjectType + analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) + request_url = f'analyst_data/delete/{type}/{analyst_data_id}' + data = {} + r = self._prepare_request('POST', request_url, data=data) + return self._check_json_response(r) + + # ## END Analyst Data ### + + # ## BEGIN Analyst Note ### + + def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote: + """Get a note from a MISP instance + + :param note: note to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + return self.get_analyst_data(note, pythonify) + + def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote: + """Add a note to an existing MISP element + + :param note: note to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.add_analyst_data(note, pythonify) + + def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote: + """Update a note on a MISP instance + + :param note: note to update + :param note_id: note ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.update_analyst_data(note, note_id, pythonify) + + def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete a note from a MISP instance + + :param note: note delete + """ + return self.delete_analyst_data(note) + + # ## END Analyst Note ### + + # ## BEGIN Analyst Opinion ### + + def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion: + """Get an opinion from a MISP instance + + :param opinion: opinion to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + return self.get_analyst_data(opinion, pythonify) + + def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion: + """Add an opinion to an existing MISP element + + :param opinion: opinion to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.add_analyst_data(opinion, pythonify) + + def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion: + """Update an opinion on a MISP instance + + :param opinion: opinion to update + :param opinion_id: opinion ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.update_analyst_data(opinion, opinion_id, pythonify) + + def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete an opinion from a MISP instance + + :param opinion: opinion to delete + """ + return self.delete_analyst_data(opinion) + + # ## END Analyst Opinion ### + + # ## BEGIN Analyst Relationship ### + + def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship: + """Get a relationship from a MISP instance + + :param relationship: relationship to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + return self.get_analyst_data(relationship, pythonify) + + def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship: + """Add a relationship to an existing MISP element + + :param relationship: relationship to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.add_analyst_data(relationship, pythonify) + + def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship: + """Update a relationship on a MISP instance + + :param relationship: relationship to update + :param relationship_id: relationship ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.update_analyst_data(relationship, relationship_id, pythonify) + + def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete a relationship from a MISP instance + + :param relationship: relationship to delete + """ + return self.delete_analyst_data(relationship) + + # ## END Analyst Relationship ### + + # ## BEGIN Object ### def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject: diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 227b66a..dd9f73d 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -2425,9 +2425,6 @@ class MISPAnalystData(AbstractMISP): self.note_type_name = self.classObjectType def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] - if 'Note' in kwargs: - kwargs = kwargs['Note'] - self.distribution = kwargs.pop('distribution', None) if self.distribution is not None: self.distribution = int(self.distribution) @@ -2447,10 +2444,10 @@ class MISPAnalystData(AbstractMISP): self.object_uuid = kwargs.pop('object_uuid', None) if self.object_uuid is None: - raise NewAnalystDataError('The UUID for which this note is attached is required.') + raise NewAnalystDataError('The UUID for which this element is attached is required.') self.object_type = kwargs.pop('object_type', None) if self.object_type is None: - raise NewAnalystDataError('The element type for which this note is attached is required.') + raise NewAnalystDataError('The element type for which this element is attached is required.') if self.object_type not in self.valid_object_type: raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.') @@ -2461,13 +2458,13 @@ class MISPAnalystData(AbstractMISP): if isinstance(ts, datetime): self.created = ts else: - self.created = datetime.fromtimestamp(int(ts), timezone.utc) + self.created = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ if kwargs.get('modified'): ts = kwargs.pop('modified') if isinstance(ts, datetime): self.modified = ts else: - self.modified = datetime.fromtimestamp(int(ts), timezone.utc) + self.modified = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ if kwargs.get('Org'): self.Org = MISPOrganisation() @@ -2500,6 +2497,8 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): super().__init__(**kwargs) def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Note' in kwargs: + kwargs = kwargs['Note'] self.note = kwargs.pop('note', None) if self.note is None: raise NewNoteError('The text note of the note is required.') @@ -2524,6 +2523,8 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): super().__init__(**kwargs) def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Opinion' in kwargs: + kwargs = kwargs['Opinion'] self.opinion = kwargs.pop('opinion', None) if self.opinion is not None: self.opinion = int(self.opinion) @@ -2557,7 +2558,8 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): super().__init__(**kwargs) def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] - + if 'Relationship' in kwargs: + kwargs = kwargs['Relationship'] self.related_object_type = kwargs.pop('related_object_type', None) if self.related_object_type is None: raise NewRelationshipError('The target object type for this relationship is required.') diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index e741acc..2279afb 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -25,7 +25,8 @@ try: MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster, - MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist) + MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist, + MISPNote, MISPOpinion, MISPRelationship) from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator except ImportError: raise @@ -3197,6 +3198,140 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(event) self.admin_misp_connector.toggle_global_pythonify() + def test_analyst_data_CRUD(self) -> None: + event = self.create_simple_event() + try: + fake_uuid = str(uuid4()) + new_note1 = MISPNote() + new_note1.object_type = 'Event' + new_note1.object_uuid = fake_uuid + new_note1.note = 'Fake note' + new_note1 = self.user_misp_connector.add_note(new_note1) + # The Note should be linked even for non-existing data + self.assertTrue(new_note1.object_uuid == fake_uuid) + + new_note1.note = "Updated Note" + new_note1 = self.user_misp_connector.update_note(new_note1) + # The Note should be updatable + self.assertTrue(new_note1.note == "Updated Note") + + # The Note should be able to get an Opinion + new_opinion = new_note1.add_opinion(42, 'Test Opinion') + new_note1 = self.user_misp_connector.update_note(new_note1) + # Fetch newly added node + new_note1 = self.user_misp_connector.get_note(new_note1) + # The Opinion shoud be able to be created via the Note + self.assertTrue(new_note1.Opinion[0].opinion == new_opinion.opinion) + + response = self.user_misp_connector.delete_note(new_note1) + # The Note should be deletable + self.assertTrue(response['success']) + self.assertEqual(response['message'], f'Note deleted.') + # The Opinion should not be deleted + opinion_resp = self.user_misp_connector.get_opinion(new_opinion) + self.assertTrue(opinion_resp.opinion == new_opinion.opinion) + + new_note: MISPNote = event.add_note(note='Test Note', language='en') + new_note.distribution = 1 # Community + event = self.user_misp_connector.add_event(event) + # The note should be linked by Event UUID + self.assertEqual(new_note.object_type, 'Event') + self.assertTrue(new_note.object_uuid == event.uuid) + + event = self.user_misp_connector.get_event(event) + # The Note should be present on the event + self.assertTrue(event.Note[0].object_uuid == event.uuid) + + finally: + self.admin_misp_connector.delete_event(event) + try: + self.admin_misp_connector.delete_opinion(new_opinion) + self.admin_misp_connector.delete_note(new_note) + self.admin_misp_connector.delete_note(new_note1) # Should already be deleted + except: + pass + + def test_analyst_data_ACL(self) -> None: + event = self.create_simple_event() + event.distribution = 2 + sg = MISPSharingGroup() + sg.name = 'Testcases SG' + sg.releasability = 'Testing' + sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True) + # Chec that sharing group was created + self.assertEqual(sharing_group.name, 'Testcases SG') + + try: + new_note: MISPNote = event.add_note(note='Test Note', language='en') + new_note.distribution = 0 # Org only + event = self.admin_misp_connector.add_event(event) + + # The note should be linked by Event UUID + self.assertEqual(new_note.object_type, 'Event') + self.assertEqual(event.uuid, new_note.object_uuid) + + event = self.admin_misp_connector.get_event(event) + # The note should be visible for the creator + self.assertEqual(len(event.Note), 1) + self.assertTrue(new_note.note == "Test Note") + + resp = self.user_misp_connector.get_note(new_note) + # The note should not be visible to another org + self.assertTrue(len(resp), 0) + + event = self.user_misp_connector.get_event(event) + # The Note attached to the event should not be visible for another org than the creator + self.assertEqual(len(event.Note), 0) + + new_note = self.admin_misp_connector.get_note(new_note) + new_note.distribution = 4 + new_note.sharing_group_id = sharing_group.id + new_note = self.admin_misp_connector.update_note(new_note) + self.assertEqual(int(new_note.sharing_group_id), int(sharing_group.id)) + + event = self.user_misp_connector.get_event(event) + # The Note attached to the event should not be visible for another org not part of the sharing group + self.assertEqual(len(event.Note), 0) + + # Add org to the sharing group + r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group, + self.test_org, extend=True) + self.assertEqual(r['name'], 'Organisation added to the sharing group.') + + event = self.user_misp_connector.get_event(event) + # The Note attached to the event should now be visible + self.assertEqual(len(event.Note), 1) + + new_note.note = "Updated Note" + resp = self.user_misp_connector.update_note(new_note) + # The Note should not be updatable by another organisation + self.assertTrue(resp['errors']) + + resp = self.user_misp_connector.delete_note(new_note) + # The Note should not be deletable by another organisation + self.assertTrue(resp['errors']) + + organisation = MISPOrganisation() + organisation.name = 'Fake Org' + fake_org = self.admin_misp_connector.add_organisation(organisation, pythonify=True) + new_note_2 = new_note.add_note('Test Note 2') + new_note_2.orgc_uuid = fake_org.uuid + new_note_2 = self.user_misp_connector.add_note(new_note_2) + # Regular user should not be able to create a note on behalf of another organisation + self.assertFalse(new_note_2.orgc_uuid == fake_org.uuid) + # Note should have the orgc set to the use's organisation for non-privileged users + self.assertTrue(new_note_2.orgc_uuid == self.test_org.uuid) + + finally: + self.admin_misp_connector.delete_event(event) + try: + pass + self.admin_misp_connector.delete_sharing_group(sharing_group.id) + self.admin_misp_connector.delete_organisation(fake_org) + self.admin_misp_connector.delete_note(new_note) + except: + pass + @unittest.skip("Internal use only") def missing_methods(self) -> None: skip = [