mirror of https://github.com/MISP/PyMISP
new: Add in ability to create/update/delete MISP Event Reports
parent
de6125a623
commit
12c29e6a06
|
@ -24,7 +24,7 @@ Response (if any):
|
|||
try:
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport # noqa
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
from .tools import Neo4j # noqa
|
||||
from .tools import stix # noqa
|
||||
|
|
|
@ -23,7 +23,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
|
|||
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
|
||||
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
|
||||
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport
|
||||
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
||||
|
||||
SearchType = TypeVar('SearchType', str, int)
|
||||
|
@ -389,6 +389,75 @@ class PyMISP:
|
|||
|
||||
# ## END Event ###
|
||||
|
||||
# ## BEGIN Event Report ###
|
||||
|
||||
def get_event_report(self, event_report: Union[MISPEventReport, int, str, UUID],
|
||||
pythonify: bool = False) -> Union[Dict, MISPEventReport]:
|
||||
"""Get an event report from a MISP instance
|
||||
|
||||
:param event_report: event report to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
|
||||
r = self._prepare_request('GET', f'eventReports/view/{event_report_id}')
|
||||
event_report_r = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in event_report_r:
|
||||
return event_report_r
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**event_report_r)
|
||||
return er
|
||||
|
||||
def add_event_report(self, event: Union[MISPEvent, int, str, UUID], event_report: MISPEventReport, pythonify: bool = False) -> Union[Dict, MISPEventReport]:
|
||||
"""Add an event report to an existing MISP event
|
||||
|
||||
:param event: event to extend
|
||||
:param event_report: event report to add.
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||
r = self._prepare_request('POST', f'eventReports/add/{event_id}', data=event_report)
|
||||
new_event_report = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_event_report:
|
||||
return new_event_report
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**new_event_report)
|
||||
return er
|
||||
|
||||
def update_event_report(self, event_report: MISPEventReport, event_report_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventReport]:
|
||||
"""Update an event report on a MISP instance
|
||||
|
||||
:param event_report: event report to update
|
||||
:param event_report_id: event report ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
if event_report_id is None:
|
||||
erid = get_uuid_or_id_from_abstract_misp(event_report)
|
||||
else:
|
||||
erid = get_uuid_or_id_from_abstract_misp(event_report_id)
|
||||
r = self._prepare_request('POST', f'eventReports/edit/{erid}', data=event_report)
|
||||
updated_event_report = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in updated_event_report:
|
||||
return updated_event_report
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**updated_event_report)
|
||||
return er
|
||||
|
||||
def delete_event_report(self, event_report: Union[MISPEventReport, int, str, UUID], hard: bool = False) -> Dict:
|
||||
"""Delete an event report from a MISP instance
|
||||
|
||||
:param event_report: event report to delete
|
||||
:param hard: flag for hard delete
|
||||
"""
|
||||
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
|
||||
request_url = f'eventReports/delete/{event_report_id}'
|
||||
if hard:
|
||||
request_url += "/1"
|
||||
r = self._prepare_request('POST', request_url)
|
||||
response = self._check_json_response(r)
|
||||
return response
|
||||
|
||||
# ## END Event Report ###
|
||||
|
||||
# ## BEGIN Object ###
|
||||
|
||||
def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPObject]:
|
||||
|
|
|
@ -19,6 +19,10 @@ class NewAttributeError(PyMISPError):
|
|||
pass
|
||||
|
||||
|
||||
class NewEventReportError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class UpdateAttributeError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ from pathlib import Path
|
|||
from typing import List, Optional, Union, IO, Dict, Any
|
||||
|
||||
from .abstract import AbstractMISP, MISPTag
|
||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
|
||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError
|
||||
|
||||
logger = logging.getLogger('pymisp')
|
||||
|
||||
|
@ -982,6 +982,67 @@ class MISPObject(AbstractMISP):
|
|||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPEventReport(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if 'EventReport' in kwargs:
|
||||
kwargs = kwargs['EventReport']
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
self.distribution = kwargs.pop('distribution', None)
|
||||
if self.distribution is not None:
|
||||
self.distribution = int(self.distribution)
|
||||
if self.distribution not in [0, 1, 2, 3, 4, 5]:
|
||||
raise NewEventReportError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution))
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
|
||||
|
||||
self.name = kwargs.pop('name', None)
|
||||
if self.name is None:
|
||||
raise NewEventReportError('The name of the event report is required.')
|
||||
|
||||
self.content = kwargs.pop('content', None)
|
||||
if self.content is None:
|
||||
raise NewAttributeError('The content of the event report is required.')
|
||||
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('event_id'):
|
||||
self.event_id = int(kwargs.pop('event_id'))
|
||||
if kwargs.get('timestamp'):
|
||||
ts = kwargs.pop('timestamp')
|
||||
if isinstance(ts, datetime):
|
||||
self.timestamp = ts
|
||||
else:
|
||||
self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc)
|
||||
if kwargs.get('deleted'):
|
||||
self.deleted = kwargs.pop('deleted')
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'name'):
|
||||
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
def _set_default(self):
|
||||
if not hasattr(self, 'timestamp'):
|
||||
self.timestamp = datetime.timestamp(datetime.now())
|
||||
if not hasattr(self, 'name'):
|
||||
self.name = ''
|
||||
if not hasattr(self, 'content'):
|
||||
self.content = ''
|
||||
|
||||
|
||||
class MISPEvent(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||
|
@ -1005,6 +1066,7 @@ class MISPEvent(AbstractMISP):
|
|||
self.RelatedEvent: List[MISPEvent] = []
|
||||
self.ShadowAttribute: List[MISPShadowAttribute] = []
|
||||
self.SharingGroup: MISPSharingGroup
|
||||
self.EventReport: List[MISPEventReport] = []
|
||||
self.Tag: List[MISPTag] = []
|
||||
|
||||
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag:
|
||||
|
@ -1149,6 +1211,10 @@ class MISPEvent(AbstractMISP):
|
|||
else:
|
||||
raise PyMISPError('All the attributes have to be of type MISPAttribute.')
|
||||
|
||||
@property
|
||||
def event_reports(self) -> List[MISPEventReport]:
|
||||
return self.EventReport
|
||||
|
||||
@property
|
||||
def shadow_attributes(self) -> List[MISPShadowAttribute]:
|
||||
return self.ShadowAttribute
|
||||
|
@ -1272,6 +1338,8 @@ class MISPEvent(AbstractMISP):
|
|||
self.set_date(kwargs.pop('date'))
|
||||
if kwargs.get('Attribute'):
|
||||
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
|
||||
if kwargs.get('EventReport'):
|
||||
[self.add_event_report(**e) for e in kwargs.pop('EventReport')]
|
||||
|
||||
# All other keys
|
||||
if kwargs.get('id'):
|
||||
|
@ -1412,6 +1480,14 @@ class MISPEvent(AbstractMISP):
|
|||
return attr_list
|
||||
return attribute
|
||||
|
||||
def add_event_report(self, name: str, content: str, **kwargs) -> MISPEventReport:
|
||||
"""Add an event report. name and value are requred but you can pass all
|
||||
other parameters supported by MISPEventReport"""
|
||||
event_report = MISPEventReport()
|
||||
event_report.from_dict(name=name, content=content, **kwargs)
|
||||
self.event_reports.append(event_report)
|
||||
return event_report
|
||||
|
||||
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
|
||||
"""Get an object by ID
|
||||
|
||||
|
|
Loading…
Reference in New Issue