From 12c29e6a06060d4c6c88983c6fd14643ae7a3106 Mon Sep 17 00:00:00 2001 From: Tom King Date: Tue, 12 Jan 2021 15:13:32 +0000 Subject: [PATCH] new: Add in ability to create/update/delete MISP Event Reports --- pymisp/__init__.py | 2 +- pymisp/api.py | 71 +++++++++++++++++++++++++++++++++++++++- pymisp/exceptions.py | 4 +++ pymisp/mispevent.py | 78 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 152 insertions(+), 3 deletions(-) diff --git a/pymisp/__init__.py b/pymisp/__init__.py index dfdf831..30b6e3e 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -24,7 +24,7 @@ Response (if any): try: from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa - from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist # noqa + from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport # noqa from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import stix # noqa diff --git a/pymisp/api.py b/pymisp/api.py index a7811a4..114fbb2 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -23,7 +23,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \ MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ - MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist + MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types SearchType = TypeVar('SearchType', str, int) @@ -389,6 +389,75 @@ class PyMISP: # ## END Event ### + # ## BEGIN Event Report ### + + def get_event_report(self, event_report: Union[MISPEventReport, int, str, UUID], + pythonify: bool = False) -> Union[Dict, MISPEventReport]: + """Get an event report from a MISP instance + + :param event_report: event report to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + event_report_id = get_uuid_or_id_from_abstract_misp(event_report) + r = self._prepare_request('GET', f'eventReports/view/{event_report_id}') + event_report_r = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in event_report_r: + return event_report_r + er = MISPEventReport() + er.from_dict(**event_report_r) + return er + + def add_event_report(self, event: Union[MISPEvent, int, str, UUID], event_report: MISPEventReport, pythonify: bool = False) -> Union[Dict, MISPEventReport]: + """Add an event report to an existing MISP event + + :param event: event to extend + :param event_report: event report to add. + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + event_id = get_uuid_or_id_from_abstract_misp(event) + r = self._prepare_request('POST', f'eventReports/add/{event_id}', data=event_report) + new_event_report = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in new_event_report: + return new_event_report + er = MISPEventReport() + er.from_dict(**new_event_report) + return er + + def update_event_report(self, event_report: MISPEventReport, event_report_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventReport]: + """Update an event report on a MISP instance + + :param event_report: event report to update + :param event_report_id: event report ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + if event_report_id is None: + erid = get_uuid_or_id_from_abstract_misp(event_report) + else: + erid = get_uuid_or_id_from_abstract_misp(event_report_id) + r = self._prepare_request('POST', f'eventReports/edit/{erid}', data=event_report) + updated_event_report = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in updated_event_report: + return updated_event_report + er = MISPEventReport() + er.from_dict(**updated_event_report) + return er + + def delete_event_report(self, event_report: Union[MISPEventReport, int, str, UUID], hard: bool = False) -> Dict: + """Delete an event report from a MISP instance + + :param event_report: event report to delete + :param hard: flag for hard delete + """ + event_report_id = get_uuid_or_id_from_abstract_misp(event_report) + request_url = f'eventReports/delete/{event_report_id}' + if hard: + request_url += "/1" + r = self._prepare_request('POST', request_url) + response = self._check_json_response(r) + return response + + # ## END Event Report ### + # ## BEGIN Object ### def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPObject]: diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index 8a809cc..e79453e 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -19,6 +19,10 @@ class NewAttributeError(PyMISPError): pass +class NewEventReportError(PyMISPError): + pass + + class UpdateAttributeError(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 04c33f9..ee126de 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -16,7 +16,7 @@ from pathlib import Path from typing import List, Optional, Union, IO, Dict, Any from .abstract import AbstractMISP, MISPTag -from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError +from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError logger = logging.getLogger('pymisp') @@ -982,6 +982,67 @@ class MISPObject(AbstractMISP): return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) +class MISPEventReport(AbstractMISP): + + _fields_for_feed: set = {'uuid', 'name', 'content', 'timestamp', 'deleted'} + + def from_dict(self, **kwargs): + if 'EventReport' in kwargs: + kwargs = kwargs['EventReport'] + super().from_dict(**kwargs) + + self.distribution = kwargs.pop('distribution', None) + if self.distribution is not None: + self.distribution = int(self.distribution) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewEventReportError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution)) + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + + self.name = kwargs.pop('name', None) + if self.name is None: + raise NewEventReportError('The name of the event report is required.') + + self.content = kwargs.pop('content', None) + if self.content is None: + raise NewAttributeError('The content of the event report is required.') + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('event_id'): + self.event_id = int(kwargs.pop('event_id')) + if kwargs.get('timestamp'): + ts = kwargs.pop('timestamp') + if isinstance(ts, datetime): + self.timestamp = ts + else: + self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc) + if kwargs.get('deleted'): + self.deleted = kwargs.pop('deleted') + + def __repr__(self) -> str: + if hasattr(self, 'name'): + return '<{self.__class__.__name__}(name={self.name})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def _set_default(self): + if not hasattr(self, 'timestamp'): + self.timestamp = datetime.timestamp(datetime.now()) + if not hasattr(self, 'name'): + self.name = '' + if not hasattr(self, 'content'): + self.content = '' + + class MISPEvent(AbstractMISP): _fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', @@ -1005,6 +1066,7 @@ class MISPEvent(AbstractMISP): self.RelatedEvent: List[MISPEvent] = [] self.ShadowAttribute: List[MISPShadowAttribute] = [] self.SharingGroup: MISPSharingGroup + self.EventReport: List[MISPEventReport] = [] self.Tag: List[MISPTag] = [] def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag: @@ -1149,6 +1211,10 @@ class MISPEvent(AbstractMISP): else: raise PyMISPError('All the attributes have to be of type MISPAttribute.') + @property + def event_reports(self) -> List[MISPEventReport]: + return self.EventReport + @property def shadow_attributes(self) -> List[MISPShadowAttribute]: return self.ShadowAttribute @@ -1272,6 +1338,8 @@ class MISPEvent(AbstractMISP): self.set_date(kwargs.pop('date')) if kwargs.get('Attribute'): [self.add_attribute(**a) for a in kwargs.pop('Attribute')] + if kwargs.get('EventReport'): + [self.add_event_report(**e) for e in kwargs.pop('EventReport')] # All other keys if kwargs.get('id'): @@ -1412,6 +1480,14 @@ class MISPEvent(AbstractMISP): return attr_list return attribute + def add_event_report(self, name: str, content: str, **kwargs) -> MISPEventReport: + """Add an event report. name and value are requred but you can pass all + other parameters supported by MISPEventReport""" + event_report = MISPEventReport() + event_report.from_dict(name=name, content=content, **kwargs) + self.event_reports.append(event_report) + return event_report + def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject: """Get an object by ID