mirror of https://github.com/MISP/PyMISP
Merge branch 'tomking2-feature/event_report' into main
commit
82ca4ff043
|
@ -24,7 +24,7 @@ Response (if any):
|
|||
try:
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport # noqa
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
from .tools import Neo4j # noqa
|
||||
from .tools import stix # noqa
|
||||
|
|
|
@ -23,7 +23,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
|
|||
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
|
||||
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
|
||||
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport
|
||||
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
||||
|
||||
SearchType = TypeVar('SearchType', str, int)
|
||||
|
@ -391,6 +391,92 @@ class PyMISP:
|
|||
|
||||
# ## END Event ###
|
||||
|
||||
# ## BEGIN Event Report ###
|
||||
|
||||
def get_event_report(self, event_report: Union[MISPEventReport, int, str, UUID],
|
||||
pythonify: bool = False) -> Union[Dict, MISPEventReport]:
|
||||
"""Get an event report from a MISP instance
|
||||
|
||||
:param event_report: event report to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
|
||||
r = self._prepare_request('GET', f'eventReports/view/{event_report_id}')
|
||||
event_report_r = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in event_report_r:
|
||||
return event_report_r
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**event_report_r)
|
||||
return er
|
||||
|
||||
def get_event_reports(self, event_id: Union[int, str],
|
||||
pythonify: bool = False) -> Union[Dict, List[MISPEventReport]]:
|
||||
"""Get event report from a MISP instance that are attached to an event ID
|
||||
|
||||
:param event_id: event id to get the event reports for
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output.
|
||||
"""
|
||||
r = self._prepare_request('GET', f'eventReports/index/event_id:{event_id}')
|
||||
event_reports = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in event_reports:
|
||||
return event_reports
|
||||
to_return = []
|
||||
for event_report in event_reports:
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**event_report)
|
||||
to_return.append(er)
|
||||
return to_return
|
||||
|
||||
def add_event_report(self, event: Union[MISPEvent, int, str, UUID], event_report: MISPEventReport, pythonify: bool = False) -> Union[Dict, MISPEventReport]:
|
||||
"""Add an event report to an existing MISP event
|
||||
|
||||
:param event: event to extend
|
||||
:param event_report: event report to add.
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||
r = self._prepare_request('POST', f'eventReports/add/{event_id}', data=event_report)
|
||||
new_event_report = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_event_report:
|
||||
return new_event_report
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**new_event_report)
|
||||
return er
|
||||
|
||||
def update_event_report(self, event_report: MISPEventReport, event_report_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventReport]:
|
||||
"""Update an event report on a MISP instance
|
||||
|
||||
:param event_report: event report to update
|
||||
:param event_report_id: event report ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
if event_report_id is None:
|
||||
erid = get_uuid_or_id_from_abstract_misp(event_report)
|
||||
else:
|
||||
erid = get_uuid_or_id_from_abstract_misp(event_report_id)
|
||||
r = self._prepare_request('POST', f'eventReports/edit/{erid}', data=event_report)
|
||||
updated_event_report = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in updated_event_report:
|
||||
return updated_event_report
|
||||
er = MISPEventReport()
|
||||
er.from_dict(**updated_event_report)
|
||||
return er
|
||||
|
||||
def delete_event_report(self, event_report: Union[MISPEventReport, int, str, UUID], hard: bool = False) -> Dict:
|
||||
"""Delete an event report from a MISP instance
|
||||
|
||||
:param event_report: event report to delete
|
||||
:param hard: flag for hard delete
|
||||
"""
|
||||
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
|
||||
request_url = f'eventReports/delete/{event_report_id}'
|
||||
if hard:
|
||||
request_url += "/1"
|
||||
r = self._prepare_request('POST', request_url)
|
||||
return self._check_json_response(r)
|
||||
|
||||
# ## END Event Report ###
|
||||
|
||||
# ## BEGIN Object ###
|
||||
|
||||
def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPObject]:
|
||||
|
|
|
@ -19,6 +19,10 @@ class NewAttributeError(PyMISPError):
|
|||
pass
|
||||
|
||||
|
||||
class NewEventReportError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class UpdateAttributeError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ from pathlib import Path
|
|||
from typing import List, Optional, Union, IO, Dict, Any
|
||||
|
||||
from .abstract import AbstractMISP, MISPTag
|
||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
|
||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError
|
||||
|
||||
logger = logging.getLogger('pymisp')
|
||||
|
||||
|
@ -991,6 +991,68 @@ class MISPObject(AbstractMISP):
|
|||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPEventReport(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if 'EventReport' in kwargs:
|
||||
kwargs = kwargs['EventReport']
|
||||
|
||||
self.distribution = kwargs.pop('distribution', None)
|
||||
if self.distribution is not None:
|
||||
self.distribution = int(self.distribution)
|
||||
if self.distribution not in [0, 1, 2, 3, 4, 5]:
|
||||
raise NewEventReportError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution))
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewEventReportError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
|
||||
|
||||
self.name = kwargs.pop('name', None)
|
||||
if self.name is None:
|
||||
raise NewEventReportError('The name of the event report is required.')
|
||||
|
||||
self.content = kwargs.pop('content', None)
|
||||
if self.content is None:
|
||||
raise NewAttributeError('The content of the event report is required.')
|
||||
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('event_id'):
|
||||
self.event_id = int(kwargs.pop('event_id'))
|
||||
if kwargs.get('timestamp'):
|
||||
ts = kwargs.pop('timestamp')
|
||||
if isinstance(ts, datetime):
|
||||
self.timestamp = ts
|
||||
else:
|
||||
self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc)
|
||||
if kwargs.get('deleted'):
|
||||
self.deleted = kwargs.pop('deleted')
|
||||
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'name'):
|
||||
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
def _set_default(self):
|
||||
if not hasattr(self, 'timestamp'):
|
||||
self.timestamp = datetime.timestamp(datetime.now())
|
||||
if not hasattr(self, 'name'):
|
||||
self.name = ''
|
||||
if not hasattr(self, 'content'):
|
||||
self.content = ''
|
||||
|
||||
|
||||
class MISPEvent(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||
|
@ -1014,6 +1076,7 @@ class MISPEvent(AbstractMISP):
|
|||
self.RelatedEvent: List[MISPEvent] = []
|
||||
self.ShadowAttribute: List[MISPShadowAttribute] = []
|
||||
self.SharingGroup: MISPSharingGroup
|
||||
self.EventReport: List[MISPEventReport] = []
|
||||
self.Tag: List[MISPTag] = []
|
||||
|
||||
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag:
|
||||
|
@ -1158,6 +1221,10 @@ class MISPEvent(AbstractMISP):
|
|||
else:
|
||||
raise PyMISPError('All the attributes have to be of type MISPAttribute.')
|
||||
|
||||
@property
|
||||
def event_reports(self) -> List[MISPEventReport]:
|
||||
return self.EventReport
|
||||
|
||||
@property
|
||||
def shadow_attributes(self) -> List[MISPShadowAttribute]:
|
||||
return self.ShadowAttribute
|
||||
|
@ -1281,6 +1348,8 @@ class MISPEvent(AbstractMISP):
|
|||
self.set_date(kwargs.pop('date'))
|
||||
if kwargs.get('Attribute'):
|
||||
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
|
||||
if kwargs.get('EventReport'):
|
||||
[self.add_event_report(**e) for e in kwargs.pop('EventReport')]
|
||||
|
||||
# All other keys
|
||||
if kwargs.get('id'):
|
||||
|
@ -1315,6 +1384,7 @@ class MISPEvent(AbstractMISP):
|
|||
if kwargs.get('SharingGroup'):
|
||||
self.SharingGroup = MISPSharingGroup()
|
||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||
|
||||
super(MISPEvent, self).from_dict(**kwargs)
|
||||
|
||||
def to_dict(self) -> Dict:
|
||||
|
@ -1421,6 +1491,15 @@ class MISPEvent(AbstractMISP):
|
|||
return attr_list
|
||||
return attribute
|
||||
|
||||
def add_event_report(self, name: str, content: str, **kwargs) -> MISPEventReport:
|
||||
"""Add an event report. name and value are requred but you can pass all
|
||||
other parameters supported by MISPEventReport"""
|
||||
event_report = MISPEventReport()
|
||||
event_report.from_dict(name=name, content=content, **kwargs)
|
||||
self.event_reports.append(event_report)
|
||||
self.edited = True
|
||||
return event_report
|
||||
|
||||
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
|
||||
"""Get an object by ID
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ logger = logging.getLogger('pymisp')
|
|||
|
||||
|
||||
try:
|
||||
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist
|
||||
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport
|
||||
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
|
||||
from pymisp.exceptions import MISPServerError
|
||||
except ImportError:
|
||||
|
@ -2652,6 +2652,44 @@ class TestComprehensive(unittest.TestCase):
|
|||
for blo in to_delete['bl_organisations']:
|
||||
self.admin_misp_connector.delete_organisation_blocklist(blo)
|
||||
|
||||
def test_event_report(self):
|
||||
event = self.create_simple_event()
|
||||
new_event_report = MISPEventReport()
|
||||
new_event_report.name = "Test Event Report"
|
||||
new_event_report.content = "# Example report markdown"
|
||||
new_event_report.distribution = 5 # Inherit
|
||||
try:
|
||||
event = self.user_misp_connector.add_event(event)
|
||||
new_event_report = self.user_misp_connector.add_event_report(event.id, new_event_report)
|
||||
# The event report should be linked by Event ID
|
||||
self.assertEqual(event.id, new_event_report.event_id)
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Event Report should be present on the event
|
||||
self.assertEqual(new_event_report.id, event.event_reports[0].id)
|
||||
|
||||
new_event_report.name = "Updated Event Report"
|
||||
new_event_report.content = "Updated content"
|
||||
new_event_report = self.user_misp_connector.update_event_report(new_event_report)
|
||||
# The event report should be updatable
|
||||
self.assertTrue(new_event_report.name == "Updated Event Report")
|
||||
self.assertTrue(new_event_report.content == "Updated content")
|
||||
|
||||
event_reports = self.user_misp_connector.get_event_reports(event.id)
|
||||
# The event report should be requestable by the Event ID
|
||||
self.assertEqual(new_event_report.id, event_reports[0].id)
|
||||
|
||||
response = self.user_misp_connector.delete_event_report(new_event_report)
|
||||
# The event report should be soft-deletable
|
||||
self.assertTrue(response['success'])
|
||||
self.assertEqual(response['name'], f'Event Report {new_event_report.uuid} soft deleted')
|
||||
|
||||
response = self.user_misp_connector.delete_event_report(new_event_report, True)
|
||||
self.assertTrue(response['success'])
|
||||
finally:
|
||||
self.user_misp_connector.delete_event(event)
|
||||
self.user_misp_connector.delete_event_report(new_event_report)
|
||||
|
||||
@unittest.skip("Internal use only")
|
||||
def missing_methods(self):
|
||||
skip = [
|
||||
|
|
Loading…
Reference in New Issue