Merge branch 'mokaddem-feature/analyst-data' into wip_analystdata

wip_analystdata
Raphaël Vinot 2024-04-29 13:50:53 +02:00
commit f19c31de42
3 changed files with 279 additions and 8 deletions

View File

@ -39,7 +39,7 @@ try:
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster, MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation) MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship)
from .api import PyMISP, register_user # noqa from .api import PyMISP, register_user # noqa
# NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future # NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future
from .tools import AbstractMISPObjectGenerator # noqa from .tools import AbstractMISPObjectGenerator # noqa
@ -76,7 +76,8 @@ __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag',
'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist', 'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist',
'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion', 'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion',
'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement', 'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement',
'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError', 'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship',
'PyMISPError', 'NewEventError', 'NewAttributeError',
'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat', 'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat',
'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP' 'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP'
] ]

View File

@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError):
pass pass
class NewAnalystDataError(PyMISPError):
pass
class NewNoteError(PyMISPError):
pass
class NewOpinionError(PyMISPError):
pass
class NewRelationshipError(PyMISPError):
pass
class UpdateAttributeError(PyMISPError): class UpdateAttributeError(PyMISPError):
pass pass

View File

@ -23,13 +23,65 @@ except ImportError:
import json import json
from .abstract import AbstractMISP, MISPTag from .abstract import AbstractMISP, MISPTag
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
PyMISPError, NewEventError, NewAttributeError, NewEventReportError, PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
NewGalaxyClusterError, NewGalaxyClusterRelationError) NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError)
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
class AnalystDataBehaviorMixin(AbstractMISP):
# NOTE: edited here must be the property of Abstract MISP
def __init__(self) -> None:
super().__init__()
self.uuid: str # Created in the child class
self.classObjectType: str # Must be defined in the child class
self.Note: list[MISPNote] = []
self.Opinion: list[MISPOpinion] = []
self.Relationship: list[MISPRelationship] = []
@property
def notes(self) -> list[MISPNote]:
return self.Note
@property
def opinions(self) -> list[MISPOpinion]:
return self.Opinion
@property
def relationships(self) -> list[MISPRelationship]:
return self.Relationship
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_note = MISPNote()
the_note.from_dict(note=note, language=language,
object_uuid=self.uuid, object_type=self.classObjectType,
**kwargs)
self.notes.append(the_note)
self.edited = True
return the_note
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_opinion = MISPOpinion()
the_opinion.from_dict(opinion=opinion, comment=comment,
object_uuid=self.uuid, object_type=self.classObjectType,
**kwargs)
self.opinions.append(the_opinion)
self.edited = True
return the_opinion
def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_relationship = MISPRelationship()
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
relationship_type=relationship_type,
object_uuid=self.uuid, object_type=self.classObjectType,
**kwargs)
self.relationships.append(the_relationship)
self.edited = True
return the_relationship
try: try:
from dateutil.parser import parse from dateutil.parser import parse
except ImportError: except ImportError:
@ -226,11 +278,13 @@ class MISPSighting(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)' return f'<{self.__class__.__name__}(NotInitialized)'
class MISPAttribute(AbstractMISP): class MISPAttribute(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'value', 'category', 'type', 'comment', 'data', _fields_for_feed: set[str] = {'uuid', 'value', 'category', 'type', 'comment', 'data',
'deleted', 'timestamp', 'to_ids', 'disable_correlation', 'deleted', 'timestamp', 'to_ids', 'disable_correlation',
'first_seen', 'last_seen'} 'first_seen', 'last_seen'}
classObjectType = 'Attribute'
def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False): def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False):
"""Represents an Attribute """Represents an Attribute
@ -666,12 +720,14 @@ class MISPObjectReference(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)' return f'<{self.__class__.__name__}(NotInitialized)'
class MISPObject(AbstractMISP): class MISPObject(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'name', 'meta-category', 'description', 'template_uuid', _fields_for_feed: set[str] = {'name', 'meta-category', 'description', 'template_uuid',
'template_version', 'uuid', 'timestamp', 'comment', 'template_version', 'uuid', 'timestamp', 'comment',
'first_seen', 'last_seen', 'deleted'} 'first_seen', 'last_seen', 'deleted'}
classObjectType = 'Object'
def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def] def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def]
default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None: default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None:
''' Master class representing a generic MISP object ''' Master class representing a generic MISP object
@ -1063,9 +1119,10 @@ class MISPObject(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)' return f'<{self.__class__.__name__}(NotInitialized)'
class MISPEventReport(AbstractMISP): class MISPEventReport(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'} _fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
classObjectType = 'EventReport'
timestamp: float | int | datetime timestamp: float | int | datetime
@ -1451,11 +1508,13 @@ class MISPGalaxy(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)' return f'<{self.__class__.__name__}(NotInitialized)'
class MISPEvent(AbstractMISP): class MISPEvent(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', _fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
'publish_timestamp', 'published', 'date', 'extends_uuid'} 'publish_timestamp', 'published', 'date', 'extends_uuid'}
classObjectType = 'Event'
def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def] def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(**kwargs) super().__init__(**kwargs)
self.__schema_file = 'schema.json' if strict_validation else 'schema-lax.json' self.__schema_file = 'schema.json' if strict_validation else 'schema-lax.json'
@ -2318,3 +2377,199 @@ class MISPDecayingModel(AbstractMISP):
def __repr__(self) -> str: def __repr__(self) -> str:
return f'<{self.__class__.__name__}(uuid={self.uuid})>' return f'<{self.__class__.__name__}(uuid={self.uuid})>'
class MISPAnalystData(AbstractMISP):
_fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors',
'created', 'distribution', 'sharing_group_id', }
valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy',
'Object', 'Note', 'Opinion', 'Relationship', 'Organisation',
'SharingGroup'}
@property
def org(self) -> MISPOrganisation:
return self.Org
@property
def orgc(self) -> MISPOrganisation:
return self.Orgc
@orgc.setter
def orgc(self, orgc: MISPOrganisation) -> None:
if isinstance(orgc, MISPOrganisation):
self.Orgc = orgc
else:
raise PyMISPError('Orgc must be of type MISPOrganisation.')
def __new__(cls, *args, **kwargs):
if cls is MISPAnalystData:
raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __init__(self, **kwargs: dict[str, Any]) -> None:
super().__init__(**kwargs)
self.uuid = str(uuid.uuid4())
self.object_uuid: str
self.object_type: str
self.authors: str
self.created: float | int | datetime
self.modified: float | int | datetime
self.SharingGroup: MISPSharingGroup
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Note' in kwargs:
kwargs = kwargs['Note']
self.distribution = kwargs.pop('distribution', None)
if self.distribution is not None:
self.distribution = int(self.distribution)
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).')
self.object_uuid = kwargs.pop('object_uuid', None)
if self.object_uuid is None:
raise NewAnalystDataError('The UUID for which this note is attached is required.')
self.object_type = kwargs.pop('object_type', None)
if self.object_type is None:
raise NewAnalystDataError('The element type for which this note is attached is required.')
if self.object_type not in self.valid_object_type:
raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.')
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('created'):
ts = kwargs.pop('created')
if isinstance(ts, datetime):
self.created = ts
else:
self.created = datetime.fromtimestamp(int(ts), timezone.utc)
if kwargs.get('modified'):
ts = kwargs.pop('modified')
if isinstance(ts, datetime):
self.modified = ts
else:
self.modified = datetime.fromtimestamp(int(ts), timezone.utc)
if kwargs.get('Org'):
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if kwargs.get('Orgc'):
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def _set_default(self) -> None:
if not hasattr(self, 'created'):
self.created = datetime.timestamp(datetime.now())
if not hasattr(self, 'modified'):
self.modified = self.created
class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
classObjectType = 'Note'
def __init__(self, **kwargs: dict[str, Any]) -> None:
self.note: str
self.language: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
self.note = kwargs.pop('note', None)
if self.note is None:
raise NewNoteError('The text note of the note is required.')
super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'note'):
return '<{self.__class__.__name__}(note={self.note})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
classObjectType = 'Opinion'
def __init__(self, **kwargs: dict[str, Any]) -> None:
self.opinion: int
self.comment: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
self.opinion = kwargs.pop('opinion', None)
if self.opinion is not None:
self.opinion = int(self.opinion)
if not (0 <= self.opinion <= 100):
raise NewOpinionError('The opinion value must be between 0 and 100 included.')
else:
raise NewOpinionError('The opinion value is required.')
self.comment = kwargs.pop('comment', None)
if self.comment is None:
raise NewOpinionError('The text comment is required.')
return super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'opinion'):
return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
classObjectType = 'Relationship'
def __init__(self, **kwargs: dict[str, Any]) -> None:
self.related_object_uuid: str
self.related_object_type: str
self.relationship_type: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
self.related_object_type = kwargs.pop('related_object_type', None)
if self.related_object_type is None:
raise NewRelationshipError('The target object type for this relationship is required.')
self.related_object_uuid = kwargs.pop('related_object_uuid', None)
if self.related_object_uuid is None:
if not isinstance(self.related_object_type, AbstractMISP):
raise NewRelationshipError('The target UUID for this relationship is required.')
else:
self.related_object_uuid = self.related_object_type.uuid
self.related_object_type = self.related_object_type.classObjectType
if self.related_object_type not in self.valid_object_type:
raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')
return super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'):
return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'