mirror of https://github.com/MISP/PyMISP
chg: A bit more refactoring
parent
94a48a6fdd
commit
2cf5d99dc8
|
@ -32,7 +32,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
|
||||||
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
||||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
|
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
|
||||||
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
|
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
|
||||||
MISPNote, MISPOpinion, MISPRelationship
|
MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin
|
||||||
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
||||||
|
|
||||||
|
|
||||||
|
@ -585,24 +585,26 @@ class PyMISP:
|
||||||
data['hard'] = 1
|
data['hard'] = 1
|
||||||
r = self._prepare_request('POST', request_url, data=data)
|
r = self._prepare_request('POST', request_url, data=data)
|
||||||
return self._check_json_response(r)
|
return self._check_json_response(r)
|
||||||
|
|
||||||
# ## END Event Report ###
|
# ## END Event Report ###
|
||||||
|
|
||||||
# ## BEGIN Analyst Data ###
|
# ## BEGIN Analyst Data ###a
|
||||||
def get_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID,
|
def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID,
|
||||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||||
"""Get an analyst data from a MISP instance
|
"""Get an analyst data from a MISP instance
|
||||||
|
|
||||||
:param analyst_data: analyst data to get
|
:param analyst_data: analyst data to get
|
||||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||||
"""
|
"""
|
||||||
type = analyst_data.classObjectType
|
if isinstance(analyst_data, AnalystDataBehaviorMixin):
|
||||||
|
analyst_data_type = analyst_data.analyst_data_object_type
|
||||||
|
else:
|
||||||
|
analyst_data_type = 'all'
|
||||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||||
r = self._prepare_request('GET', f'analyst_data/view/{type}/{analyst_data_id}')
|
r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}')
|
||||||
analyst_data_r = self._check_json_response(r)
|
analyst_data_r = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r:
|
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all':
|
||||||
return analyst_data_r
|
return analyst_data_r
|
||||||
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
|
er = type(analyst_data)()
|
||||||
er.from_dict(**analyst_data_r)
|
er.from_dict(**analyst_data_r)
|
||||||
return er
|
return er
|
||||||
|
|
||||||
|
@ -613,14 +615,13 @@ class PyMISP:
|
||||||
:param analyst_data: analyst_data to add
|
:param analyst_data: analyst_data to add
|
||||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||||
"""
|
"""
|
||||||
type = analyst_data.classObjectType
|
|
||||||
object_uuid = analyst_data.object_uuid
|
object_uuid = analyst_data.object_uuid
|
||||||
object_type = analyst_data.object_type
|
object_type = analyst_data.object_type
|
||||||
r = self._prepare_request('POST', f'analyst_data/add/{type}/{object_uuid}/{object_type}', data=analyst_data)
|
r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data)
|
||||||
new_analyst_data = self._check_json_response(r)
|
new_analyst_data = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
|
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
|
||||||
return new_analyst_data
|
return new_analyst_data
|
||||||
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
|
er = type(analyst_data)()
|
||||||
er.from_dict(**new_analyst_data)
|
er.from_dict(**new_analyst_data)
|
||||||
return er
|
return er
|
||||||
|
|
||||||
|
@ -632,14 +633,17 @@ class PyMISP:
|
||||||
:param analyst_data_id: analyst data ID to update
|
:param analyst_data_id: analyst data ID to update
|
||||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||||
"""
|
"""
|
||||||
type = analyst_data.classObjectType
|
if isinstance(analyst_data, AnalystDataBehaviorMixin):
|
||||||
|
analyst_data_type = analyst_data.analyst_data_object_type
|
||||||
|
else:
|
||||||
|
analyst_data_type = 'all'
|
||||||
if analyst_data_id is None:
|
if analyst_data_id is None:
|
||||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||||
r = self._prepare_request('POST', f'analyst_data/edit/{type}/{analyst_data_id}', data=analyst_data)
|
r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data)
|
||||||
updated_analyst_data = self._check_json_response(r)
|
updated_analyst_data = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data:
|
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all':
|
||||||
return updated_analyst_data
|
return updated_analyst_data
|
||||||
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
|
er = type(analyst_data)()
|
||||||
er.from_dict(**updated_analyst_data)
|
er.from_dict(**updated_analyst_data)
|
||||||
return er
|
return er
|
||||||
|
|
||||||
|
@ -648,9 +652,12 @@ class PyMISP:
|
||||||
|
|
||||||
:param analyst_data: analyst data to delete
|
:param analyst_data: analyst data to delete
|
||||||
"""
|
"""
|
||||||
type = analyst_data.classObjectType
|
if isinstance(analyst_data, AnalystDataBehaviorMixin):
|
||||||
|
analyst_data_type = analyst_data.analyst_data_object_type
|
||||||
|
else:
|
||||||
|
analyst_data_type = 'all'
|
||||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||||
request_url = f'analyst_data/delete/{type}/{analyst_data_id}'
|
request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}'
|
||||||
r = self._prepare_request('POST', request_url)
|
r = self._prepare_request('POST', request_url)
|
||||||
return self._check_json_response(r)
|
return self._check_json_response(r)
|
||||||
|
|
||||||
|
|
|
@ -37,11 +37,15 @@ class AnalystDataBehaviorMixin(AbstractMISP):
|
||||||
def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.uuid: str # Created in the child class
|
self.uuid: str # Created in the child class
|
||||||
self.classObjectType: str # Must be defined in the child class
|
self._analyst_data_object_type: str # Must be defined in the child class
|
||||||
self.Note: list[MISPNote] = []
|
self.Note: list[MISPNote] = []
|
||||||
self.Opinion: list[MISPOpinion] = []
|
self.Opinion: list[MISPOpinion] = []
|
||||||
self.Relationship: list[MISPRelationship] = []
|
self.Relationship: list[MISPRelationship] = []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def analyst_data_object_type(self) -> str:
|
||||||
|
return self._analyst_data_object_type
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def notes(self) -> list[MISPNote]:
|
def notes(self) -> list[MISPNote]:
|
||||||
return self.Note
|
return self.Note
|
||||||
|
@ -57,7 +61,7 @@ class AnalystDataBehaviorMixin(AbstractMISP):
|
||||||
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
||||||
the_note = MISPNote()
|
the_note = MISPNote()
|
||||||
the_note.from_dict(note=note, language=language,
|
the_note.from_dict(note=note, language=language,
|
||||||
object_uuid=self.uuid, object_type=self.classObjectType,
|
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
self.notes.append(the_note)
|
self.notes.append(the_note)
|
||||||
self.edited = True
|
self.edited = True
|
||||||
|
@ -66,7 +70,7 @@ class AnalystDataBehaviorMixin(AbstractMISP):
|
||||||
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
|
||||||
the_opinion = MISPOpinion()
|
the_opinion = MISPOpinion()
|
||||||
the_opinion.from_dict(opinion=opinion, comment=comment,
|
the_opinion.from_dict(opinion=opinion, comment=comment,
|
||||||
object_uuid=self.uuid, object_type=self.classObjectType,
|
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
self.opinions.append(the_opinion)
|
self.opinions.append(the_opinion)
|
||||||
self.edited = True
|
self.edited = True
|
||||||
|
@ -76,7 +80,7 @@ class AnalystDataBehaviorMixin(AbstractMISP):
|
||||||
the_relationship = MISPRelationship()
|
the_relationship = MISPRelationship()
|
||||||
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
|
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
|
||||||
relationship_type=relationship_type,
|
relationship_type=relationship_type,
|
||||||
object_uuid=self.uuid, object_type=self.classObjectType,
|
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
self.relationships.append(the_relationship)
|
self.relationships.append(the_relationship)
|
||||||
self.edited = True
|
self.edited = True
|
||||||
|
@ -303,7 +307,7 @@ class MISPAttribute(AnalystDataBehaviorMixin):
|
||||||
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
|
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
|
||||||
'first_seen', 'last_seen'}
|
'first_seen', 'last_seen'}
|
||||||
|
|
||||||
classObjectType = 'Attribute'
|
_analyst_data_object_type = 'Attribute'
|
||||||
|
|
||||||
def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False):
|
def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False):
|
||||||
"""Represents an Attribute
|
"""Represents an Attribute
|
||||||
|
@ -746,7 +750,7 @@ class MISPObject(AnalystDataBehaviorMixin):
|
||||||
'template_version', 'uuid', 'timestamp', 'comment',
|
'template_version', 'uuid', 'timestamp', 'comment',
|
||||||
'first_seen', 'last_seen', 'deleted'}
|
'first_seen', 'last_seen', 'deleted'}
|
||||||
|
|
||||||
classObjectType = 'Object'
|
_analyst_data_object_type = 'Object'
|
||||||
|
|
||||||
def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def]
|
def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def]
|
||||||
default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None:
|
default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None:
|
||||||
|
@ -1142,7 +1146,7 @@ class MISPObject(AnalystDataBehaviorMixin):
|
||||||
class MISPEventReport(AnalystDataBehaviorMixin):
|
class MISPEventReport(AnalystDataBehaviorMixin):
|
||||||
|
|
||||||
_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
|
_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
|
||||||
classObjectType = 'EventReport'
|
_analyst_data_object_type = 'EventReport'
|
||||||
|
|
||||||
timestamp: float | int | datetime
|
timestamp: float | int | datetime
|
||||||
|
|
||||||
|
@ -1537,7 +1541,7 @@ class MISPEvent(AnalystDataBehaviorMixin):
|
||||||
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||||
'publish_timestamp', 'published', 'date', 'extends_uuid'}
|
'publish_timestamp', 'published', 'date', 'extends_uuid'}
|
||||||
|
|
||||||
classObjectType = 'Event'
|
_analyst_data_object_type = 'Event'
|
||||||
|
|
||||||
def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def]
|
def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
@ -2507,7 +2511,7 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||||
|
|
||||||
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
|
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})
|
||||||
|
|
||||||
classObjectType = 'Note'
|
_analyst_data_object_type = 'Note'
|
||||||
|
|
||||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||||
self.note: str
|
self.note: str
|
||||||
|
@ -2532,7 +2536,7 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||||
|
|
||||||
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
|
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})
|
||||||
|
|
||||||
classObjectType = 'Opinion'
|
_analyst_data_object_type = 'Opinion'
|
||||||
|
|
||||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||||
self.opinion: int
|
self.opinion: int
|
||||||
|
@ -2566,7 +2570,7 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||||
|
|
||||||
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
|
_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})
|
||||||
|
|
||||||
classObjectType = 'Relationship'
|
_analyst_data_object_type = 'Relationship'
|
||||||
|
|
||||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||||
self.related_object_uuid: str
|
self.related_object_uuid: str
|
||||||
|
@ -2587,7 +2591,7 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
|
||||||
raise NewRelationshipError('The target UUID for this relationship is required.')
|
raise NewRelationshipError('The target UUID for this relationship is required.')
|
||||||
else:
|
else:
|
||||||
self.related_object_uuid = self.related_object_type.uuid
|
self.related_object_uuid = self.related_object_type.uuid
|
||||||
self.related_object_type = self.related_object_type.classObjectType
|
self.related_object_type = self.related_object_type._analyst_data_object_type
|
||||||
|
|
||||||
if self.related_object_type not in self.valid_object_type:
|
if self.related_object_type not in self.valid_object_type:
|
||||||
raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')
|
raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')
|
||||||
|
|
Loading…
Reference in New Issue