diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 73f1be8..ca0868d 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -39,7 +39,7 @@ try: MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster, - MISPGalaxyClusterElement, MISPGalaxyClusterRelation) + MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship) from .api import PyMISP, register_user # noqa # NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future from .tools import AbstractMISPObjectGenerator # noqa @@ -76,7 +76,8 @@ __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag', 'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist', 'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion', 'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement', - 'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError', + 'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship', + 'PyMISPError', 'NewEventError', 'NewAttributeError', 'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat', 'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP' ] diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index a0dd736..071d350 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError): pass +class NewAnalystDataError(PyMISPError): + pass + + +class NewNoteError(PyMISPError): + pass + + +class NewOpinionError(PyMISPError): + pass + +class NewRelationshipError(PyMISPError): + pass + + class UpdateAttributeError(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 9bab316..3574e87 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -23,13 +23,62 @@ except ImportError: import json from .abstract import AbstractMISP, MISPTag -from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, +from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, - NewGalaxyClusterError, NewGalaxyClusterRelationError) + NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError) logger = logging.getLogger('pymisp') +class AnalystDataBehaviorMixin: + + def __init__(self, **kwargs) -> None: + super().__init__() + self.uuid = str(uuid.uuid4()) + self.Note: list[MISPNote] = [] + self.Opinion: list[MISPOpinion] = [] + self.Relationship: list[MISPRelationship] = [] + + @property + def notes(self) -> list[MISPNote]: + return self.Note + + @property + def opinions(self) -> list[MISPOpinion]: + return self.Opinion + + @property + def relationships(self) -> list[MISPRelationship]: + return self.Relationship + + def add_analyst_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_note = MISPNote() + the_note.from_dict(note=note, language=language, + object_uuid=self.uuid, object_type=self.classObjectType, + **kwargs) + self.notes.append(the_note) + self.edited = True + return the_note + + def add_analyst_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_opinion = MISPOpinion() + the_opinion.from_dict(opinion=opinion, comment=comment, + object_uuid=self.uuid, object_type=self.classObjectType, + **kwargs) + self.opinions.append(the_opinion) + self.edited = True + return the_opinion + + def add_analyst_relationship(self, related_object_type: str, related_object_uuid: str, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_relationship = MISPRelationship() + the_relationship.from_dict(related_object_uuid=related_object_uuid, related_object_type=related_object_type, + relationship_type=relationship_type, + object_uuid=self.uuid, object_type=self.classObjectType, + **kwargs) + self.relationships.append(the_relationship) + self.edited = True + return the_relationship + try: from dateutil.parser import parse except ImportError: @@ -1063,12 +1112,32 @@ class MISPObject(AbstractMISP): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPEventReport(AbstractMISP): +class MISPEventReport(AbstractMISP, AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'} + classObjectType = 'EventReport' timestamp: float | int | datetime + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + # self.uuid = str(uuid.uuid4()) + # self.Note: list[MISPNote] = [] + # self.Opinion: list[MISPOpinion] = [] + # self.Relationship: list[MISPRelationship] = [] + + # @property + # def notes(self) -> list[MISPNote]: + # return self.Note + + # @property + # def opinions(self) -> list[MISPOpinion]: + # return self.Opinion + + # @property + # def relationships(self) -> list[MISPRelationship]: + # return self.Relationship + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] if 'EventReport' in kwargs: kwargs = kwargs['EventReport'] @@ -1113,6 +1182,34 @@ class MISPEventReport(AbstractMISP): super().from_dict(**kwargs) + # def add_analyst_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + # the_note = MISPNote() + # the_note.from_dict(note=note, language=language, + # object_uuid=self.uuid, object_type='EventReport', + # **kwargs) + # self.notes.append(the_note) + # self.edited = True + # return the_note + + # def add_analyst_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + # the_opinion = MISPOpinion() + # the_opinion.from_dict(opinion=opinion, comment=comment, + # object_uuid=self.uuid, object_type='EventReport', + # **kwargs) + # self.opinions.append(the_opinion) + # self.edited = True + # return the_opinion + + # def add_analyst_relationship(self, related_object_type: str, related_object_uuid: str, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + # the_relationship = MISPRelationship() + # the_relationship.from_dict(related_object_uuid=related_object_uuid, related_object_type=related_object_type, + # relationship_type=relationship_type, + # object_uuid=self.uuid, object_type='EventReport', + # **kwargs) + # self.relationships.append(the_relationship) + # self.edited = True + # return the_relationship + def __repr__(self) -> str: if hasattr(self, 'name'): return '<{self.__class__.__name__}(name={self.name})'.format(self=self) @@ -2318,3 +2415,237 @@ class MISPDecayingModel(AbstractMISP): def __repr__(self) -> str: return f'<{self.__class__.__name__}(uuid={self.uuid})>' + + +class MISPAnalystData(AbstractMISP): + + _fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors', + 'created', 'distribution', 'sharing_group_id', } + + valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy', + 'Object', 'Note', 'Opinion', 'Relationship', 'Organisation', + 'SharingGroup'} + + @property + def org(self) -> MISPOrganisation: + return self.Org + + @property + def orgc(self) -> MISPOrganisation: + return self.Orgc + + @orgc.setter + def orgc(self, orgc: MISPOrganisation) -> None: + if isinstance(orgc, MISPOrganisation): + self.Orgc = orgc + else: + raise PyMISPError('Orgc must be of type MISPOrganisation.') + + def __new__(cls, *args, **kwargs): + if cls is MISPAnalystData: + raise TypeError(f"only children of '{cls.__name__}' may be instantiated") + return object.__new__(cls) + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.uuid = str(uuid.uuid4()) + self.object_uuid: str + self.object_type: str + self.authors: str + self.created: float | int | datetime + self.modified: float | int | datetime + self.SharingGroup: MISPSharingGroup + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Note' in kwargs: + kwargs = kwargs['Note'] + + self.distribution = kwargs.pop('distribution', None) + if self.distribution is not None: + self.distribution = int(self.distribution) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).') + + self.object_uuid = kwargs.pop('object_uuid', None) + if self.object_uuid is None: + raise NewAnalystDataError('The UUID for which this note is attached is required.') + self.object_type = kwargs.pop('object_type', None) + if self.object_type is None: + raise NewAnalystDataError('The element type for which this note is attached is required.') + if self.object_type not in self.valid_object_type: + raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.') + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('created'): + ts = kwargs.pop('created') + if isinstance(ts, datetime): + self.created = ts + else: + self.created = datetime.fromtimestamp(int(ts), timezone.utc) + if kwargs.get('modified'): + ts = kwargs.pop('modified') + if isinstance(ts, datetime): + self.modified = ts + else: + self.modified = datetime.fromtimestamp(int(ts), timezone.utc) + + if kwargs.get('Org'): + self.Org = MISPOrganisation() + self.Org.from_dict(**kwargs.pop('Org')) + if kwargs.get('Orgc'): + self.Orgc = MISPOrganisation() + self.Orgc.from_dict(**kwargs.pop('Orgc')) + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + + super().from_dict(**kwargs) + + def _set_default(self) -> None: + if not hasattr(self, 'created'): + self.created = datetime.timestamp(datetime.now()) + if not hasattr(self, 'modified'): + self.modified = self.created + + +class MISPNote(MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'}) + + def __init__(self, **kwargs) -> None: + self.note: str + self.language: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: + self.note = kwargs.pop('note', None) + if self.note is None: + raise NewNoteError('The text note of the note is required.') + + super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'note'): + return '<{self.__class__.__name__}(note={self.note})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +class MISPOpinion(MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'}) + + def __init__(self, **kwargs) -> None: + self.opinion: int + self.comment: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: + self.opinion = kwargs.pop('opinion', None) + if self.opinion is not None: + self.opinion = int(self.opinion) + if not (0 <= self.opinion <= 100): + raise NewOpinionError('The opinion value must be between 0 and 100 included.') + else: + raise NewOpinionError('The opinion value is required.') + + self.comment = kwargs.pop('comment', None) + if self.comment is None: + raise NewOpinionError('The text comment is required.') + + return super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'opinion'): + return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +class MISPRelationship(MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'}) + + def __init__(self, **kwargs) -> None: + self.related_object_uuid: str + self.related_object_type: str + self.relationship_type: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: + self.related_object_uuid = kwargs.pop('related_object_uuid', None) + if self.related_object_uuid is None: + raise NewRelationshipError('The target UUID for this relationship is required.') + + self.related_object_type = kwargs.pop('related_object_type', None) + if self.related_object_type is None: + raise NewRelationshipError('The target object type for this relationship is required.') + if self.related_object_type not in self.valid_object_type: + raise NewAnalystDataError('The target object type is not a valid type. Actual: {self.related_object_type}.'.format(self=self)) + + return super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'): + return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +# class AnalystDataBehavior(): + +# def __init__(self, **kwargs) -> None: +# super().__init__(**kwargs) +# self.uuid = str(uuid.uuid4()) +# self.Note: list[MISPNote] = [] +# self.Opinion: list[MISPOpinion] = [] +# self.Relationship: list[MISPRelationship] = [] + +# @property +# def notes(self) -> list[MISPNote]: +# return self.Note + +# @property +# def opinions(self) -> list[MISPOpinion]: +# return self.Opinion + +# @property +# def relationships(self) -> list[MISPRelationship]: +# return self.Relationship + +# def add_analyst_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] +# the_note = MISPNote() +# the_note.from_dict(note=note, language=language, +# object_uuid=self.uuid, object_type=self.classObjectType, +# **kwargs) +# self.notes.append(the_note) +# self.edited = True +# return the_note + +# def add_analyst_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] +# the_opinion = MISPOpinion() +# the_opinion.from_dict(opinion=opinion, comment=comment, +# object_uuid=self.uuid, object_type=self.classObjectType, +# **kwargs) +# self.opinions.append(the_opinion) +# self.edited = True +# return the_opinion + +# def add_analyst_relationship(self, related_object_type: str, related_object_uuid: str, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] +# the_relationship = MISPRelationship() +# the_relationship.from_dict(related_object_uuid=related_object_uuid, related_object_type=related_object_type, +# relationship_type=relationship_type, +# object_uuid=self.uuid, object_type=self.classObjectType, +# **kwargs) +# self.relationships.append(the_relationship) +# self.edited = True +# return the_relationship \ No newline at end of file