diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 73f1be8..ca0868d 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -39,7 +39,7 @@ try: MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster, - MISPGalaxyClusterElement, MISPGalaxyClusterRelation) + MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship) from .api import PyMISP, register_user # noqa # NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future from .tools import AbstractMISPObjectGenerator # noqa @@ -76,7 +76,8 @@ __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag', 'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist', 'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion', 'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement', - 'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError', + 'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship', + 'PyMISPError', 'NewEventError', 'NewAttributeError', 'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat', 'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP' ] diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index a0dd736..071d350 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError): pass +class NewAnalystDataError(PyMISPError): + pass + + +class NewNoteError(PyMISPError): + pass + + +class NewOpinionError(PyMISPError): + pass + +class NewRelationshipError(PyMISPError): + pass + + class UpdateAttributeError(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 9bab316..34dec2c 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -23,13 +23,65 @@ except ImportError: import json from .abstract import AbstractMISP, MISPTag -from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, +from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, - NewGalaxyClusterError, NewGalaxyClusterRelationError) + NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError) logger = logging.getLogger('pymisp') +class AnalystDataBehaviorMixin(AbstractMISP): + + # NOTE: edited here must be the property of Abstract MISP + + def __init__(self) -> None: + super().__init__() + self.uuid: str # Created in the child class + self.classObjectType: str # Must be defined in the child class + self.Note: list[MISPNote] = [] + self.Opinion: list[MISPOpinion] = [] + self.Relationship: list[MISPRelationship] = [] + + @property + def notes(self) -> list[MISPNote]: + return self.Note + + @property + def opinions(self) -> list[MISPOpinion]: + return self.Opinion + + @property + def relationships(self) -> list[MISPRelationship]: + return self.Relationship + + def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_note = MISPNote() + the_note.from_dict(note=note, language=language, + object_uuid=self.uuid, object_type=self.classObjectType, + **kwargs) + self.notes.append(the_note) + self.edited = True + return the_note + + def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_opinion = MISPOpinion() + the_opinion.from_dict(opinion=opinion, comment=comment, + object_uuid=self.uuid, object_type=self.classObjectType, + **kwargs) + self.opinions.append(the_opinion) + self.edited = True + return the_opinion + + def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_relationship = MISPRelationship() + the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid, + relationship_type=relationship_type, + object_uuid=self.uuid, object_type=self.classObjectType, + **kwargs) + self.relationships.append(the_relationship) + self.edited = True + return the_relationship + try: from dateutil.parser import parse except ImportError: @@ -226,11 +278,13 @@ class MISPSighting(AbstractMISP): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPAttribute(AbstractMISP): +class MISPAttribute(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'value', 'category', 'type', 'comment', 'data', 'deleted', 'timestamp', 'to_ids', 'disable_correlation', 'first_seen', 'last_seen'} + classObjectType = 'Attribute' + def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False): """Represents an Attribute @@ -666,12 +720,14 @@ class MISPObjectReference(AbstractMISP): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPObject(AbstractMISP): +class MISPObject(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'name', 'meta-category', 'description', 'template_uuid', 'template_version', 'uuid', 'timestamp', 'comment', 'first_seen', 'last_seen', 'deleted'} + classObjectType = 'Object' + def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def] default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None: ''' Master class representing a generic MISP object @@ -1063,9 +1119,10 @@ class MISPObject(AbstractMISP): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPEventReport(AbstractMISP): +class MISPEventReport(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'} + classObjectType = 'EventReport' timestamp: float | int | datetime @@ -1451,11 +1508,13 @@ class MISPGalaxy(AbstractMISP): return f'<{self.__class__.__name__}(NotInitialized)' -class MISPEvent(AbstractMISP): +class MISPEvent(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', 'publish_timestamp', 'published', 'date', 'extends_uuid'} + classObjectType = 'Event' + def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) self.__schema_file = 'schema.json' if strict_validation else 'schema-lax.json' @@ -2318,3 +2377,199 @@ class MISPDecayingModel(AbstractMISP): def __repr__(self) -> str: return f'<{self.__class__.__name__}(uuid={self.uuid})>' + + +class MISPAnalystData(AbstractMISP): + + _fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors', + 'created', 'distribution', 'sharing_group_id', } + + valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy', + 'Object', 'Note', 'Opinion', 'Relationship', 'Organisation', + 'SharingGroup'} + + @property + def org(self) -> MISPOrganisation: + return self.Org + + @property + def orgc(self) -> MISPOrganisation: + return self.Orgc + + @orgc.setter + def orgc(self, orgc: MISPOrganisation) -> None: + if isinstance(orgc, MISPOrganisation): + self.Orgc = orgc + else: + raise PyMISPError('Orgc must be of type MISPOrganisation.') + + def __new__(cls, *args, **kwargs): + if cls is MISPAnalystData: + raise TypeError(f"only children of '{cls.__name__}' may be instantiated") + return object.__new__(cls) + + def __init__(self, **kwargs: dict[str, Any]) -> None: + super().__init__(**kwargs) + self.uuid = str(uuid.uuid4()) + self.object_uuid: str + self.object_type: str + self.authors: str + self.created: float | int | datetime + self.modified: float | int | datetime + self.SharingGroup: MISPSharingGroup + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Note' in kwargs: + kwargs = kwargs['Note'] + + self.distribution = kwargs.pop('distribution', None) + if self.distribution is not None: + self.distribution = int(self.distribution) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).') + + self.object_uuid = kwargs.pop('object_uuid', None) + if self.object_uuid is None: + raise NewAnalystDataError('The UUID for which this note is attached is required.') + self.object_type = kwargs.pop('object_type', None) + if self.object_type is None: + raise NewAnalystDataError('The element type for which this note is attached is required.') + if self.object_type not in self.valid_object_type: + raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.') + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('created'): + ts = kwargs.pop('created') + if isinstance(ts, datetime): + self.created = ts + else: + self.created = datetime.fromtimestamp(int(ts), timezone.utc) + if kwargs.get('modified'): + ts = kwargs.pop('modified') + if isinstance(ts, datetime): + self.modified = ts + else: + self.modified = datetime.fromtimestamp(int(ts), timezone.utc) + + if kwargs.get('Org'): + self.Org = MISPOrganisation() + self.Org.from_dict(**kwargs.pop('Org')) + if kwargs.get('Orgc'): + self.Orgc = MISPOrganisation() + self.Orgc.from_dict(**kwargs.pop('Orgc')) + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + + super().from_dict(**kwargs) + + def _set_default(self) -> None: + if not hasattr(self, 'created'): + self.created = datetime.timestamp(datetime.now()) + if not hasattr(self, 'modified'): + self.modified = self.created + + +class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'}) + + classObjectType = 'Note' + + def __init__(self, **kwargs: dict[str, Any]) -> None: + self.note: str + self.language: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + self.note = kwargs.pop('note', None) + if self.note is None: + raise NewNoteError('The text note of the note is required.') + + super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'note'): + return '<{self.__class__.__name__}(note={self.note})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'}) + + classObjectType = 'Opinion' + + def __init__(self, **kwargs: dict[str, Any]) -> None: + self.opinion: int + self.comment: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + self.opinion = kwargs.pop('opinion', None) + if self.opinion is not None: + self.opinion = int(self.opinion) + if not (0 <= self.opinion <= 100): + raise NewOpinionError('The opinion value must be between 0 and 100 included.') + else: + raise NewOpinionError('The opinion value is required.') + + self.comment = kwargs.pop('comment', None) + if self.comment is None: + raise NewOpinionError('The text comment is required.') + + return super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'opinion'): + return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'}) + + classObjectType = 'Relationship' + + def __init__(self, **kwargs: dict[str, Any]) -> None: + self.related_object_uuid: str + self.related_object_type: str + self.relationship_type: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + + self.related_object_type = kwargs.pop('related_object_type', None) + if self.related_object_type is None: + raise NewRelationshipError('The target object type for this relationship is required.') + + self.related_object_uuid = kwargs.pop('related_object_uuid', None) + if self.related_object_uuid is None: + if not isinstance(self.related_object_type, AbstractMISP): + raise NewRelationshipError('The target UUID for this relationship is required.') + else: + self.related_object_uuid = self.related_object_type.uuid + self.related_object_type = self.related_object_type.classObjectType + + if self.related_object_type not in self.valid_object_type: + raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.') + + return super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'): + return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)'