new: [analyst-data] Added initial support of analyst data concept and functions - WiP

mokaddem-feature/analyst-data
Sami Mokaddem 2024-04-23 09:24:44 +02:00
parent 8b4f98ac4c
commit 0a0f0f2059
No known key found for this signature in database
GPG Key ID: 164C473F627A06FA
3 changed files with 352 additions and 5 deletions

View File

@ -39,7 +39,7 @@ try:
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster, MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation) MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship)
from .api import PyMISP, register_user # noqa from .api import PyMISP, register_user # noqa
# NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future # NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future
from .tools import AbstractMISPObjectGenerator # noqa from .tools import AbstractMISPObjectGenerator # noqa
@ -76,7 +76,8 @@ __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag',
'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist', 'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist',
'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion', 'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion',
'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement', 'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement',
'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError', 'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship',
'PyMISPError', 'NewEventError', 'NewAttributeError',
'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat', 'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat',
'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP' 'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP'
] ]

View File

@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError):
pass pass
class NewAnalystDataError(PyMISPError):
pass
class NewNoteError(PyMISPError):
pass
class NewOpinionError(PyMISPError):
pass
class NewRelationshipError(PyMISPError):
pass
class UpdateAttributeError(PyMISPError): class UpdateAttributeError(PyMISPError):
pass pass

View File

@ -23,13 +23,62 @@ except ImportError:
import json import json
from .abstract import AbstractMISP, MISPTag from .abstract import AbstractMISP, MISPTag
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
PyMISPError, NewEventError, NewAttributeError, NewEventReportError, PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
NewGalaxyClusterError, NewGalaxyClusterRelationError) NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError)
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
class AnalystDataBehaviorMixin:
def __init__(self, **kwargs) -> None:
super().__init__()
self.uuid = str(uuid.uuid4())
self.Note: list[MISPNote] = []
self.Opinion: list[MISPOpinion] = []
self.Relationship: list[MISPRelationship] = []
@property
def notes(self) -> list[MISPNote]:
return self.Note
@property
def opinions(self) -> list[MISPOpinion]:
return self.Opinion
@property
def relationships(self) -> list[MISPRelationship]:
return self.Relationship
def add_analyst_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_note = MISPNote()
the_note.from_dict(note=note, language=language,
object_uuid=self.uuid, object_type=self.classObjectType,
**kwargs)
self.notes.append(the_note)
self.edited = True
return the_note
def add_analyst_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_opinion = MISPOpinion()
the_opinion.from_dict(opinion=opinion, comment=comment,
object_uuid=self.uuid, object_type=self.classObjectType,
**kwargs)
self.opinions.append(the_opinion)
self.edited = True
return the_opinion
def add_analyst_relationship(self, related_object_type: str, related_object_uuid: str, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_relationship = MISPRelationship()
the_relationship.from_dict(related_object_uuid=related_object_uuid, related_object_type=related_object_type,
relationship_type=relationship_type,
object_uuid=self.uuid, object_type=self.classObjectType,
**kwargs)
self.relationships.append(the_relationship)
self.edited = True
return the_relationship
try: try:
from dateutil.parser import parse from dateutil.parser import parse
except ImportError: except ImportError:
@ -1063,12 +1112,32 @@ class MISPObject(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)' return f'<{self.__class__.__name__}(NotInitialized)'
class MISPEventReport(AbstractMISP): class MISPEventReport(AbstractMISP, AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'} _fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
classObjectType = 'EventReport'
timestamp: float | int | datetime timestamp: float | int | datetime
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
# self.uuid = str(uuid.uuid4())
# self.Note: list[MISPNote] = []
# self.Opinion: list[MISPOpinion] = []
# self.Relationship: list[MISPRelationship] = []
# @property
# def notes(self) -> list[MISPNote]:
# return self.Note
# @property
# def opinions(self) -> list[MISPOpinion]:
# return self.Opinion
# @property
# def relationships(self) -> list[MISPRelationship]:
# return self.Relationship
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'EventReport' in kwargs: if 'EventReport' in kwargs:
kwargs = kwargs['EventReport'] kwargs = kwargs['EventReport']
@ -1113,6 +1182,34 @@ class MISPEventReport(AbstractMISP):
super().from_dict(**kwargs) super().from_dict(**kwargs)
# def add_analyst_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
# the_note = MISPNote()
# the_note.from_dict(note=note, language=language,
# object_uuid=self.uuid, object_type='EventReport',
# **kwargs)
# self.notes.append(the_note)
# self.edited = True
# return the_note
# def add_analyst_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
# the_opinion = MISPOpinion()
# the_opinion.from_dict(opinion=opinion, comment=comment,
# object_uuid=self.uuid, object_type='EventReport',
# **kwargs)
# self.opinions.append(the_opinion)
# self.edited = True
# return the_opinion
# def add_analyst_relationship(self, related_object_type: str, related_object_uuid: str, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
# the_relationship = MISPRelationship()
# the_relationship.from_dict(related_object_uuid=related_object_uuid, related_object_type=related_object_type,
# relationship_type=relationship_type,
# object_uuid=self.uuid, object_type='EventReport',
# **kwargs)
# self.relationships.append(the_relationship)
# self.edited = True
# return the_relationship
def __repr__(self) -> str: def __repr__(self) -> str:
if hasattr(self, 'name'): if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self) return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
@ -2318,3 +2415,237 @@ class MISPDecayingModel(AbstractMISP):
def __repr__(self) -> str: def __repr__(self) -> str:
return f'<{self.__class__.__name__}(uuid={self.uuid})>' return f'<{self.__class__.__name__}(uuid={self.uuid})>'
class MISPAnalystData(AbstractMISP):
_fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors',
'created', 'distribution', 'sharing_group_id', }
valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy',
'Object', 'Note', 'Opinion', 'Relationship', 'Organisation',
'SharingGroup'}
@property
def org(self) -> MISPOrganisation:
return self.Org
@property
def orgc(self) -> MISPOrganisation:
return self.Orgc
@orgc.setter
def orgc(self, orgc: MISPOrganisation) -> None:
if isinstance(orgc, MISPOrganisation):
self.Orgc = orgc
else:
raise PyMISPError('Orgc must be of type MISPOrganisation.')
def __new__(cls, *args, **kwargs):
if cls is MISPAnalystData:
raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.uuid = str(uuid.uuid4())
self.object_uuid: str
self.object_type: str
self.authors: str
self.created: float | int | datetime
self.modified: float | int | datetime
self.SharingGroup: MISPSharingGroup
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Note' in kwargs:
kwargs = kwargs['Note']
self.distribution = kwargs.pop('distribution', None)
if self.distribution is not None:
self.distribution = int(self.distribution)
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).')
self.object_uuid = kwargs.pop('object_uuid', None)
if self.object_uuid is None:
raise NewAnalystDataError('The UUID for which this note is attached is required.')
self.object_type = kwargs.pop('object_type', None)
if self.object_type is None:
raise NewAnalystDataError('The element type for which this note is attached is required.')
if self.object_type not in self.valid_object_type:
raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.')
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('created'):
ts = kwargs.pop('created')
if isinstance(ts, datetime):
self.created = ts
else:
self.created = datetime.fromtimestamp(int(ts), timezone.utc)
if kwargs.get('modified'):
ts = kwargs.pop('modified')
if isinstance(ts, datetime):
self.modified = ts
else:
self.modified = datetime.fromtimestamp(int(ts), timezone.utc)
if kwargs.get('Org'):
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if kwargs.get('Orgc'):
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def _set_default(self) -> None:
if not hasattr(self, 'created'):
self.created = datetime.timestamp(datetime.now())
if not hasattr(self, 'modified'):
self.modified = self.created
class MISPNote(MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
def __init__(self, **kwargs) -> None:
self.note: str
self.language: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None:
self.note = kwargs.pop('note', None)
if self.note is None:
raise NewNoteError('The text note of the note is required.')
super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'note'):
return '<{self.__class__.__name__}(note={self.note})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPOpinion(MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
def __init__(self, **kwargs) -> None:
self.opinion: int
self.comment: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None:
self.opinion = kwargs.pop('opinion', None)
if self.opinion is not None:
self.opinion = int(self.opinion)
if not (0 <= self.opinion <= 100):
raise NewOpinionError('The opinion value must be between 0 and 100 included.')
else:
raise NewOpinionError('The opinion value is required.')
self.comment = kwargs.pop('comment', None)
if self.comment is None:
raise NewOpinionError('The text comment is required.')
return super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'opinion'):
return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPRelationship(MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
def __init__(self, **kwargs) -> None:
self.related_object_uuid: str
self.related_object_type: str
self.relationship_type: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None:
self.related_object_uuid = kwargs.pop('related_object_uuid', None)
if self.related_object_uuid is None:
raise NewRelationshipError('The target UUID for this relationship is required.')
self.related_object_type = kwargs.pop('related_object_type', None)
if self.related_object_type is None:
raise NewRelationshipError('The target object type for this relationship is required.')
if self.related_object_type not in self.valid_object_type:
raise NewAnalystDataError('The target object type is not a valid type. Actual: {self.related_object_type}.'.format(self=self))
return super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'):
return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
# class AnalystDataBehavior():
# def __init__(self, **kwargs) -> None:
# super().__init__(**kwargs)
# self.uuid = str(uuid.uuid4())
# self.Note: list[MISPNote] = []
# self.Opinion: list[MISPOpinion] = []
# self.Relationship: list[MISPRelationship] = []
# @property
# def notes(self) -> list[MISPNote]:
# return self.Note
# @property
# def opinions(self) -> list[MISPOpinion]:
# return self.Opinion
# @property
# def relationships(self) -> list[MISPRelationship]:
# return self.Relationship
# def add_analyst_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
# the_note = MISPNote()
# the_note.from_dict(note=note, language=language,
# object_uuid=self.uuid, object_type=self.classObjectType,
# **kwargs)
# self.notes.append(the_note)
# self.edited = True
# return the_note
# def add_analyst_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
# the_opinion = MISPOpinion()
# the_opinion.from_dict(opinion=opinion, comment=comment,
# object_uuid=self.uuid, object_type=self.classObjectType,
# **kwargs)
# self.opinions.append(the_opinion)
# self.edited = True
# return the_opinion
# def add_analyst_relationship(self, related_object_type: str, related_object_uuid: str, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
# the_relationship = MISPRelationship()
# the_relationship.from_dict(related_object_uuid=related_object_uuid, related_object_type=related_object_type,
# relationship_type=relationship_type,
# object_uuid=self.uuid, object_type=self.classObjectType,
# **kwargs)
# self.relationships.append(the_relationship)
# self.edited = True
# return the_relationship