mirror of https://github.com/MISP/PyMISP
Merge branch 'main' of github.com:misp/pymisp
commit
2889bb5513
|
@ -2,6 +2,64 @@ Changelog
|
|||
=========
|
||||
|
||||
|
||||
v2.4.194 (2024-06-21)
|
||||
---------------------
|
||||
|
||||
Changes
|
||||
~~~~~~~
|
||||
- Bump version. [Raphaël Vinot]
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
|
||||
Fix
|
||||
~~~
|
||||
- Make a response in the tests a MISPUser obj. [Raphaël Vinot]
|
||||
- Tests failing du to missing error. [Raphaël Vinot]
|
||||
|
||||
|
||||
v2.4.193 (2024-06-06)
|
||||
---------------------
|
||||
|
||||
New
|
||||
~~~
|
||||
- [analyst-data] Added initial support of analyst data concept and
|
||||
functions - WiP. [Sami Mokaddem]
|
||||
|
||||
Changes
|
||||
~~~~~~~
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
- A bit more refactoring. [Raphaël Vinot]
|
||||
- Use from_dict in the mixin to initialize the objects. [Raphaël Vinot]
|
||||
- [analyst-data] Added improvements, API endpoints and tests. [Sami
|
||||
Mokaddem]
|
||||
- [analyst-data] Make sure to include note_type_name. [Sami Mokaddem]
|
||||
- Make mypy happy, change inheritance. [Raphaël Vinot]
|
||||
- Allow orgc context for search_galaxy_clusters. [Jeroen Pinoy]
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
- [analyst-data] Continued implementation of analyst-data support. [Sami
|
||||
Mokaddem]
|
||||
- Allow orgc context for search_galaxy_clusters. [Jeroen Pinoy]
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
- Bump deps. [Raphaël Vinot]
|
||||
- Bump changelog. [Raphaël Vinot]
|
||||
|
||||
Fix
|
||||
~~~
|
||||
- Get the tests to pass. [Raphaël Vinot]
|
||||
- Properly load AnalystData from dict. [Raphaël Vinot]
|
||||
- More changes to get the tests to pass. [Raphaël Vinot]
|
||||
- [event-report] Make sure to generate an UUID. [Sami Mokaddem]
|
||||
- Pass kwargs to abstract. [Raphaël Vinot]
|
||||
|
||||
Other
|
||||
~~~~~
|
||||
- Chg; Bump changelog. [Raphaël Vinot]
|
||||
- Chg; Bump version. [Raphaël Vinot]
|
||||
- Add test case. [Vincenzo]
|
||||
- Add attach galaxy cluster method. [Vincenzo]
|
||||
|
||||
|
||||
v2.4.190 (2024-04-18)
|
||||
---------------------
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -39,7 +39,7 @@ try:
|
|||
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
|
||||
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
|
||||
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
|
||||
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
|
||||
MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship)
|
||||
from .api import PyMISP, register_user # noqa
|
||||
# NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
|
@ -76,7 +76,8 @@ __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag',
|
|||
'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist',
|
||||
'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion',
|
||||
'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement',
|
||||
'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError',
|
||||
'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship',
|
||||
'PyMISPError', 'NewEventError', 'NewAttributeError',
|
||||
'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat',
|
||||
'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP'
|
||||
]
|
||||
|
|
|
@ -8,7 +8,7 @@ from deprecated import deprecated # type: ignore
|
|||
from json import JSONEncoder
|
||||
from uuid import UUID
|
||||
from abc import ABCMeta
|
||||
from enum import Enum, IntEnum
|
||||
from enum import Enum
|
||||
from typing import Any, Mapping
|
||||
from collections.abc import MutableMapping
|
||||
from functools import lru_cache
|
||||
|
|
221
pymisp/api.py
221
pymisp/api.py
|
@ -31,7 +31,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
|
|||
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
|
||||
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
|
||||
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
|
||||
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
|
||||
MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin
|
||||
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
||||
|
||||
|
||||
|
@ -584,9 +585,225 @@ class PyMISP:
|
|||
data['hard'] = 1
|
||||
r = self._prepare_request('POST', request_url, data=data)
|
||||
return self._check_json_response(r)
|
||||
|
||||
# ## END Event Report ###
|
||||
|
||||
# ## BEGIN Galaxy Cluster ###
|
||||
def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_cluster: MISPGalaxyCluster | int | str, local: bool = False, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Attach a galaxy cluster to an event or an attribute
|
||||
|
||||
:param misp_entity: a MISP Event or a MISP Attribute
|
||||
:param galaxy_cluster: Galaxy cluster to attach
|
||||
:param local: whether the object should be attached locally or not to the target
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
if isinstance(misp_entity, MISPEvent):
|
||||
attach_target_type = 'event'
|
||||
elif isinstance(misp_entity, MISPAttribute):
|
||||
attach_target_type = 'attribute'
|
||||
else:
|
||||
raise PyMISPError('The misp_entity must be MISPEvent or MISPAttribute')
|
||||
|
||||
attach_target_id = misp_entity.id
|
||||
local = 1 if local else 0
|
||||
|
||||
if isinstance(galaxy_cluster, MISPGalaxyCluster):
|
||||
cluster_id = galaxy_cluster.id
|
||||
elif isinstance(galaxy_cluster, (int, str)):
|
||||
cluster_id = galaxy_cluster
|
||||
else:
|
||||
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')
|
||||
|
||||
to_post = {'Galaxy': {'target_id': cluster_id}}
|
||||
url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}'
|
||||
|
||||
r = self._prepare_request('POST', url, data=to_post)
|
||||
return self._check_json_response(r)
|
||||
# ## END Galaxy Cluster ###
|
||||
|
||||
# ## BEGIN Analyst Data ###a
|
||||
def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID,
|
||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||
"""Get an analyst data from a MISP instance
|
||||
|
||||
:param analyst_data: analyst data to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
if isinstance(analyst_data, AnalystDataBehaviorMixin):
|
||||
analyst_data_type = analyst_data.analyst_data_object_type
|
||||
else:
|
||||
analyst_data_type = 'all'
|
||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||
r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}')
|
||||
analyst_data_r = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all':
|
||||
return analyst_data_r
|
||||
er = type(analyst_data)()
|
||||
er.from_dict(**analyst_data_r)
|
||||
return er
|
||||
|
||||
def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship,
|
||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||
"""Add an analyst data to an existing MISP element
|
||||
|
||||
:param analyst_data: analyst_data to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
object_uuid = analyst_data.object_uuid
|
||||
object_type = analyst_data.object_type
|
||||
r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data)
|
||||
new_analyst_data = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
|
||||
return new_analyst_data
|
||||
er = type(analyst_data)()
|
||||
er.from_dict(**new_analyst_data)
|
||||
return er
|
||||
|
||||
def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None,
|
||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||
"""Update an analyst data on a MISP instance
|
||||
|
||||
:param analyst_data: analyst data to update
|
||||
:param analyst_data_id: analyst data ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
if isinstance(analyst_data, AnalystDataBehaviorMixin):
|
||||
analyst_data_type = analyst_data.analyst_data_object_type
|
||||
else:
|
||||
analyst_data_type = 'all'
|
||||
if analyst_data_id is None:
|
||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||
r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data)
|
||||
updated_analyst_data = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all':
|
||||
return updated_analyst_data
|
||||
er = type(analyst_data)()
|
||||
er.from_dict(**updated_analyst_data)
|
||||
return er
|
||||
|
||||
def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete an analyst data from a MISP instance
|
||||
|
||||
:param analyst_data: analyst data to delete
|
||||
"""
|
||||
if isinstance(analyst_data, AnalystDataBehaviorMixin):
|
||||
analyst_data_type = analyst_data.analyst_data_object_type
|
||||
else:
|
||||
analyst_data_type = 'all'
|
||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||
request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}'
|
||||
r = self._prepare_request('POST', request_url)
|
||||
return self._check_json_response(r)
|
||||
|
||||
# ## END Analyst Data ###
|
||||
|
||||
# ## BEGIN Analyst Note ###
|
||||
|
||||
def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
|
||||
"""Get a note from a MISP instance
|
||||
|
||||
:param note: note to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
return self.get_analyst_data(note, pythonify)
|
||||
|
||||
def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
|
||||
"""Add a note to an existing MISP element
|
||||
|
||||
:param note: note to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.add_analyst_data(note, pythonify)
|
||||
|
||||
def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote:
|
||||
"""Update a note on a MISP instance
|
||||
|
||||
:param note: note to update
|
||||
:param note_id: note ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.update_analyst_data(note, note_id, pythonify)
|
||||
|
||||
def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete a note from a MISP instance
|
||||
|
||||
:param note: note delete
|
||||
"""
|
||||
return self.delete_analyst_data(note)
|
||||
|
||||
# ## END Analyst Note ###
|
||||
|
||||
# ## BEGIN Analyst Opinion ###
|
||||
|
||||
def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
|
||||
"""Get an opinion from a MISP instance
|
||||
|
||||
:param opinion: opinion to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
return self.get_analyst_data(opinion, pythonify)
|
||||
|
||||
def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
|
||||
"""Add an opinion to an existing MISP element
|
||||
|
||||
:param opinion: opinion to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.add_analyst_data(opinion, pythonify)
|
||||
|
||||
def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
|
||||
"""Update an opinion on a MISP instance
|
||||
|
||||
:param opinion: opinion to update
|
||||
:param opinion_id: opinion ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.update_analyst_data(opinion, opinion_id, pythonify)
|
||||
|
||||
def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete an opinion from a MISP instance
|
||||
|
||||
:param opinion: opinion to delete
|
||||
"""
|
||||
return self.delete_analyst_data(opinion)
|
||||
|
||||
# ## END Analyst Opinion ###
|
||||
|
||||
# ## BEGIN Analyst Relationship ###
|
||||
|
||||
def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
|
||||
"""Get a relationship from a MISP instance
|
||||
|
||||
:param relationship: relationship to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
return self.get_analyst_data(relationship, pythonify)
|
||||
|
||||
def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
|
||||
"""Add a relationship to an existing MISP element
|
||||
|
||||
:param relationship: relationship to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.add_analyst_data(relationship, pythonify)
|
||||
|
||||
def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
|
||||
"""Update a relationship on a MISP instance
|
||||
|
||||
:param relationship: relationship to update
|
||||
:param relationship_id: relationship ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.update_analyst_data(relationship, relationship_id, pythonify)
|
||||
|
||||
def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete a relationship from a MISP instance
|
||||
|
||||
:param relationship: relationship to delete
|
||||
"""
|
||||
return self.delete_analyst_data(relationship)
|
||||
|
||||
# ## END Analyst Relationship ###
|
||||
|
||||
# ## BEGIN Object ###
|
||||
|
||||
def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 96492b9c932a4b307216550abeadddc727e17cec
|
||||
Subproject commit e3288ef6e516624e3e335939a2b7fe4aef5ce510
|
|
@ -23,6 +23,22 @@ class NewEventReportError(PyMISPError):
|
|||
pass
|
||||
|
||||
|
||||
class NewAnalystDataError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class NewNoteError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class NewOpinionError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class NewRelationshipError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class UpdateAttributeError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
@ -51,18 +67,26 @@ class NoKey(PyMISPError):
|
|||
pass
|
||||
|
||||
|
||||
class MISPObjectException(PyMISPError):
|
||||
pass
|
||||
class MISPAttributeException(PyMISPError):
|
||||
"""A base class for attribute specific exceptions"""
|
||||
|
||||
class MISPObjectException(PyMISPError):
|
||||
"""A base class for object specific exceptions"""
|
||||
|
||||
|
||||
class InvalidMISPAttribute(MISPAttributeException):
|
||||
"""Exception raised when an attribute doesn't respect the constraints in the definition"""
|
||||
|
||||
class InvalidMISPObjectAttribute(MISPAttributeException):
|
||||
"""Exception raised when an object attribute doesn't respect the constraints in the definition"""
|
||||
|
||||
class InvalidMISPObject(MISPObjectException):
|
||||
"""Exception raised when an object doesn't respect the contrains in the definition"""
|
||||
pass
|
||||
"""Exception raised when an object doesn't respect the constraints in the definition"""
|
||||
|
||||
|
||||
class UnknownMISPObjectTemplate(MISPObjectException):
|
||||
"""Exception raised when the template is unknown"""
|
||||
pass
|
||||
|
||||
|
||||
|
||||
class InvalidMISPGalaxy(PyMISPError):
|
||||
|
|
|
@ -23,13 +23,89 @@ except ImportError:
|
|||
import json
|
||||
|
||||
from .abstract import AbstractMISP, MISPTag
|
||||
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
|
||||
PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
|
||||
NewGalaxyClusterError, NewGalaxyClusterRelationError)
|
||||
from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPAttribute,
|
||||
InvalidMISPObject, InvalidMISPObjectAttribute, PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
|
||||
NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError)
|
||||
|
||||
logger = logging.getLogger('pymisp')
|
||||
|
||||
|
||||
class AnalystDataBehaviorMixin(AbstractMISP):
|
||||
|
||||
# NOTE: edited here must be the property of Abstract MISP
|
||||
|
||||
def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
super().__init__(**kwargs)
|
||||
self.uuid: str # Created in the child class
|
||||
self._analyst_data_object_type: str # Must be defined in the child class
|
||||
self.Note: list[MISPNote] = []
|
||||
self.Opinion: list[MISPOpinion] = []
|
||||
self.Relationship: list[MISPRelationship] = []
|
||||
|
||||
@property
|
||||
def analyst_data_object_type(self) -> str:
|
||||
return self._analyst_data_object_type
|
||||
|
||||
@property
|
||||
def notes(self) -> list[MISPNote]:
|
||||
return self.Note
|
||||
|
||||
@property
|
||||
def opinions(self) -> list[MISPOpinion]:
|
||||
return self.Opinion
|
||||
|
||||
@property
|
||||
def relationships(self) -> list[MISPRelationship]:
|
||||
return self.Relationship
|
||||
|
||||
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
||||
the_note = MISPNote()
|
||||
the_note.from_dict(note=note, language=language,
|
||||
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
|
||||
**kwargs)
|
||||
self.notes.append(the_note)
|
||||
self.edited = True
|
||||
return the_note
|
||||
|
||||
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
||||
the_opinion = MISPOpinion()
|
||||
the_opinion.from_dict(opinion=opinion, comment=comment,
|
||||
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
|
||||
**kwargs)
|
||||
self.opinions.append(the_opinion)
|
||||
self.edited = True
|
||||
return the_opinion
|
||||
|
||||
def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
||||
the_relationship = MISPRelationship()
|
||||
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
|
||||
relationship_type=relationship_type,
|
||||
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
|
||||
**kwargs)
|
||||
self.relationships.append(the_relationship)
|
||||
self.edited = True
|
||||
return the_relationship
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
# These members need a fully initialized class to be loaded properly
|
||||
notes = kwargs.pop('Note', [])
|
||||
opinions = kwargs.pop('Opinion', [])
|
||||
relationships = kwargs.pop('Relationship', [])
|
||||
super().from_dict(**kwargs)
|
||||
for note in notes:
|
||||
note.pop('object_uuid', None)
|
||||
note.pop('object_type', None)
|
||||
self.add_note(**note)
|
||||
for opinion in opinions:
|
||||
opinion.pop('object_uuid', None)
|
||||
opinion.pop('object_type', None)
|
||||
self.add_opinion(**opinion)
|
||||
for relationship in relationships:
|
||||
relationship.pop('object_uuid', None)
|
||||
relationship.pop('object_type', None)
|
||||
self.add_relationship(**relationship)
|
||||
|
||||
|
||||
try:
|
||||
from dateutil.parser import parse
|
||||
except ImportError:
|
||||
|
@ -226,11 +302,13 @@ class MISPSighting(AbstractMISP):
|
|||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
|
||||
class MISPAttribute(AbstractMISP):
|
||||
class MISPAttribute(AnalystDataBehaviorMixin):
|
||||
_fields_for_feed: set[str] = {'uuid', 'value', 'category', 'type', 'comment', 'data',
|
||||
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
|
||||
'first_seen', 'last_seen'}
|
||||
|
||||
_analyst_data_object_type = 'Attribute'
|
||||
|
||||
def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False):
|
||||
"""Represents an Attribute
|
||||
|
||||
|
@ -666,12 +744,14 @@ class MISPObjectReference(AbstractMISP):
|
|||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
|
||||
class MISPObject(AbstractMISP):
|
||||
class MISPObject(AnalystDataBehaviorMixin):
|
||||
|
||||
_fields_for_feed: set[str] = {'name', 'meta-category', 'description', 'template_uuid',
|
||||
'template_version', 'uuid', 'timestamp', 'comment',
|
||||
'first_seen', 'last_seen', 'deleted'}
|
||||
|
||||
_analyst_data_object_type = 'Object'
|
||||
|
||||
def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def]
|
||||
default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None:
|
||||
''' Master class representing a generic MISP object
|
||||
|
@ -947,6 +1027,26 @@ class MISPObject(AbstractMISP):
|
|||
self.edited = True
|
||||
return reference
|
||||
|
||||
def get_attribute_by_id(self, attribute_id: str | int) -> MISPObjectAttribute:
|
||||
"""Get an object attribute by ID
|
||||
|
||||
:param attribute_id: The ID of the seeking object attribute"""
|
||||
for attribute in self.attributes:
|
||||
if hasattr(attribute, 'id') and attribute.id == attribute_id:
|
||||
return attribute
|
||||
|
||||
raise InvalidMISPObjectAttribute(f'Object attribute with {attribute_id} does not exist in this event')
|
||||
|
||||
def get_attribute_by_uuid(self, attribute_uuid: str) -> MISPObjectAttribute:
|
||||
"""Get an object attribute by UUID
|
||||
|
||||
:param attribute_uuid: The UUID of the seeking object attribute"""
|
||||
for attribute in self.attributes:
|
||||
if hasattr(attribute, 'uuid') and attribute.uuid == attribute_uuid:
|
||||
return attribute
|
||||
|
||||
raise InvalidMISPObjectAttribute(f'Object attribute with {attribute_uuid} does not exist in this event')
|
||||
|
||||
def get_attributes_by_relation(self, object_relation: str) -> list[MISPAttribute]:
|
||||
'''Returns the list of attributes with the given object relation in the object'''
|
||||
return self._fast_attribute_access.get(object_relation, [])
|
||||
|
@ -1063,12 +1163,17 @@ class MISPObject(AbstractMISP):
|
|||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
|
||||
class MISPEventReport(AbstractMISP):
|
||||
class MISPEventReport(AnalystDataBehaviorMixin):
|
||||
|
||||
_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
|
||||
_analyst_data_object_type = 'EventReport'
|
||||
|
||||
timestamp: float | int | datetime
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.uuid: str = str(uuid.uuid4())
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'EventReport' in kwargs:
|
||||
kwargs = kwargs['EventReport']
|
||||
|
@ -1451,11 +1556,13 @@ class MISPGalaxy(AbstractMISP):
|
|||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
|
||||
class MISPEvent(AbstractMISP):
|
||||
class MISPEvent(AnalystDataBehaviorMixin):
|
||||
|
||||
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||
'publish_timestamp', 'published', 'date', 'extends_uuid'}
|
||||
|
||||
_analyst_data_object_type = 'Event'
|
||||
|
||||
def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
super().__init__(**kwargs)
|
||||
self.__schema_file = 'schema.json' if strict_validation else 'schema-lax.json'
|
||||
|
@ -1952,6 +2059,25 @@ class MISPEvent(AbstractMISP):
|
|||
self.galaxies.append(misp_galaxy)
|
||||
return misp_galaxy
|
||||
|
||||
def get_attribute_by_id(self, attribute_id: str | int) -> MISPAttribute:
|
||||
"""Get an attribute by ID
|
||||
|
||||
:param attribute_id: The ID of the seeking attribute"""
|
||||
for attribute in self.attributes:
|
||||
if hasattr(attribute, 'id') and int(attribute.id) == int(attribute_id):
|
||||
return attribute
|
||||
raise InvalidMISPAttribute(f'Attribute with {attribute_id} does not exist in this event')
|
||||
|
||||
def get_attribute_by_uuid(self, attribute_uuid: str) -> MISPAttribute:
|
||||
"""Get an attribute by UUID
|
||||
|
||||
:param attribute_uuid: The UUID of the seeking attribute"""
|
||||
for attribute in self.attributes:
|
||||
if hasattr(attribute, 'uuid') and attribute.uuid == attribute_uuid:
|
||||
return attribute
|
||||
|
||||
raise InvalidMISPAttribute(f'Attribute with {attribute_uuid} does not exist in this event')
|
||||
|
||||
def get_object_by_id(self, object_id: str | int) -> MISPObject:
|
||||
"""Get an object by ID
|
||||
|
||||
|
@ -2318,3 +2444,200 @@ class MISPDecayingModel(AbstractMISP):
|
|||
|
||||
def __repr__(self) -> str:
|
||||
return f'<{self.__class__.__name__}(uuid={self.uuid})>'
|
||||
|
||||
|
||||
class MISPAnalystData(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors',
|
||||
'created', 'distribution', 'sharing_group_id', 'note_type_name'}
|
||||
|
||||
valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy',
|
||||
'Object', 'Note', 'Opinion', 'Relationship', 'Organisation',
|
||||
'SharingGroup'}
|
||||
|
||||
@property
|
||||
def org(self) -> MISPOrganisation:
|
||||
return self.Org
|
||||
|
||||
@property
|
||||
def orgc(self) -> MISPOrganisation:
|
||||
return self.Orgc
|
||||
|
||||
@orgc.setter
|
||||
def orgc(self, orgc: MISPOrganisation) -> None:
|
||||
if isinstance(orgc, MISPOrganisation):
|
||||
self.Orgc = orgc
|
||||
else:
|
||||
raise PyMISPError('Orgc must be of type MISPOrganisation.')
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls is MISPAnalystData:
|
||||
raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
|
||||
return object.__new__(cls)
|
||||
|
||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self.uuid = str(uuid.uuid4())
|
||||
self.object_uuid: str
|
||||
self.object_type: str
|
||||
self.authors: str
|
||||
self.created: float | int | datetime
|
||||
self.modified: float | int | datetime
|
||||
self.SharingGroup: MISPSharingGroup
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
self.distribution = kwargs.pop('distribution', None)
|
||||
if self.distribution is not None:
|
||||
self.distribution = int(self.distribution)
|
||||
if self.distribution not in [0, 1, 2, 3, 4, 5]:
|
||||
raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5')
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).')
|
||||
|
||||
self.object_uuid = kwargs.pop('object_uuid', None)
|
||||
if self.object_uuid is None:
|
||||
raise NewAnalystDataError('The UUID for which this element is attached is required.')
|
||||
self.object_type = kwargs.pop('object_type', None)
|
||||
if self.object_type is None:
|
||||
raise NewAnalystDataError('The element type for which this element is attached is required.')
|
||||
if self.object_type not in self.valid_object_type:
|
||||
raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.')
|
||||
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('created'):
|
||||
ts = kwargs.pop('created')
|
||||
if isinstance(ts, datetime):
|
||||
self.created = ts
|
||||
else:
|
||||
self.created = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ
|
||||
if kwargs.get('modified'):
|
||||
ts = kwargs.pop('modified')
|
||||
if isinstance(ts, datetime):
|
||||
self.modified = ts
|
||||
else:
|
||||
self.modified = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ
|
||||
|
||||
if kwargs.get('Org'):
|
||||
self.Org = MISPOrganisation()
|
||||
self.Org.from_dict(**kwargs.pop('Org'))
|
||||
if kwargs.get('Orgc'):
|
||||
self.Orgc = MISPOrganisation()
|
||||
self.Orgc.from_dict(**kwargs.pop('Orgc'))
|
||||
if kwargs.get('SharingGroup'):
|
||||
self.SharingGroup = MISPSharingGroup()
|
||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def _set_default(self) -> None:
|
||||
if not hasattr(self, 'created'):
|
||||
self.created = datetime.timestamp(datetime.now())
|
||||
if not hasattr(self, 'modified'):
|
||||
self.modified = self.created
|
||||
|
||||
|
||||
class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||
|
||||
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
|
||||
|
||||
_analyst_data_object_type = 'Note'
|
||||
|
||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||
self.note: str
|
||||
self.language: str
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'Note' in kwargs:
|
||||
kwargs = kwargs['Note']
|
||||
self.note = kwargs.pop('note', None)
|
||||
if self.note is None:
|
||||
raise NewNoteError('The text note of the note is required.')
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'note'):
|
||||
return '<{self.__class__.__name__}(note={self.note})'.format(self=self)
|
||||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
|
||||
class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||
|
||||
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
|
||||
|
||||
_analyst_data_object_type = 'Opinion'
|
||||
|
||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||
self.opinion: int
|
||||
self.comment: str
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'Opinion' in kwargs:
|
||||
kwargs = kwargs['Opinion']
|
||||
self.opinion = kwargs.pop('opinion', None)
|
||||
if self.opinion is not None:
|
||||
self.opinion = int(self.opinion)
|
||||
if not (0 <= self.opinion <= 100):
|
||||
raise NewOpinionError('The opinion value must be between 0 and 100 included.')
|
||||
else:
|
||||
raise NewOpinionError('The opinion value is required.')
|
||||
|
||||
self.comment = kwargs.pop('comment', None)
|
||||
if self.comment is None:
|
||||
raise NewOpinionError('The text comment is required.')
|
||||
|
||||
return super().from_dict(**kwargs)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'opinion'):
|
||||
return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self)
|
||||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
|
||||
class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||
|
||||
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
|
||||
|
||||
_analyst_data_object_type = 'Relationship'
|
||||
|
||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||
self.related_object_uuid: str
|
||||
self.related_object_type: str
|
||||
self.relationship_type: str
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'Relationship' in kwargs:
|
||||
kwargs = kwargs['Relationship']
|
||||
self.related_object_type = kwargs.pop('related_object_type', None)
|
||||
if self.related_object_type is None:
|
||||
raise NewRelationshipError('The target object type for this relationship is required.')
|
||||
|
||||
self.related_object_uuid = kwargs.pop('related_object_uuid', None)
|
||||
if self.related_object_uuid is None:
|
||||
if not isinstance(self.related_object_type, AbstractMISP):
|
||||
raise NewRelationshipError('The target UUID for this relationship is required.')
|
||||
else:
|
||||
self.related_object_uuid = self.related_object_type.uuid
|
||||
self.related_object_type = self.related_object_type._analyst_data_object_type
|
||||
|
||||
if self.related_object_type not in self.valid_object_type:
|
||||
raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')
|
||||
|
||||
return super().from_dict(**kwargs)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'):
|
||||
return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self)
|
||||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[tool.poetry]
|
||||
name = "pymisp"
|
||||
version = "2.4.190"
|
||||
version = "2.4.194"
|
||||
description = "Python API for MISP."
|
||||
authors = ["Raphaël Vinot <raphael.vinot@circl.lu>"]
|
||||
license = "BSD-2-Clause"
|
||||
|
@ -42,7 +42,7 @@ include = [
|
|||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.8"
|
||||
requests = "^2.31.0"
|
||||
requests = "^2.32.3"
|
||||
python-dateutil = "^2.9.0.post0"
|
||||
deprecated = "^1.2.14"
|
||||
extract_msg = {version = "^0.48.5", optional = true}
|
||||
|
@ -53,12 +53,12 @@ pydeep2 = {version = "^0.5.1", optional = true}
|
|||
lief = {version = "^0.14.1", optional = true}
|
||||
beautifulsoup4 = {version = "^4.12.3", optional = true}
|
||||
validators = {version = "^0.28.0", optional = true}
|
||||
sphinx-autodoc-typehints = {version = "^2.1.0", optional = true, python = ">=3.9"}
|
||||
sphinx-autodoc-typehints = {version = "^2.2.2", optional = true, python = ">=3.9"}
|
||||
docutils = {version = "^0.21.1", optional = true, python = ">=3.9"}
|
||||
recommonmark = {version = "^0.7.1", optional = true, python = ">=3.9"}
|
||||
reportlab = {version = "^4.2.0", optional = true}
|
||||
reportlab = {version = "^4.2.2", optional = true}
|
||||
pyfaup = {version = "^1.2", optional = true}
|
||||
publicsuffixlist = {version = "^0.10.0.20240403", optional = true}
|
||||
publicsuffixlist = {version = "^1.0.1.20240625", optional = true}
|
||||
urllib3 = {extras = ["brotli"], version = "*", optional = true}
|
||||
Sphinx = {version = "^7.3.7", python = ">=3.9", optional = true}
|
||||
|
||||
|
@ -74,14 +74,14 @@ brotli = ['urllib3']
|
|||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
requests-mock = "^1.12.1"
|
||||
mypy = "^1.10.0"
|
||||
mypy = "^1.10.1"
|
||||
ipython = [
|
||||
{version = "<8.13.0", python = "<3.9"},
|
||||
{version = "^8.18.0", python = ">=3.9"},
|
||||
{version = "^8.19.0", python = ">=3.10"}
|
||||
]
|
||||
jupyterlab = "^4.1.7"
|
||||
types-requests = "^2.31.0.20240406"
|
||||
jupyterlab = "^4.2.2"
|
||||
types-requests = "^2.32.0.20240622"
|
||||
types-python-dateutil = "^2.9.0.20240316"
|
||||
types-redis = "^4.6.0.20240425"
|
||||
types-Flask = "^1.1.6"
|
||||
|
|
|
@ -25,7 +25,8 @@ try:
|
|||
MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag,
|
||||
MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting,
|
||||
MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster,
|
||||
MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist)
|
||||
MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist,
|
||||
MISPNote)
|
||||
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
|
||||
except ImportError:
|
||||
raise
|
||||
|
@ -78,7 +79,6 @@ class TestComprehensive(unittest.TestCase):
|
|||
cls.admin_misp_connector.set_server_setting('debug', 1, force=True)
|
||||
if not fast_mode:
|
||||
r = cls.admin_misp_connector.update_misp()
|
||||
print(r)
|
||||
# Creates an org
|
||||
organisation = MISPOrganisation()
|
||||
organisation.name = 'Test Org'
|
||||
|
@ -2716,7 +2716,8 @@ class TestComprehensive(unittest.TestCase):
|
|||
# # Enable autoalert on admin
|
||||
self.admin_misp_connector._current_user.autoalert = True
|
||||
self.admin_misp_connector._current_user.termsaccepted = True
|
||||
self.user_misp_connector.update_user(self.admin_misp_connector._current_user)
|
||||
admin_usr = self.admin_misp_connector.update_user(self.admin_misp_connector._current_user, pythonify=True)
|
||||
self.assertTrue(admin_usr.autoalert)
|
||||
|
||||
first = self.admin_misp_connector.add_event(first, pythonify=True)
|
||||
second = self.admin_misp_connector.add_event(second, pythonify=True)
|
||||
|
@ -3075,16 +3076,13 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.user_misp_connector.delete_event_report(new_event_report)
|
||||
|
||||
def test_search_galaxy(self) -> None:
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies() # type: ignore[assignment]
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True) # type: ignore[assignment]
|
||||
galaxy: MISPGalaxy = galaxies[0]
|
||||
ret = self.admin_misp_connector.search_galaxy(value=galaxy.name)
|
||||
ret = self.admin_misp_connector.search_galaxy(value=galaxy.name, pythonify=True)
|
||||
self.assertEqual(len(ret), 1)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
||||
def test_galaxy_cluster(self) -> None:
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies() # type: ignore[assignment]
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True) # type: ignore[assignment]
|
||||
galaxy: MISPGalaxy = galaxies[0]
|
||||
new_galaxy_cluster: MISPGalaxyCluster = MISPGalaxyCluster()
|
||||
new_galaxy_cluster.value = "Test Cluster"
|
||||
|
@ -3093,13 +3091,13 @@ class TestComprehensive(unittest.TestCase):
|
|||
new_galaxy_cluster.description = "Example test cluster"
|
||||
try:
|
||||
if gid := galaxy.id:
|
||||
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True) # type: ignore[assignment]
|
||||
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True, pythonify=True) # type: ignore[assignment]
|
||||
else:
|
||||
raise Exception("No galaxy found")
|
||||
existing_galaxy_cluster = galaxy.clusters[0]
|
||||
|
||||
if gid := galaxy.id:
|
||||
new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(gid, new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(gid, new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
else:
|
||||
raise Exception("No galaxy found")
|
||||
# The new galaxy cluster should be under the selected galaxy
|
||||
|
@ -3108,7 +3106,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.assertEqual(new_galaxy_cluster.value, "Test Cluster")
|
||||
|
||||
new_galaxy_cluster.add_cluster_element("synonyms", "Test2")
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
|
||||
# The cluster should have one element that is a synonym
|
||||
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1)
|
||||
|
@ -3121,22 +3119,22 @@ class TestComprehensive(unittest.TestCase):
|
|||
|
||||
# The cluster element should be updatable
|
||||
element.value = "Test3"
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
element = new_galaxy_cluster.cluster_elements[0]
|
||||
self.assertEqual(element.value, "Test3")
|
||||
|
||||
new_galaxy_cluster.add_cluster_element("synonyms", "ToDelete")
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
# The cluster should have two elements
|
||||
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 2)
|
||||
|
||||
new_galaxy_cluster.cluster_elements = [e for e in new_galaxy_cluster.cluster_elements if e.value != "ToDelete"]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
# The cluster elements should be deletable
|
||||
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1)
|
||||
|
||||
new_galaxy_cluster.add_cluster_relation(existing_galaxy_cluster, "is-tested-by")
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
# The cluster should have a relationship
|
||||
self.assertEqual(len(new_galaxy_cluster.cluster_relations), 1)
|
||||
relation = new_galaxy_cluster.cluster_relations[0]
|
||||
|
@ -3144,7 +3142,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.assertEqual(relation.referenced_galaxy_cluster_uuid, existing_galaxy_cluster.uuid)
|
||||
|
||||
relation.add_tag("tlp:amber")
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
relation = new_galaxy_cluster.cluster_relations[0]
|
||||
# The relationship should have a tag of tlp:amber
|
||||
self.assertEqual(len(relation.tags), 1)
|
||||
|
@ -3154,13 +3152,13 @@ class TestComprehensive(unittest.TestCase):
|
|||
resp = self.admin_misp_connector.delete_galaxy_cluster_relation(relation)
|
||||
self.assertTrue(resp['success'])
|
||||
# The cluster relation should no longer be present
|
||||
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
self.assertEqual(len(new_galaxy_cluster.cluster_relations), 0)
|
||||
|
||||
resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster)
|
||||
# Galaxy clusters should be soft deletable
|
||||
self.assertTrue(resp['success'])
|
||||
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster, pythonify=True) # type: ignore[assignment]
|
||||
self.assertTrue(isinstance(new_galaxy_cluster, MISPGalaxyCluster))
|
||||
|
||||
resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True)
|
||||
|
@ -3169,23 +3167,20 @@ class TestComprehensive(unittest.TestCase):
|
|||
resp = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment]
|
||||
self.assertTrue("errors" in resp)
|
||||
finally:
|
||||
self.admin_misp_connector.delete_galaxy_cluster_relation(relation)
|
||||
self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
pass
|
||||
|
||||
def test_event_galaxy(self) -> None:
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
event = self.create_simple_event()
|
||||
try:
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies() # type: ignore[assignment]
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True) # type: ignore[assignment]
|
||||
galaxy: MISPGalaxy = galaxies[0]
|
||||
if gid := galaxy.id:
|
||||
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True) # type: ignore[assignment]
|
||||
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True, pythonify=True) # type: ignore[assignment]
|
||||
else:
|
||||
raise Exception("No galaxy found")
|
||||
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[0]
|
||||
event.add_tag(galaxy_cluster.tag_name)
|
||||
event = self.admin_misp_connector.add_event(event)
|
||||
event = self.admin_misp_connector.add_event(event, pythonify=True)
|
||||
# The event should have a galaxy attached
|
||||
self.assertEqual(len(event.galaxies), 1)
|
||||
event_galaxy = event.galaxies[0]
|
||||
|
@ -3195,7 +3190,179 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id)
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
||||
def test_attach_galaxy_cluster(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
event = self.admin_misp_connector.add_event(event, pythonify=True)
|
||||
try:
|
||||
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True)
|
||||
galaxy: MISPGalaxy = galaxies[0]
|
||||
if gid := galaxy.id:
|
||||
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True, pythonify=True)
|
||||
else:
|
||||
raise Exception("No galaxy found")
|
||||
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[0]
|
||||
response = self.admin_misp_connector.attach_galaxy_cluster(event, galaxy_cluster)
|
||||
self.assertTrue(response['saved'])
|
||||
event = self.admin_misp_connector.get_event(event.id, pythonify=True)
|
||||
|
||||
self.assertEqual(len(event.galaxies), 1)
|
||||
event_galaxy = event.galaxies[0]
|
||||
# The galaxy ID should equal the galaxy from which the cluster came from
|
||||
self.assertEqual(event_galaxy.id, galaxy.id)
|
||||
# The galaxy cluster should equal the cluster added
|
||||
self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id)
|
||||
|
||||
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[1]
|
||||
|
||||
# Test on attribute
|
||||
attribute = event.attributes[0]
|
||||
response = self.admin_misp_connector.attach_galaxy_cluster(attribute, galaxy_cluster)
|
||||
self.assertTrue(response['saved'])
|
||||
event = self.admin_misp_connector.get_event(event.id, pythonify=True)
|
||||
attribute = event.attributes[0]
|
||||
self.assertEqual(len(attribute.galaxies), 1)
|
||||
attribute_galaxy = attribute.galaxies[0]
|
||||
# The galaxy ID should equal the galaxy from which the cluster came from
|
||||
self.assertEqual(attribute_galaxy.id, galaxy.id)
|
||||
# The galaxy cluster should equal the cluster added
|
||||
self.assertEqual(attribute_galaxy.clusters[0].id, galaxy_cluster.id)
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
|
||||
def test_analyst_data_CRUD(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
try:
|
||||
fake_uuid = str(uuid4())
|
||||
new_note1 = MISPNote()
|
||||
new_note1.object_type = 'Event'
|
||||
new_note1.object_uuid = fake_uuid
|
||||
new_note1.note = 'Fake note'
|
||||
new_note1 = self.user_misp_connector.add_note(new_note1)
|
||||
# The Note should be linked even for non-existing data
|
||||
self.assertTrue(new_note1.object_uuid == fake_uuid)
|
||||
|
||||
new_note1.note = "Updated Note"
|
||||
new_note1 = self.user_misp_connector.update_note(new_note1)
|
||||
# The Note should be updatable
|
||||
self.assertTrue(new_note1.note == "Updated Note")
|
||||
|
||||
# The Note should be able to get an Opinion
|
||||
new_opinion = new_note1.add_opinion(42, 'Test Opinion')
|
||||
new_note1 = self.user_misp_connector.update_note(new_note1)
|
||||
# Fetch newly added node
|
||||
new_note1 = self.user_misp_connector.get_note(new_note1)
|
||||
# The Opinion shoud be able to be created via the Note
|
||||
self.assertTrue(new_note1.opinions[0].opinion == new_opinion.opinion)
|
||||
|
||||
response = self.user_misp_connector.delete_note(new_note1)
|
||||
# The Note should be deletable
|
||||
self.assertTrue(response['success'])
|
||||
self.assertEqual(response['message'], 'Note deleted.')
|
||||
# The Opinion should not be deleted
|
||||
opinion_resp = self.user_misp_connector.get_opinion(new_opinion)
|
||||
self.assertTrue(opinion_resp.opinion == new_opinion.opinion)
|
||||
|
||||
new_note: MISPNote = event.add_note(note='Test Note', language='en')
|
||||
new_note.distribution = 1 # Community
|
||||
event = self.user_misp_connector.add_event(event)
|
||||
# The note should be linked by Event UUID
|
||||
self.assertEqual(new_note.object_type, 'Event')
|
||||
self.assertTrue(new_note.object_uuid == event.uuid)
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note should be present on the event
|
||||
self.assertTrue(event.notes[0].object_uuid == event.uuid)
|
||||
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
try:
|
||||
self.admin_misp_connector.delete_opinion(new_opinion)
|
||||
self.admin_misp_connector.delete_note(new_note)
|
||||
self.admin_misp_connector.delete_note(new_note1) # Should already be deleted
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def test_analyst_data_ACL(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
event.distribution = 2
|
||||
sg = MISPSharingGroup()
|
||||
sg.name = 'Testcases SG'
|
||||
sg.releasability = 'Testing'
|
||||
sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True)
|
||||
# Chec that sharing group was created
|
||||
self.assertEqual(sharing_group.name, 'Testcases SG')
|
||||
|
||||
try:
|
||||
new_note: MISPNote = event.add_note(note='Test Note', language='en')
|
||||
new_note.distribution = 0 # Org only
|
||||
event = self.admin_misp_connector.add_event(event, pythonify=True)
|
||||
|
||||
# The note should be linked by Event UUID
|
||||
self.assertEqual(new_note.object_type, 'Event')
|
||||
self.assertEqual(event.uuid, new_note.object_uuid)
|
||||
|
||||
event = self.admin_misp_connector.get_event(event, pythonify=True)
|
||||
# The note should be visible for the creator
|
||||
self.assertEqual(len(event.notes), 1)
|
||||
self.assertTrue(new_note.note == "Test Note")
|
||||
|
||||
resp = self.user_misp_connector.get_note(new_note)
|
||||
# The note should not be visible to another org
|
||||
self.assertTrue(len(resp), 0)
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note attached to the event should not be visible for another org than the creator
|
||||
self.assertEqual(len(event.Note), 0)
|
||||
|
||||
new_note = self.admin_misp_connector.get_note(new_note, pythonify=True)
|
||||
new_note.distribution = 4
|
||||
new_note.sharing_group_id = sharing_group.id
|
||||
new_note = self.admin_misp_connector.update_note(new_note, pythonify=True)
|
||||
self.assertEqual(int(new_note.sharing_group_id), int(sharing_group.id))
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note attached to the event should not be visible for another org not part of the sharing group
|
||||
self.assertEqual(len(event.Note), 0)
|
||||
|
||||
# Add org to the sharing group
|
||||
r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group,
|
||||
self.test_org, extend=True)
|
||||
self.assertEqual(r['name'], 'Organisation added to the sharing group.')
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note attached to the event should now be visible
|
||||
self.assertEqual(len(event.Note), 1)
|
||||
|
||||
new_note.note = "Updated Note"
|
||||
resp = self.user_misp_connector.update_note(new_note)
|
||||
# The Note should not be updatable by another organisation
|
||||
self.assertTrue(resp['errors'])
|
||||
|
||||
resp = self.user_misp_connector.delete_note(new_note)
|
||||
# The Note should not be deletable by another organisation
|
||||
self.assertTrue(resp['errors'])
|
||||
|
||||
organisation = MISPOrganisation()
|
||||
organisation.name = 'Fake Org'
|
||||
fake_org = self.admin_misp_connector.add_organisation(organisation, pythonify=True)
|
||||
new_note_2 = new_note.add_note('Test Note 2')
|
||||
new_note_2.orgc_uuid = fake_org.uuid
|
||||
new_note_2 = self.user_misp_connector.add_note(new_note_2)
|
||||
# Regular user should not be able to create a note on behalf of another organisation
|
||||
self.assertFalse(new_note_2.orgc_uuid == fake_org.uuid)
|
||||
# Note should have the orgc set to the use's organisation for non-privileged users
|
||||
self.assertTrue(new_note_2.orgc_uuid == self.test_org.uuid)
|
||||
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
try:
|
||||
pass
|
||||
self.admin_misp_connector.delete_sharing_group(sharing_group.id)
|
||||
self.admin_misp_connector.delete_organisation(fake_org)
|
||||
self.admin_misp_connector.delete_note(new_note)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@unittest.skip("Internal use only")
|
||||
def missing_methods(self) -> None:
|
||||
|
|
Loading…
Reference in New Issue