From 2cf5d99dc87410fa133a563c16eb7398c09f90fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= <raphael@vinot.info> Date: Mon, 6 May 2024 16:39:07 +0200 Subject: [PATCH] chg: A bit more refactoring --- pymisp/api.py | 41 ++++++++++++++++++++++++----------------- pymisp/mispevent.py | 28 ++++++++++++++++------------ 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 3aa9f60..d361eb7 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -32,7 +32,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \ MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \ - MISPNote, MISPOpinion, MISPRelationship + MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types @@ -585,24 +585,26 @@ class PyMISP: data['hard'] = 1 r = self._prepare_request('POST', request_url, data=data) return self._check_json_response(r) - # ## END Event Report ### - # ## BEGIN Analyst Data ### - def get_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID, + # ## BEGIN Analyst Data ###a + def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: """Get an analyst data from a MISP instance :param analyst_data: analyst data to get :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ - type = analyst_data.classObjectType + if isinstance(analyst_data, AnalystDataBehaviorMixin): + analyst_data_type = analyst_data.analyst_data_object_type + else: + analyst_data_type = 'all' analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) - r = self._prepare_request('GET', f'analyst_data/view/{type}/{analyst_data_id}') + r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}') analyst_data_r = self._check_json_response(r) - if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r: + if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all': return analyst_data_r - er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)() + er = type(analyst_data)() er.from_dict(**analyst_data_r) return er @@ -613,14 +615,13 @@ class PyMISP: :param analyst_data: analyst_data to add :param pythonify: Returns a PyMISP Object instead of the plain json output """ - type = analyst_data.classObjectType object_uuid = analyst_data.object_uuid object_type = analyst_data.object_type - r = self._prepare_request('POST', f'analyst_data/add/{type}/{object_uuid}/{object_type}', data=analyst_data) + r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data) new_analyst_data = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data: return new_analyst_data - er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)() + er = type(analyst_data)() er.from_dict(**new_analyst_data) return er @@ -632,14 +633,17 @@ class PyMISP: :param analyst_data_id: analyst data ID to update :param pythonify: Returns a PyMISP Object instead of the plain json output """ - type = analyst_data.classObjectType + if isinstance(analyst_data, AnalystDataBehaviorMixin): + analyst_data_type = analyst_data.analyst_data_object_type + else: + analyst_data_type = 'all' if analyst_data_id is None: analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) - r = self._prepare_request('POST', f'analyst_data/edit/{type}/{analyst_data_id}', data=analyst_data) + r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data) updated_analyst_data = self._check_json_response(r) - if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data: + if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all': return updated_analyst_data - er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)() + er = type(analyst_data)() er.from_dict(**updated_analyst_data) return er @@ -648,9 +652,12 @@ class PyMISP: :param analyst_data: analyst data to delete """ - type = analyst_data.classObjectType + if isinstance(analyst_data, AnalystDataBehaviorMixin): + analyst_data_type = analyst_data.analyst_data_object_type + else: + analyst_data_type = 'all' analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) - request_url = f'analyst_data/delete/{type}/{analyst_data_id}' + request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}' r = self._prepare_request('POST', request_url) return self._check_json_response(r) diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index c0a95c5..8b108e8 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -37,11 +37,15 @@ class AnalystDataBehaviorMixin(AbstractMISP): def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) self.uuid: str # Created in the child class - self.classObjectType: str # Must be defined in the child class + self._analyst_data_object_type: str # Must be defined in the child class self.Note: list[MISPNote] = [] self.Opinion: list[MISPOpinion] = [] self.Relationship: list[MISPRelationship] = [] + @property + def analyst_data_object_type(self) -> str: + return self._analyst_data_object_type + @property def notes(self) -> list[MISPNote]: return self.Note @@ -57,7 +61,7 @@ class AnalystDataBehaviorMixin(AbstractMISP): def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] the_note = MISPNote() the_note.from_dict(note=note, language=language, - object_uuid=self.uuid, object_type=self.classObjectType, + object_uuid=self.uuid, object_type=self.analyst_data_object_type, **kwargs) self.notes.append(the_note) self.edited = True @@ -66,7 +70,7 @@ class AnalystDataBehaviorMixin(AbstractMISP): def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] the_opinion = MISPOpinion() the_opinion.from_dict(opinion=opinion, comment=comment, - object_uuid=self.uuid, object_type=self.classObjectType, + object_uuid=self.uuid, object_type=self.analyst_data_object_type, **kwargs) self.opinions.append(the_opinion) self.edited = True @@ -76,7 +80,7 @@ class AnalystDataBehaviorMixin(AbstractMISP): the_relationship = MISPRelationship() the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid, relationship_type=relationship_type, - object_uuid=self.uuid, object_type=self.classObjectType, + object_uuid=self.uuid, object_type=self.analyst_data_object_type, **kwargs) self.relationships.append(the_relationship) self.edited = True @@ -303,7 +307,7 @@ class MISPAttribute(AnalystDataBehaviorMixin): 'deleted', 'timestamp', 'to_ids', 'disable_correlation', 'first_seen', 'last_seen'} - classObjectType = 'Attribute' + _analyst_data_object_type = 'Attribute' def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False): """Represents an Attribute @@ -746,7 +750,7 @@ class MISPObject(AnalystDataBehaviorMixin): 'template_version', 'uuid', 'timestamp', 'comment', 'first_seen', 'last_seen', 'deleted'} - classObjectType = 'Object' + _analyst_data_object_type = 'Object' def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def] default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None: @@ -1142,7 +1146,7 @@ class MISPObject(AnalystDataBehaviorMixin): class MISPEventReport(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'} - classObjectType = 'EventReport' + _analyst_data_object_type = 'EventReport' timestamp: float | int | datetime @@ -1537,7 +1541,7 @@ class MISPEvent(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', 'publish_timestamp', 'published', 'date', 'extends_uuid'} - classObjectType = 'Event' + _analyst_data_object_type = 'Event' def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) @@ -2507,7 +2511,7 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'}) - classObjectType = 'Note' + _analyst_data_object_type = 'Note' def __init__(self, **kwargs: dict[str, Any]) -> None: self.note: str @@ -2532,7 +2536,7 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'}) - classObjectType = 'Opinion' + _analyst_data_object_type = 'Opinion' def __init__(self, **kwargs: dict[str, Any]) -> None: self.opinion: int @@ -2566,7 +2570,7 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'}) - classObjectType = 'Relationship' + _analyst_data_object_type = 'Relationship' def __init__(self, **kwargs: dict[str, Any]) -> None: self.related_object_uuid: str @@ -2587,7 +2591,7 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): raise NewRelationshipError('The target UUID for this relationship is required.') else: self.related_object_uuid = self.related_object_type.uuid - self.related_object_type = self.related_object_type.classObjectType + self.related_object_type = self.related_object_type._analyst_data_object_type if self.related_object_type not in self.valid_object_type: raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')