Merge branch 'wip_analystdata'

pull/1227/head
Raphaël Vinot 2024-05-22 11:31:14 +02:00
commit feadedeadb
6 changed files with 636 additions and 15 deletions

View File

@ -39,7 +39,7 @@ try:
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship)
from .api import PyMISP, register_user # noqa
# NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future
from .tools import AbstractMISPObjectGenerator # noqa
@ -76,7 +76,8 @@ __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag',
'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist',
'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion',
'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement',
'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError',
'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship',
'PyMISPError', 'NewEventError', 'NewAttributeError',
'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat',
'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP'
]

View File

@ -8,7 +8,7 @@ from deprecated import deprecated # type: ignore
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum, IntEnum
from enum import Enum
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache

View File

@ -31,7 +31,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
@ -584,7 +585,6 @@ class PyMISP:
data['hard'] = 1
r = self._prepare_request('POST', request_url, data=data)
return self._check_json_response(r)
# ## END Event Report ###
# ## BEGIN Galaxy Cluster ###
@ -611,15 +611,199 @@ class PyMISP:
elif isinstance(galaxy_cluster, (int, str)):
cluster_id = galaxy_cluster
else:
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')
to_post = { 'Galaxy': { 'target_id': cluster_id } }
to_post = {'Galaxy': {'target_id': cluster_id}}
url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}'
r = self._prepare_request('POST', url, data=to_post)
return self._check_json_response(r)
# ## END Galaxy Cluster ###
# ## BEGIN Analyst Data ###a
def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Get an analyst data from a MISP instance
:param analyst_data: analyst data to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}')
analyst_data_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all':
return analyst_data_r
er = type(analyst_data)()
er.from_dict(**analyst_data_r)
return er
def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Add an analyst data to an existing MISP element
:param analyst_data: analyst_data to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
object_uuid = analyst_data.object_uuid
object_type = analyst_data.object_type
r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data)
new_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
return new_analyst_data
er = type(analyst_data)()
er.from_dict(**new_analyst_data)
return er
def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Update an analyst data on a MISP instance
:param analyst_data: analyst data to update
:param analyst_data_id: analyst data ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
if analyst_data_id is None:
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data)
updated_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all':
return updated_analyst_data
er = type(analyst_data)()
er.from_dict(**updated_analyst_data)
return er
def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete an analyst data from a MISP instance
:param analyst_data: analyst data to delete
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}'
r = self._prepare_request('POST', request_url)
return self._check_json_response(r)
# ## END Analyst Data ###
# ## BEGIN Analyst Note ###
def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Get a note from a MISP instance
:param note: note to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(note, pythonify)
def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Add a note to an existing MISP element
:param note: note to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(note, pythonify)
def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Update a note on a MISP instance
:param note: note to update
:param note_id: note ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(note, note_id, pythonify)
def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete a note from a MISP instance
:param note: note delete
"""
return self.delete_analyst_data(note)
# ## END Analyst Note ###
# ## BEGIN Analyst Opinion ###
def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Get an opinion from a MISP instance
:param opinion: opinion to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(opinion, pythonify)
def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Add an opinion to an existing MISP element
:param opinion: opinion to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(opinion, pythonify)
def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Update an opinion on a MISP instance
:param opinion: opinion to update
:param opinion_id: opinion ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(opinion, opinion_id, pythonify)
def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete an opinion from a MISP instance
:param opinion: opinion to delete
"""
return self.delete_analyst_data(opinion)
# ## END Analyst Opinion ###
# ## BEGIN Analyst Relationship ###
def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Get a relationship from a MISP instance
:param relationship: relationship to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(relationship, pythonify)
def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Add a relationship to an existing MISP element
:param relationship: relationship to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(relationship, pythonify)
def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Update a relationship on a MISP instance
:param relationship: relationship to update
:param relationship_id: relationship ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(relationship, relationship_id, pythonify)
def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete a relationship from a MISP instance
:param relationship: relationship to delete
"""
return self.delete_analyst_data(relationship)
# ## END Analyst Relationship ###
# ## BEGIN Object ###
def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:

View File

@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError):
pass
class NewAnalystDataError(PyMISPError):
pass
class NewNoteError(PyMISPError):
pass
class NewOpinionError(PyMISPError):
pass
class NewRelationshipError(PyMISPError):
pass
class UpdateAttributeError(PyMISPError):
pass

View File

@ -23,13 +23,89 @@ except ImportError:
import json
from .abstract import AbstractMISP, MISPTag
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
NewGalaxyClusterError, NewGalaxyClusterRelationError)
NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError)
logger = logging.getLogger('pymisp')
class AnalystDataBehaviorMixin(AbstractMISP):
# NOTE: edited here must be the property of Abstract MISP
def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(**kwargs)
self.uuid: str # Created in the child class
self._analyst_data_object_type: str # Must be defined in the child class
self.Note: list[MISPNote] = []
self.Opinion: list[MISPOpinion] = []
self.Relationship: list[MISPRelationship] = []
@property
def analyst_data_object_type(self) -> str:
return self._analyst_data_object_type
@property
def notes(self) -> list[MISPNote]:
return self.Note
@property
def opinions(self) -> list[MISPOpinion]:
return self.Opinion
@property
def relationships(self) -> list[MISPRelationship]:
return self.Relationship
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_note = MISPNote()
the_note.from_dict(note=note, language=language,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
self.notes.append(the_note)
self.edited = True
return the_note
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_opinion = MISPOpinion()
the_opinion.from_dict(opinion=opinion, comment=comment,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
self.opinions.append(the_opinion)
self.edited = True
return the_opinion
def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_relationship = MISPRelationship()
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
relationship_type=relationship_type,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
self.relationships.append(the_relationship)
self.edited = True
return the_relationship
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
# These members need a fully initialized class to be loaded properly
notes = kwargs.pop('Note', [])
opinions = kwargs.pop('Opinion', [])
relationships = kwargs.pop('Relationship', [])
super().from_dict(**kwargs)
for note in notes:
note.pop('object_uuid', None)
note.pop('object_type', None)
self.add_note(**note)
for opinion in opinions:
opinion.pop('object_uuid', None)
opinion.pop('object_type', None)
self.add_opinion(**opinion)
for relationship in relationships:
relationship.pop('object_uuid', None)
relationship.pop('object_type', None)
self.add_relationship(**relationship)
try:
from dateutil.parser import parse
except ImportError:
@ -226,11 +302,13 @@ class MISPSighting(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPAttribute(AbstractMISP):
class MISPAttribute(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'value', 'category', 'type', 'comment', 'data',
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
'first_seen', 'last_seen'}
_analyst_data_object_type = 'Attribute'
def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False):
"""Represents an Attribute
@ -666,12 +744,14 @@ class MISPObjectReference(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPObject(AbstractMISP):
class MISPObject(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'name', 'meta-category', 'description', 'template_uuid',
'template_version', 'uuid', 'timestamp', 'comment',
'first_seen', 'last_seen', 'deleted'}
_analyst_data_object_type = 'Object'
def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def]
default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None:
''' Master class representing a generic MISP object
@ -1063,12 +1143,17 @@ class MISPObject(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPEventReport(AbstractMISP):
class MISPEventReport(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
_analyst_data_object_type = 'EventReport'
timestamp: float | int | datetime
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.uuid: str = str(uuid.uuid4())
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'EventReport' in kwargs:
kwargs = kwargs['EventReport']
@ -1451,11 +1536,13 @@ class MISPGalaxy(AbstractMISP):
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPEvent(AbstractMISP):
class MISPEvent(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
'publish_timestamp', 'published', 'date', 'extends_uuid'}
_analyst_data_object_type = 'Event'
def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(**kwargs)
self.__schema_file = 'schema.json' if strict_validation else 'schema-lax.json'
@ -2318,3 +2405,200 @@ class MISPDecayingModel(AbstractMISP):
def __repr__(self) -> str:
return f'<{self.__class__.__name__}(uuid={self.uuid})>'
class MISPAnalystData(AbstractMISP):
_fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors',
'created', 'distribution', 'sharing_group_id', 'note_type_name'}
valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy',
'Object', 'Note', 'Opinion', 'Relationship', 'Organisation',
'SharingGroup'}
@property
def org(self) -> MISPOrganisation:
return self.Org
@property
def orgc(self) -> MISPOrganisation:
return self.Orgc
@orgc.setter
def orgc(self, orgc: MISPOrganisation) -> None:
if isinstance(orgc, MISPOrganisation):
self.Orgc = orgc
else:
raise PyMISPError('Orgc must be of type MISPOrganisation.')
def __new__(cls, *args, **kwargs):
if cls is MISPAnalystData:
raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
return object.__new__(cls)
def __init__(self, **kwargs: dict[str, Any]) -> None:
super().__init__(**kwargs)
self.uuid = str(uuid.uuid4())
self.object_uuid: str
self.object_type: str
self.authors: str
self.created: float | int | datetime
self.modified: float | int | datetime
self.SharingGroup: MISPSharingGroup
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
self.distribution = kwargs.pop('distribution', None)
if self.distribution is not None:
self.distribution = int(self.distribution)
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).')
self.object_uuid = kwargs.pop('object_uuid', None)
if self.object_uuid is None:
raise NewAnalystDataError('The UUID for which this element is attached is required.')
self.object_type = kwargs.pop('object_type', None)
if self.object_type is None:
raise NewAnalystDataError('The element type for which this element is attached is required.')
if self.object_type not in self.valid_object_type:
raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.')
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('created'):
ts = kwargs.pop('created')
if isinstance(ts, datetime):
self.created = ts
else:
self.created = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ
if kwargs.get('modified'):
ts = kwargs.pop('modified')
if isinstance(ts, datetime):
self.modified = ts
else:
self.modified = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ
if kwargs.get('Org'):
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if kwargs.get('Orgc'):
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def _set_default(self) -> None:
if not hasattr(self, 'created'):
self.created = datetime.timestamp(datetime.now())
if not hasattr(self, 'modified'):
self.modified = self.created
class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
_analyst_data_object_type = 'Note'
def __init__(self, **kwargs: dict[str, Any]) -> None:
self.note: str
self.language: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Note' in kwargs:
kwargs = kwargs['Note']
self.note = kwargs.pop('note', None)
if self.note is None:
raise NewNoteError('The text note of the note is required.')
super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'note'):
return '<{self.__class__.__name__}(note={self.note})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
_analyst_data_object_type = 'Opinion'
def __init__(self, **kwargs: dict[str, Any]) -> None:
self.opinion: int
self.comment: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Opinion' in kwargs:
kwargs = kwargs['Opinion']
self.opinion = kwargs.pop('opinion', None)
if self.opinion is not None:
self.opinion = int(self.opinion)
if not (0 <= self.opinion <= 100):
raise NewOpinionError('The opinion value must be between 0 and 100 included.')
else:
raise NewOpinionError('The opinion value is required.')
self.comment = kwargs.pop('comment', None)
if self.comment is None:
raise NewOpinionError('The text comment is required.')
return super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'opinion'):
return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
_analyst_data_object_type = 'Relationship'
def __init__(self, **kwargs: dict[str, Any]) -> None:
self.related_object_uuid: str
self.related_object_type: str
self.relationship_type: str
super().__init__(**kwargs)
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Relationship' in kwargs:
kwargs = kwargs['Relationship']
self.related_object_type = kwargs.pop('related_object_type', None)
if self.related_object_type is None:
raise NewRelationshipError('The target object type for this relationship is required.')
self.related_object_uuid = kwargs.pop('related_object_uuid', None)
if self.related_object_uuid is None:
if not isinstance(self.related_object_type, AbstractMISP):
raise NewRelationshipError('The target UUID for this relationship is required.')
else:
self.related_object_uuid = self.related_object_type.uuid
self.related_object_type = self.related_object_type._analyst_data_object_type
if self.related_object_type not in self.valid_object_type:
raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')
return super().from_dict(**kwargs)
def __repr__(self) -> str:
if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'):
return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)'

View File

@ -25,7 +25,8 @@ try:
MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag,
MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting,
MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster,
MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist)
MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist,
MISPNote)
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
except ImportError:
raise
@ -3198,7 +3199,7 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.toggle_global_pythonify()
def test_attach_galaxy_cluster(self) -> None:
event = self.create_simple_event()
event = self.create_simple_event()
event = self.admin_misp_connector.add_event(event, pythonify=True)
try:
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True)
@ -3234,6 +3235,142 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(event)
self.admin_misp_connector.toggle_global_pythonify()
def test_analyst_data_CRUD(self) -> None:
event = self.create_simple_event()
self.admin_misp_connector.toggle_global_pythonify()
try:
fake_uuid = str(uuid4())
new_note1 = MISPNote()
new_note1.object_type = 'Event'
new_note1.object_uuid = fake_uuid
new_note1.note = 'Fake note'
new_note1 = self.user_misp_connector.add_note(new_note1)
# The Note should be linked even for non-existing data
self.assertTrue(new_note1.object_uuid == fake_uuid)
new_note1.note = "Updated Note"
new_note1 = self.user_misp_connector.update_note(new_note1)
# The Note should be updatable
self.assertTrue(new_note1.note == "Updated Note")
# The Note should be able to get an Opinion
new_opinion = new_note1.add_opinion(42, 'Test Opinion')
new_note1 = self.user_misp_connector.update_note(new_note1)
# Fetch newly added node
new_note1 = self.user_misp_connector.get_note(new_note1)
# The Opinion shoud be able to be created via the Note
self.assertTrue(new_note1.opinions[0].opinion == new_opinion.opinion)
response = self.user_misp_connector.delete_note(new_note1)
# The Note should be deletable
self.assertTrue(response['success'])
self.assertEqual(response['message'], 'Note deleted.')
# The Opinion should not be deleted
opinion_resp = self.user_misp_connector.get_opinion(new_opinion)
self.assertTrue(opinion_resp.opinion == new_opinion.opinion)
new_note: MISPNote = event.add_note(note='Test Note', language='en')
new_note.distribution = 1 # Community
event = self.user_misp_connector.add_event(event)
# The note should be linked by Event UUID
self.assertEqual(new_note.object_type, 'Event')
self.assertTrue(new_note.object_uuid == event.uuid)
event = self.user_misp_connector.get_event(event)
# The Note should be present on the event
self.assertTrue(event.notes[0].object_uuid == event.uuid)
finally:
self.admin_misp_connector.delete_event(event)
try:
self.admin_misp_connector.delete_opinion(new_opinion)
self.admin_misp_connector.delete_note(new_note)
self.admin_misp_connector.delete_note(new_note1) # Should already be deleted
except Exception:
pass
def test_analyst_data_ACL(self) -> None:
event = self.create_simple_event()
event.distribution = 2
sg = MISPSharingGroup()
sg.name = 'Testcases SG'
sg.releasability = 'Testing'
sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True)
# Chec that sharing group was created
self.assertEqual(sharing_group.name, 'Testcases SG')
self.admin_misp_connector.toggle_global_pythonify()
try:
new_note: MISPNote = event.add_note(note='Test Note', language='en')
new_note.distribution = 0 # Org only
event = self.admin_misp_connector.add_event(event)
# The note should be linked by Event UUID
self.assertEqual(new_note.object_type, 'Event')
self.assertEqual(event.uuid, new_note.object_uuid)
event = self.admin_misp_connector.get_event(event)
# The note should be visible for the creator
self.assertEqual(len(event.notes), 1)
self.assertTrue(new_note.note == "Test Note")
resp = self.user_misp_connector.get_note(new_note)
# The note should not be visible to another org
self.assertTrue(len(resp), 0)
event = self.user_misp_connector.get_event(event)
# The Note attached to the event should not be visible for another org than the creator
self.assertEqual(len(event.Note), 0)
new_note = self.admin_misp_connector.get_note(new_note)
new_note.distribution = 4
new_note.sharing_group_id = sharing_group.id
new_note = self.admin_misp_connector.update_note(new_note)
self.assertEqual(int(new_note.sharing_group_id), int(sharing_group.id))
event = self.user_misp_connector.get_event(event)
# The Note attached to the event should not be visible for another org not part of the sharing group
self.assertEqual(len(event.Note), 0)
# Add org to the sharing group
r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group,
self.test_org, extend=True)
self.assertEqual(r['name'], 'Organisation added to the sharing group.')
event = self.user_misp_connector.get_event(event)
# The Note attached to the event should now be visible
self.assertEqual(len(event.Note), 1)
new_note.note = "Updated Note"
resp = self.user_misp_connector.update_note(new_note)
# The Note should not be updatable by another organisation
self.assertTrue(resp['errors'])
resp = self.user_misp_connector.delete_note(new_note)
# The Note should not be deletable by another organisation
self.assertTrue(resp['errors'])
organisation = MISPOrganisation()
organisation.name = 'Fake Org'
fake_org = self.admin_misp_connector.add_organisation(organisation, pythonify=True)
new_note_2 = new_note.add_note('Test Note 2')
new_note_2.orgc_uuid = fake_org.uuid
new_note_2 = self.user_misp_connector.add_note(new_note_2)
# Regular user should not be able to create a note on behalf of another organisation
self.assertFalse(new_note_2.orgc_uuid == fake_org.uuid)
# Note should have the orgc set to the use's organisation for non-privileged users
self.assertTrue(new_note_2.orgc_uuid == self.test_org.uuid)
finally:
self.admin_misp_connector.delete_event(event)
try:
pass
self.admin_misp_connector.delete_sharing_group(sharing_group.id)
self.admin_misp_connector.delete_organisation(fake_org)
self.admin_misp_connector.delete_note(new_note)
except Exception:
pass
@unittest.skip("Internal use only")
def missing_methods(self) -> None:
skip = [