new: Add in ability to create/update/delete MISP Event Reports

pull/700/head
Tom King 2021-01-12 15:13:32 +00:00 committed by Raphaël Vinot
parent ebd5a2397b
commit f71c250402
4 changed files with 152 additions and 3 deletions

View File

@ -24,7 +24,7 @@ Response (if any):
try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -24,7 +24,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
SearchType = TypeVar('SearchType', str, int)
@ -415,6 +415,75 @@ class PyMISP:
# ## END Event ###
# ## BEGIN Event Report ###
def get_event_report(self, event_report: Union[MISPEventReport, int, str, UUID],
pythonify: bool = False) -> Union[Dict, MISPEventReport]:
"""Get an event report from a MISP instance
:param event_report: event report to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
r = self._prepare_request('GET', f'eventReports/view/{event_report_id}')
event_report_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in event_report_r:
return event_report_r
er = MISPEventReport()
er.from_dict(**event_report_r)
return er
def add_event_report(self, event: Union[MISPEvent, int, str, UUID], event_report: MISPEventReport, pythonify: bool = False) -> Union[Dict, MISPEventReport]:
"""Add an event report to an existing MISP event
:param event: event to extend
:param event_report: event report to add.
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
event_id = get_uuid_or_id_from_abstract_misp(event)
r = self._prepare_request('POST', f'eventReports/add/{event_id}', data=event_report)
new_event_report = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_event_report:
return new_event_report
er = MISPEventReport()
er.from_dict(**new_event_report)
return er
def update_event_report(self, event_report: MISPEventReport, event_report_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventReport]:
"""Update an event report on a MISP instance
:param event_report: event report to update
:param event_report_id: event report ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if event_report_id is None:
erid = get_uuid_or_id_from_abstract_misp(event_report)
else:
erid = get_uuid_or_id_from_abstract_misp(event_report_id)
r = self._prepare_request('POST', f'eventReports/edit/{erid}', data=event_report)
updated_event_report = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_event_report:
return updated_event_report
er = MISPEventReport()
er.from_dict(**updated_event_report)
return er
def delete_event_report(self, event_report: Union[MISPEventReport, int, str, UUID], hard: bool = False) -> Dict:
"""Delete an event report from a MISP instance
:param event_report: event report to delete
:param hard: flag for hard delete
"""
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
request_url = f'eventReports/delete/{event_report_id}'
if hard:
request_url += "/1"
r = self._prepare_request('POST', request_url)
response = self._check_json_response(r)
return response
# ## END Event Report ###
# ## BEGIN Object ###
def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPObject]:

View File

@ -19,6 +19,10 @@ class NewAttributeError(PyMISPError):
pass
class NewEventReportError(PyMISPError):
pass
class UpdateAttributeError(PyMISPError):
pass

View File

@ -16,7 +16,7 @@ from pathlib import Path
from typing import List, Optional, Union, IO, Dict, Any
from .abstract import AbstractMISP, MISPTag
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError
logger = logging.getLogger('pymisp')
@ -991,6 +991,67 @@ class MISPObject(AbstractMISP):
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPEventReport(AbstractMISP):
_fields_for_feed: set = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
def from_dict(self, **kwargs):
if 'EventReport' in kwargs:
kwargs = kwargs['EventReport']
super().from_dict(**kwargs)
self.distribution = kwargs.pop('distribution', None)
if self.distribution is not None:
self.distribution = int(self.distribution)
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewEventReportError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
self.name = kwargs.pop('name', None)
if self.name is None:
raise NewEventReportError('The name of the event report is required.')
self.content = kwargs.pop('content', None)
if self.content is None:
raise NewAttributeError('The content of the event report is required.')
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('event_id'):
self.event_id = int(kwargs.pop('event_id'))
if kwargs.get('timestamp'):
ts = kwargs.pop('timestamp')
if isinstance(ts, datetime):
self.timestamp = ts
else:
self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc)
if kwargs.get('deleted'):
self.deleted = kwargs.pop('deleted')
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def _set_default(self):
if not hasattr(self, 'timestamp'):
self.timestamp = datetime.timestamp(datetime.now())
if not hasattr(self, 'name'):
self.name = ''
if not hasattr(self, 'content'):
self.content = ''
class MISPEvent(AbstractMISP):
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
@ -1014,6 +1075,7 @@ class MISPEvent(AbstractMISP):
self.RelatedEvent: List[MISPEvent] = []
self.ShadowAttribute: List[MISPShadowAttribute] = []
self.SharingGroup: MISPSharingGroup
self.EventReport: List[MISPEventReport] = []
self.Tag: List[MISPTag] = []
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag:
@ -1158,6 +1220,10 @@ class MISPEvent(AbstractMISP):
else:
raise PyMISPError('All the attributes have to be of type MISPAttribute.')
@property
def event_reports(self) -> List[MISPEventReport]:
return self.EventReport
@property
def shadow_attributes(self) -> List[MISPShadowAttribute]:
return self.ShadowAttribute
@ -1281,6 +1347,8 @@ class MISPEvent(AbstractMISP):
self.set_date(kwargs.pop('date'))
if kwargs.get('Attribute'):
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
if kwargs.get('EventReport'):
[self.add_event_report(**e) for e in kwargs.pop('EventReport')]
# All other keys
if kwargs.get('id'):
@ -1421,6 +1489,14 @@ class MISPEvent(AbstractMISP):
return attr_list
return attribute
def add_event_report(self, name: str, content: str, **kwargs) -> MISPEventReport:
"""Add an event report. name and value are requred but you can pass all
other parameters supported by MISPEventReport"""
event_report = MISPEventReport()
event_report.from_dict(name=name, content=content, **kwargs)
self.event_reports.append(event_report)
return event_report
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
"""Get an object by ID