mirror of https://github.com/MISP/PyMISP
parent
1de4c9d0b9
commit
99b2052449
|
@ -1,7 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet
|
||||
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet
|
||||
from .api import PyMISP, everything_broken
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject
|
||||
from typing import TypeVar, Optional, Tuple, List, Dict, Union
|
||||
|
@ -100,7 +100,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
def add_object(self, event_id: int, misp_object: MISPObject):
|
||||
created_object = super().add_object(event_id, misp_object)
|
||||
if isinstance(created_object, str):
|
||||
raise NewEventError(f'Unexpected response from server: {created_object}')
|
||||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {created_object}')
|
||||
elif 'errors' in created_object:
|
||||
return created_object
|
||||
o = MISPObject(misp_object.name)
|
||||
|
@ -110,17 +110,31 @@ class ExpandedPyMISP(PyMISP):
|
|||
def update_object(self, misp_object: MISPObject):
|
||||
updated_object = super().edit_object(misp_object)
|
||||
if isinstance(updated_object, str):
|
||||
raise NewEventError(f'Unexpected response from server: {updated_object}')
|
||||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {updated_object}')
|
||||
elif 'errors' in updated_object:
|
||||
return updated_object
|
||||
o = MISPObject(misp_object.name)
|
||||
o.from_dict(**updated_object)
|
||||
return o
|
||||
|
||||
def get_object(self, object_id: int):
|
||||
"""Get an object
|
||||
|
||||
:param obj_id: Object id to get
|
||||
"""
|
||||
misp_object = super().get_object(object_id)
|
||||
if isinstance(misp_object, str):
|
||||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {misp_object}')
|
||||
elif 'errors' in misp_object:
|
||||
return misp_object
|
||||
o = MISPObject(misp_object['Object']['name'])
|
||||
o.from_dict(**misp_object)
|
||||
return o
|
||||
|
||||
def add_event(self, event: MISPEvent):
|
||||
created_event = super().add_event(event)
|
||||
if isinstance(created_event, str):
|
||||
raise NewEventError(f'Unexpected response from server: {created_event}')
|
||||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {created_event}')
|
||||
elif 'errors' in created_event:
|
||||
return created_event
|
||||
e = MISPEvent()
|
||||
|
@ -130,7 +144,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
def update_event(self, event: MISPEvent):
|
||||
updated_event = super().update_event(event.uuid, event)
|
||||
if isinstance(updated_event, str):
|
||||
raise UpdateEventError(f'Unexpected response from server: {updated_event}')
|
||||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {updated_event}')
|
||||
elif 'errors' in updated_event:
|
||||
return updated_event
|
||||
e = MISPEvent()
|
||||
|
@ -140,7 +154,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
def update_attribute(self, attribute: MISPAttribute):
|
||||
updated_attribute = super().update_attribute(attribute.uuid, attribute)
|
||||
if isinstance(updated_attribute, str):
|
||||
raise UpdateAttributeError(f'Unexpected response from server: {updated_attribute}')
|
||||
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {updated_attribute}')
|
||||
elif 'errors' in updated_attribute:
|
||||
return updated_attribute
|
||||
a = MISPAttribute()
|
||||
|
|
|
@ -911,10 +911,16 @@ class TestComprehensive(unittest.TestCase):
|
|||
ip_dom.add_attribute('ip', value='8.8.8.8')
|
||||
first.add_object(ip_dom)
|
||||
try:
|
||||
# Update with full event
|
||||
first = self.user_misp_connector.add_event(first)
|
||||
first.objects[0].add_attribute('ip', value='8.9.9.8')
|
||||
first = self.user_misp_connector.update_event(first)
|
||||
self.assertEqual(first.objects[0].attributes[2].value, '8.9.9.8')
|
||||
# Update object only
|
||||
misp_object = self.user_misp_connector.get_object(first.objects[0].id)
|
||||
misp_object.attributes[2].value = '8.9.9.9'
|
||||
misp_object = self.user_misp_connector.update_object(misp_object)
|
||||
self.assertEqual(misp_object.attributes[2].value, '8.9.9.9')
|
||||
finally:
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first.id)
|
||||
|
|
Loading…
Reference in New Issue