diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 6796815..c3c147c 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -60,28 +60,37 @@ class AnalystDataBehaviorMixin(AbstractMISP): def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] the_note = MISPNote() - the_note.from_dict(note=note, language=language, - object_uuid=self.uuid, object_type=self.analyst_data_object_type, - **kwargs) + object_uuid = kwargs.pop('object_uuid', self.uuid) + object_type = kwargs.pop('object_type', self.analyst_data_object_type) + the_note.from_dict( + note=note, language=language, object_uuid=object_uuid, + object_type=object_type, contained=True, parent=self, **kwargs + ) self.notes.append(the_note) self.edited = True return the_note def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPOpinion: # type: ignore[no-untyped-def] the_opinion = MISPOpinion() - the_opinion.from_dict(opinion=opinion, comment=comment, - object_uuid=self.uuid, object_type=self.analyst_data_object_type, - **kwargs) + object_uuid = kwargs.pop('object_uuid', self.uuid) + object_type = kwargs.pop('object_type', self.analyst_data_object_type) + the_opinion.from_dict( + opinion=opinion, comment=comment, object_uuid=object_uuid, + object_type=object_type, contained=True, parent=self, **kwargs + ) self.opinions.append(the_opinion) self.edited = True return the_opinion def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPRelationship: # type: ignore[no-untyped-def] the_relationship = MISPRelationship() - the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid, - relationship_type=relationship_type, - object_uuid=self.uuid, object_type=self.analyst_data_object_type, - **kwargs) + the_relationship.from_dict( + related_object_type=related_object_type, + related_object_uuid=related_object_uuid, + relationship_type=relationship_type, object_uuid=self.uuid, + object_type=self.analyst_data_object_type, contained=True, + parent=self, **kwargs + ) self.relationships.append(the_relationship) self.edited = True return the_relationship @@ -93,12 +102,8 @@ class AnalystDataBehaviorMixin(AbstractMISP): relationships = kwargs.pop('Relationship', []) super().from_dict(**kwargs) for note in notes: - note.pop('object_uuid', None) - note.pop('object_type', None) self.add_note(**note) for opinion in opinions: - opinion.pop('object_uuid', None) - opinion.pop('object_type', None) self.add_opinion(**opinion) for relationship in relationships: relationship.pop('object_uuid', None) @@ -2523,6 +2528,10 @@ class MISPAnalystData(AbstractMISP): 'Object', 'Note', 'Opinion', 'Relationship', 'Organisation', 'SharingGroup'} + @property + def analyst_data_object_type(self) -> str: + return self._analyst_data_object_type + @property def org(self) -> MISPOrganisation: return self.Org @@ -2538,6 +2547,10 @@ class MISPAnalystData(AbstractMISP): else: raise PyMISPError('Orgc must be of type MISPOrganisation.') + @property + def parent(self) -> MISPAttribute | MISPEvent | MISPEventReport | MISPObject: + return self.__parent + def __new__(cls, *args, **kwargs): if cls is MISPAnalystData: raise TypeError(f"only children of '{cls.__name__}' may be instantiated") @@ -2552,8 +2565,54 @@ class MISPAnalystData(AbstractMISP): self.created: float | int | datetime self.modified: float | int | datetime self.SharingGroup: MISPSharingGroup + self._analyst_data_object_type: str # Must be defined in the child class + + def add_note(self, note: str, language: str | None = None, object_uuid: str | None = None, object_type: str | None = None, parent: MISPEvent | MISPAttribute | MISPObject | MISPEventReport | None = None, **kwargs: dict[str, Any]) -> MISPNote: + misp_note = MISPNote() + if object_uuid is None: + object_uuid = self.uuid + if object_type is None: + object_type = self.analyst_data_object_type + if parent is None and hasattr(self, 'parent'): + parent = self.parent + misp_note.from_dict( + note=note, language=language, object_uuid=object_uuid, + object_type=object_type, parent=parent, contained=True, **kwargs + ) + if parent is None: + if not hasattr(self, 'Note'): + self.Note: list[MISPNote] = [] + self.Note.append(misp_note) + else: + self.parent.notes.append(misp_note) + self.edited = True + return misp_note + + def add_opinion(self, opinion: int, comment: str | None = None, object_uuid: str | None = None, object_type: str | None = None, parent: MISPEvent | MISPAttribute | MISPObject | MISPEventReport | None = None, **kwargs: dict[str, Any]) -> MISPOpinion: + misp_opinion = MISPOpinion() + if object_uuid is None: + object_uuid = self.uuid + if object_type is None: + object_type = self.analyst_data_object_type + if parent is None and hasattr(self, 'parent'): + parent = self.parent + misp_opinion.from_dict( + opinion=opinion, comment=comment, object_uuid=object_uuid, + object_type=object_type, parent=parent, contained=True, **kwargs + ) + if parent is None: + if not hasattr(self, 'Opinion'): + self.Opinion: list[MISPOpinion] = [] + self.Opinion.append(misp_opinion) + else: + self.parent.opinions.append(misp_opinion) + self.edited = True + return misp_opinion def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + notes = kwargs.pop('Note', []) + opinions = kwargs.pop('Opinion', []) + self.__parent = kwargs.pop('parent', None) self.distribution = kwargs.pop('distribution', None) if self.distribution is not None: self.distribution = int(self.distribution) @@ -2607,6 +2666,11 @@ class MISPAnalystData(AbstractMISP): super().from_dict(**kwargs) + for note in notes: + self.add_note(**note) + for opinion in opinions: + self.add_opinion(**opinion) + def _set_default(self) -> None: if not hasattr(self, 'created'): self.created = datetime.timestamp(datetime.now()) @@ -2614,7 +2678,7 @@ class MISPAnalystData(AbstractMISP): self.modified = self.created -class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): +class MISPNote(MISPAnalystData): _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'}) @@ -2625,8 +2689,8 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): self.language: str super().__init__(**kwargs) - def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] - if 'Note' in kwargs: + def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def] + if not contained and 'Note' in kwargs: kwargs = kwargs['Note'] self.note = kwargs.pop('note', None) if self.note is None: @@ -2639,7 +2703,7 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): +class MISPOpinion(MISPAnalystData): _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'}) @@ -2650,8 +2714,8 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): self.comment: str super().__init__(**kwargs) - def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] - if 'Opinion' in kwargs: + def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def] + if not contained and 'Opinion' in kwargs: kwargs = kwargs['Opinion'] self.opinion = kwargs.pop('opinion', None) if self.opinion is not None: @@ -2673,7 +2737,7 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): +class MISPRelationship(MISPAnalystData): _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'}) @@ -2685,8 +2749,8 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): self.relationship_type: str super().__init__(**kwargs) - def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] - if 'Relationship' in kwargs: + def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def] + if not contained and 'Relationship' in kwargs: kwargs = kwargs['Relationship'] self.related_object_type = kwargs.pop('related_object_type', None) if self.related_object_type is None: diff --git a/tests/test_analyst_data.py b/tests/test_analyst_data.py new file mode 100644 index 0000000..eaf9b34 --- /dev/null +++ b/tests/test_analyst_data.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python + +from __future__ import annotations + +import unittest +from pymisp import (MISPAttribute, MISPEvent, MISPEventReport, MISPNote, + MISPObject, MISPOpinion) +from uuid import uuid4 + + +class TestAnalystData(unittest.TestCase): + def setUp(self) -> None: + self.note_dict = { + "uuid": uuid4(), + "note": "note3" + } + self.opinion_dict = { + "uuid": uuid4(), + "opinion": 75, + "comment": "Agree" + } + + def test_analyst_data_on_attribute(self) -> None: + attribute = MISPAttribute() + attribute.from_dict(type='filename', value='foo.exe') + self._attach_analyst_data(attribute) + + def test_analyst_data_on_attribute_alternative(self) -> None: + event = MISPEvent() + event.info = 'Test on Attribute' + event.add_attribute('domain', 'foo.bar') + self._attach_analyst_data(event.attributes[0]) + + def test_analyst_data_on_event(self) -> None: + event = MISPEvent() + event.info = 'Test Event' + self._attach_analyst_data(event) + + def test_analyst_data_on_event_report(self) -> None: + event_report = MISPEventReport() + event_report.from_dict(name='Test Report', content='This is a report') + self._attach_analyst_data(event_report) + + def test_analyst_data_on_event_report_alternative(self) -> None: + event = MISPEvent() + event.info = 'Test on Event Report' + event.add_event_report('Test Report', 'This is a report') + self._attach_analyst_data(event.event_reports[0]) + + def test_analyst_data_on_object(self) -> None: + misp_object = MISPObject('file') + misp_object.add_attribute('filename', 'foo.exe') + self._attach_analyst_data(misp_object) + + def test_analyst_data_on_object_alternative(self) -> None: + event = MISPEvent() + event.info = 'Test on Object' + misp_object = MISPObject('file') + misp_object.add_attribute('filename', 'foo.exe') + event.add_object(misp_object) + self._attach_analyst_data(event.objects[0]) + + def test_analyst_data_on_object_attribute(self) -> None: + misp_object = MISPObject('file') + object_attribute = misp_object.add_attribute('filename', 'foo.exe') + self._attach_analyst_data(object_attribute) + + def test_analyst_data_object_object_attribute_alternative(self) -> None: + misp_object = MISPObject('file') + misp_object.add_attribute('filename', 'foo.exe') + self._attach_analyst_data(misp_object.attributes[0]) + + def _attach_analyst_data( + self, container: MISPAttribute | MISPEvent | MISPEventReport | MISPObject) -> None: + object_type = container._analyst_data_object_type + note1 = container.add_note(note='note1') + opinion1 = note1.add_opinion(opinion=25, comment='Disagree') + opinion2 = container.add_opinion(opinion=50, comment='Neutral') + note2 = opinion2.add_note(note='note2') + + dict_note = MISPNote() + dict_note.from_dict( + object_type=object_type, object_uuid=container.uuid, **self.note_dict + ) + note3 = container.add_note(**dict_note) + dict_opinion = MISPOpinion() + dict_opinion.from_dict( + object_type='Note', object_uuid=note3.uuid, **self.opinion_dict + ) + container.add_opinion(**dict_opinion) + + self.assertEqual(len(container.notes), 3) + self.assertEqual(len(container.opinions), 3) + + misp_note1, misp_note2, misp_note3 = container.notes + misp_opinion1, misp_opinion2, misp_opinion3 = container.opinions + + self.assertEqual(misp_note1.object_type, object_type) + self.assertEqual(misp_note1.object_uuid, container.uuid) + self.assertEqual(misp_note1.note, 'note1') + + self.assertEqual(misp_note2.object_type, 'Opinion') + self.assertEqual(misp_note2.object_uuid, opinion2.uuid) + self.assertEqual(misp_note2.note, 'note2') + + self.assertEqual(misp_note3.object_type, object_type) + self.assertEqual(misp_note3.object_uuid, container.uuid) + self.assertEqual(misp_note3.note, 'note3') + + self.assertEqual(misp_opinion1.object_type, 'Note') + self.assertEqual(misp_opinion1.object_uuid, note1.uuid) + self.assertEqual(misp_opinion1.opinion, 25) + self.assertEqual(misp_opinion1.comment, 'Disagree') + + self.assertEqual(misp_opinion2.object_type, object_type) + self.assertEqual(misp_opinion2.object_uuid, container.uuid) + self.assertEqual(misp_opinion2.opinion, 50) + self.assertEqual(misp_opinion2.comment, 'Neutral') + + self.assertEqual(misp_opinion3.object_type, 'Note') + self.assertEqual(misp_opinion3.object_uuid, note3.uuid) + self.assertEqual(misp_opinion3.opinion, 75) + self.assertEqual(misp_opinion3.comment, 'Agree')