Merge branch 'chrisr3d-analyst_data_fix'

main
Raphaël Vinot 2024-12-20 15:38:36 +01:00
commit 3d3ee2b7fa
No known key found for this signature in database
GPG Key ID: 32E4E1C133B3792F
2 changed files with 210 additions and 23 deletions

View File

@ -60,28 +60,37 @@ class AnalystDataBehaviorMixin(AbstractMISP):
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_note = MISPNote()
the_note.from_dict(note=note, language=language,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
object_uuid = kwargs.pop('object_uuid', self.uuid)
object_type = kwargs.pop('object_type', self.analyst_data_object_type)
the_note.from_dict(
note=note, language=language, object_uuid=object_uuid,
object_type=object_type, contained=True, parent=self, **kwargs
)
self.notes.append(the_note)
self.edited = True
return the_note
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPOpinion: # type: ignore[no-untyped-def]
the_opinion = MISPOpinion()
the_opinion.from_dict(opinion=opinion, comment=comment,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
object_uuid = kwargs.pop('object_uuid', self.uuid)
object_type = kwargs.pop('object_type', self.analyst_data_object_type)
the_opinion.from_dict(
opinion=opinion, comment=comment, object_uuid=object_uuid,
object_type=object_type, contained=True, parent=self, **kwargs
)
self.opinions.append(the_opinion)
self.edited = True
return the_opinion
def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPRelationship: # type: ignore[no-untyped-def]
the_relationship = MISPRelationship()
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
relationship_type=relationship_type,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
the_relationship.from_dict(
related_object_type=related_object_type,
related_object_uuid=related_object_uuid,
relationship_type=relationship_type, object_uuid=self.uuid,
object_type=self.analyst_data_object_type, contained=True,
parent=self, **kwargs
)
self.relationships.append(the_relationship)
self.edited = True
return the_relationship
@ -93,12 +102,8 @@ class AnalystDataBehaviorMixin(AbstractMISP):
relationships = kwargs.pop('Relationship', [])
super().from_dict(**kwargs)
for note in notes:
note.pop('object_uuid', None)
note.pop('object_type', None)
self.add_note(**note)
for opinion in opinions:
opinion.pop('object_uuid', None)
opinion.pop('object_type', None)
self.add_opinion(**opinion)
for relationship in relationships:
relationship.pop('object_uuid', None)
@ -2523,6 +2528,10 @@ class MISPAnalystData(AbstractMISP):
'Object', 'Note', 'Opinion', 'Relationship', 'Organisation',
'SharingGroup'}
@property
def analyst_data_object_type(self) -> str:
return self._analyst_data_object_type
@property
def org(self) -> MISPOrganisation:
return self.Org
@ -2538,6 +2547,10 @@ class MISPAnalystData(AbstractMISP):
else:
raise PyMISPError('Orgc must be of type MISPOrganisation.')
@property
def parent(self) -> MISPAttribute | MISPEvent | MISPEventReport | MISPObject:
return self.__parent
def __new__(cls, *args, **kwargs):
if cls is MISPAnalystData:
raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
@ -2552,8 +2565,54 @@ class MISPAnalystData(AbstractMISP):
self.created: float | int | datetime
self.modified: float | int | datetime
self.SharingGroup: MISPSharingGroup
self._analyst_data_object_type: str # Must be defined in the child class
def add_note(self, note: str, language: str | None = None, object_uuid: str | None = None, object_type: str | None = None, parent: MISPEvent | MISPAttribute | MISPObject | MISPEventReport | None = None, **kwargs: dict[str, Any]) -> MISPNote:
misp_note = MISPNote()
if object_uuid is None:
object_uuid = self.uuid
if object_type is None:
object_type = self.analyst_data_object_type
if parent is None and hasattr(self, 'parent'):
parent = self.parent
misp_note.from_dict(
note=note, language=language, object_uuid=object_uuid,
object_type=object_type, parent=parent, contained=True, **kwargs
)
if parent is None:
if not hasattr(self, 'Note'):
self.Note: list[MISPNote] = []
self.Note.append(misp_note)
else:
self.parent.notes.append(misp_note)
self.edited = True
return misp_note
def add_opinion(self, opinion: int, comment: str | None = None, object_uuid: str | None = None, object_type: str | None = None, parent: MISPEvent | MISPAttribute | MISPObject | MISPEventReport | None = None, **kwargs: dict[str, Any]) -> MISPOpinion:
misp_opinion = MISPOpinion()
if object_uuid is None:
object_uuid = self.uuid
if object_type is None:
object_type = self.analyst_data_object_type
if parent is None and hasattr(self, 'parent'):
parent = self.parent
misp_opinion.from_dict(
opinion=opinion, comment=comment, object_uuid=object_uuid,
object_type=object_type, parent=parent, contained=True, **kwargs
)
if parent is None:
if not hasattr(self, 'Opinion'):
self.Opinion: list[MISPOpinion] = []
self.Opinion.append(misp_opinion)
else:
self.parent.opinions.append(misp_opinion)
self.edited = True
return misp_opinion
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
notes = kwargs.pop('Note', [])
opinions = kwargs.pop('Opinion', [])
self.__parent = kwargs.pop('parent', None)
self.distribution = kwargs.pop('distribution', None)
if self.distribution is not None:
self.distribution = int(self.distribution)
@ -2607,6 +2666,11 @@ class MISPAnalystData(AbstractMISP):
super().from_dict(**kwargs)
for note in notes:
self.add_note(**note)
for opinion in opinions:
self.add_opinion(**opinion)
def _set_default(self) -> None:
if not hasattr(self, 'created'):
self.created = datetime.timestamp(datetime.now())
@ -2614,7 +2678,7 @@ class MISPAnalystData(AbstractMISP):
self.modified = self.created
class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
class MISPNote(MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
@ -2625,8 +2689,8 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
self.language: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Note' in kwargs:
def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def]
if not contained and 'Note' in kwargs:
kwargs = kwargs['Note']
self.note = kwargs.pop('note', None)
if self.note is None:
@ -2639,7 +2703,7 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
class MISPOpinion(MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
@ -2650,8 +2714,8 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
self.comment: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Opinion' in kwargs:
def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def]
if not contained and 'Opinion' in kwargs:
kwargs = kwargs['Opinion']
self.opinion = kwargs.pop('opinion', None)
if self.opinion is not None:
@ -2673,7 +2737,7 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
class MISPRelationship(MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
@ -2685,8 +2749,8 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
self.relationship_type: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Relationship' in kwargs:
def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def]
if not contained and 'Relationship' in kwargs:
kwargs = kwargs['Relationship']
self.related_object_type = kwargs.pop('related_object_type', None)
if self.related_object_type is None:

123
tests/test_analyst_data.py Normal file
View File

@ -0,0 +1,123 @@
#!/usr/bin/env python
from __future__ import annotations
import unittest
from pymisp import (MISPAttribute, MISPEvent, MISPEventReport, MISPNote,
MISPObject, MISPOpinion)
from uuid import uuid4
class TestAnalystData(unittest.TestCase):
def setUp(self) -> None:
self.note_dict = {
"uuid": uuid4(),
"note": "note3"
}
self.opinion_dict = {
"uuid": uuid4(),
"opinion": 75,
"comment": "Agree"
}
def test_analyst_data_on_attribute(self) -> None:
attribute = MISPAttribute()
attribute.from_dict(type='filename', value='foo.exe')
self._attach_analyst_data(attribute)
def test_analyst_data_on_attribute_alternative(self) -> None:
event = MISPEvent()
event.info = 'Test on Attribute'
event.add_attribute('domain', 'foo.bar')
self._attach_analyst_data(event.attributes[0])
def test_analyst_data_on_event(self) -> None:
event = MISPEvent()
event.info = 'Test Event'
self._attach_analyst_data(event)
def test_analyst_data_on_event_report(self) -> None:
event_report = MISPEventReport()
event_report.from_dict(name='Test Report', content='This is a report')
self._attach_analyst_data(event_report)
def test_analyst_data_on_event_report_alternative(self) -> None:
event = MISPEvent()
event.info = 'Test on Event Report'
event.add_event_report('Test Report', 'This is a report')
self._attach_analyst_data(event.event_reports[0])
def test_analyst_data_on_object(self) -> None:
misp_object = MISPObject('file')
misp_object.add_attribute('filename', 'foo.exe')
self._attach_analyst_data(misp_object)
def test_analyst_data_on_object_alternative(self) -> None:
event = MISPEvent()
event.info = 'Test on Object'
misp_object = MISPObject('file')
misp_object.add_attribute('filename', 'foo.exe')
event.add_object(misp_object)
self._attach_analyst_data(event.objects[0])
def test_analyst_data_on_object_attribute(self) -> None:
misp_object = MISPObject('file')
object_attribute = misp_object.add_attribute('filename', 'foo.exe')
self._attach_analyst_data(object_attribute)
def test_analyst_data_object_object_attribute_alternative(self) -> None:
misp_object = MISPObject('file')
misp_object.add_attribute('filename', 'foo.exe')
self._attach_analyst_data(misp_object.attributes[0])
def _attach_analyst_data(
self, container: MISPAttribute | MISPEvent | MISPEventReport | MISPObject) -> None:
object_type = container._analyst_data_object_type
note1 = container.add_note(note='note1')
opinion1 = note1.add_opinion(opinion=25, comment='Disagree')
opinion2 = container.add_opinion(opinion=50, comment='Neutral')
note2 = opinion2.add_note(note='note2')
dict_note = MISPNote()
dict_note.from_dict(
object_type=object_type, object_uuid=container.uuid, **self.note_dict
)
note3 = container.add_note(**dict_note)
dict_opinion = MISPOpinion()
dict_opinion.from_dict(
object_type='Note', object_uuid=note3.uuid, **self.opinion_dict
)
container.add_opinion(**dict_opinion)
self.assertEqual(len(container.notes), 3)
self.assertEqual(len(container.opinions), 3)
misp_note1, misp_note2, misp_note3 = container.notes
misp_opinion1, misp_opinion2, misp_opinion3 = container.opinions
self.assertEqual(misp_note1.object_type, object_type)
self.assertEqual(misp_note1.object_uuid, container.uuid)
self.assertEqual(misp_note1.note, 'note1')
self.assertEqual(misp_note2.object_type, 'Opinion')
self.assertEqual(misp_note2.object_uuid, opinion2.uuid)
self.assertEqual(misp_note2.note, 'note2')
self.assertEqual(misp_note3.object_type, object_type)
self.assertEqual(misp_note3.object_uuid, container.uuid)
self.assertEqual(misp_note3.note, 'note3')
self.assertEqual(misp_opinion1.object_type, 'Note')
self.assertEqual(misp_opinion1.object_uuid, note1.uuid)
self.assertEqual(misp_opinion1.opinion, 25)
self.assertEqual(misp_opinion1.comment, 'Disagree')
self.assertEqual(misp_opinion2.object_type, object_type)
self.assertEqual(misp_opinion2.object_uuid, container.uuid)
self.assertEqual(misp_opinion2.opinion, 50)
self.assertEqual(misp_opinion2.comment, 'Neutral')
self.assertEqual(misp_opinion3.object_type, 'Note')
self.assertEqual(misp_opinion3.object_uuid, note3.uuid)
self.assertEqual(misp_opinion3.opinion, 75)
self.assertEqual(misp_opinion3.comment, 'Agree')