mirror of https://github.com/MISP/PyMISP
chg: [analyst-data] Added improvements, API endpoints and tests
parent
d03cea7a67
commit
56be46320e
|
@ -147,6 +147,19 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): # type: i
|
|||
treatment are processed.
|
||||
Note: This method is used when you initialize an object with existing data so by default,
|
||||
the class is flaged as not edited."""
|
||||
# Recursively loads more analyst data
|
||||
from pymisp.mispevent import AnalystDataBehaviorMixin, MISPNote, MISPOpinion, MISPRelationship
|
||||
if isinstance(self, AnalystDataBehaviorMixin):
|
||||
for analystType in ['Note', 'Opinion', 'Relationship']:
|
||||
if kwargs.get(analystType):
|
||||
analystDataList = kwargs.pop(analystType)
|
||||
for analystDataDict in analystDataList:
|
||||
analystData = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(analystType, MISPNote)()
|
||||
analystDataDict['object_uuid'] = self.uuid if 'object_uuid' not in analystDataDict else analystDataDict['object_uuid']
|
||||
analystDataDict['object_type'] = self.classObjectType
|
||||
analystData.from_dict(**analystDataDict)
|
||||
{'Note': self.notes, 'Opinion': self.opinions, 'Relationship': self.relationships}.get(analystType, 'Note').append(analystData)
|
||||
|
||||
for prop, value in kwargs.items():
|
||||
if value is None:
|
||||
continue
|
||||
|
|
183
pymisp/api.py
183
pymisp/api.py
|
@ -31,7 +31,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
|
|||
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
|
||||
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
|
||||
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
|
||||
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
|
||||
MISPNote, MISPOpinion, MISPRelationship
|
||||
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
||||
|
||||
|
||||
|
@ -587,6 +588,186 @@ class PyMISP:
|
|||
|
||||
# ## END Event Report ###
|
||||
|
||||
# ## BEGIN Analyst Data ###
|
||||
def get_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID,
|
||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||
"""Get an analyst data from a MISP instance
|
||||
|
||||
:param analyst_data: analyst data to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
type = analyst_data.classObjectType
|
||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||
r = self._prepare_request('GET', f'analyst_data/view/{type}/{analyst_data_id}')
|
||||
analyst_data_r = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r:
|
||||
return analyst_data_r
|
||||
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
|
||||
er.from_dict(**analyst_data_r)
|
||||
return er
|
||||
|
||||
def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship,
|
||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||
"""Add an analyst data to an existing MISP element
|
||||
|
||||
:param analyst_data: analyst_data to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
type = analyst_data.classObjectType
|
||||
object_uuid = analyst_data.object_uuid
|
||||
object_type = analyst_data.object_type
|
||||
r = self._prepare_request('POST', f'analyst_data/add/{type}/{object_uuid}/{object_type}', data=analyst_data)
|
||||
new_analyst_data = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
|
||||
return new_analyst_data
|
||||
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
|
||||
er.from_dict(**new_analyst_data)
|
||||
return er
|
||||
|
||||
def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None,
|
||||
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
|
||||
"""Update an analyst data on a MISP instance
|
||||
|
||||
:param analyst_data: analyst data to update
|
||||
:param analyst_data_id: analyst data ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
type = analyst_data.classObjectType
|
||||
if analyst_data_id is None:
|
||||
adid = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||
else:
|
||||
adid = get_uuid_or_id_from_abstract_misp(analyst_data_id)
|
||||
r = self._prepare_request('POST', f'analyst_data/edit/{type}/{adid}', data=analyst_data)
|
||||
updated_analyst_data = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data:
|
||||
return updated_analyst_data
|
||||
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
|
||||
er.from_dict(**updated_analyst_data)
|
||||
return er
|
||||
|
||||
def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete an analyst data from a MISP instance
|
||||
|
||||
:param analyst_data: analyst data to delete
|
||||
"""
|
||||
type = analyst_data.classObjectType
|
||||
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
|
||||
request_url = f'analyst_data/delete/{type}/{analyst_data_id}'
|
||||
data = {}
|
||||
r = self._prepare_request('POST', request_url, data=data)
|
||||
return self._check_json_response(r)
|
||||
|
||||
# ## END Analyst Data ###
|
||||
|
||||
# ## BEGIN Analyst Note ###
|
||||
|
||||
def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
|
||||
"""Get a note from a MISP instance
|
||||
|
||||
:param note: note to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
return self.get_analyst_data(note, pythonify)
|
||||
|
||||
def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
|
||||
"""Add a note to an existing MISP element
|
||||
|
||||
:param note: note to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.add_analyst_data(note, pythonify)
|
||||
|
||||
def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote:
|
||||
"""Update a note on a MISP instance
|
||||
|
||||
:param note: note to update
|
||||
:param note_id: note ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.update_analyst_data(note, note_id, pythonify)
|
||||
|
||||
def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete a note from a MISP instance
|
||||
|
||||
:param note: note delete
|
||||
"""
|
||||
return self.delete_analyst_data(note)
|
||||
|
||||
# ## END Analyst Note ###
|
||||
|
||||
# ## BEGIN Analyst Opinion ###
|
||||
|
||||
def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
|
||||
"""Get an opinion from a MISP instance
|
||||
|
||||
:param opinion: opinion to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
return self.get_analyst_data(opinion, pythonify)
|
||||
|
||||
def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
|
||||
"""Add an opinion to an existing MISP element
|
||||
|
||||
:param opinion: opinion to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.add_analyst_data(opinion, pythonify)
|
||||
|
||||
def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
|
||||
"""Update an opinion on a MISP instance
|
||||
|
||||
:param opinion: opinion to update
|
||||
:param opinion_id: opinion ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.update_analyst_data(opinion, opinion_id, pythonify)
|
||||
|
||||
def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete an opinion from a MISP instance
|
||||
|
||||
:param opinion: opinion to delete
|
||||
"""
|
||||
return self.delete_analyst_data(opinion)
|
||||
|
||||
# ## END Analyst Opinion ###
|
||||
|
||||
# ## BEGIN Analyst Relationship ###
|
||||
|
||||
def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
|
||||
"""Get a relationship from a MISP instance
|
||||
|
||||
:param relationship: relationship to get
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
return self.get_analyst_data(relationship, pythonify)
|
||||
|
||||
def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
|
||||
"""Add a relationship to an existing MISP element
|
||||
|
||||
:param relationship: relationship to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.add_analyst_data(relationship, pythonify)
|
||||
|
||||
def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
|
||||
"""Update a relationship on a MISP instance
|
||||
|
||||
:param relationship: relationship to update
|
||||
:param relationship_id: relationship ID to update
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
return self.update_analyst_data(relationship, relationship_id, pythonify)
|
||||
|
||||
def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
|
||||
"""Delete a relationship from a MISP instance
|
||||
|
||||
:param relationship: relationship to delete
|
||||
"""
|
||||
return self.delete_analyst_data(relationship)
|
||||
|
||||
# ## END Analyst Relationship ###
|
||||
|
||||
|
||||
# ## BEGIN Object ###
|
||||
|
||||
def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:
|
||||
|
|
|
@ -2425,9 +2425,6 @@ class MISPAnalystData(AbstractMISP):
|
|||
self.note_type_name = self.classObjectType
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'Note' in kwargs:
|
||||
kwargs = kwargs['Note']
|
||||
|
||||
self.distribution = kwargs.pop('distribution', None)
|
||||
if self.distribution is not None:
|
||||
self.distribution = int(self.distribution)
|
||||
|
@ -2447,10 +2444,10 @@ class MISPAnalystData(AbstractMISP):
|
|||
|
||||
self.object_uuid = kwargs.pop('object_uuid', None)
|
||||
if self.object_uuid is None:
|
||||
raise NewAnalystDataError('The UUID for which this note is attached is required.')
|
||||
raise NewAnalystDataError('The UUID for which this element is attached is required.')
|
||||
self.object_type = kwargs.pop('object_type', None)
|
||||
if self.object_type is None:
|
||||
raise NewAnalystDataError('The element type for which this note is attached is required.')
|
||||
raise NewAnalystDataError('The element type for which this element is attached is required.')
|
||||
if self.object_type not in self.valid_object_type:
|
||||
raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.')
|
||||
|
||||
|
@ -2461,13 +2458,13 @@ class MISPAnalystData(AbstractMISP):
|
|||
if isinstance(ts, datetime):
|
||||
self.created = ts
|
||||
else:
|
||||
self.created = datetime.fromtimestamp(int(ts), timezone.utc)
|
||||
self.created = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ
|
||||
if kwargs.get('modified'):
|
||||
ts = kwargs.pop('modified')
|
||||
if isinstance(ts, datetime):
|
||||
self.modified = ts
|
||||
else:
|
||||
self.modified = datetime.fromtimestamp(int(ts), timezone.utc)
|
||||
self.modified = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ
|
||||
|
||||
if kwargs.get('Org'):
|
||||
self.Org = MISPOrganisation()
|
||||
|
@ -2500,6 +2497,8 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
|
|||
super().__init__(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'Note' in kwargs:
|
||||
kwargs = kwargs['Note']
|
||||
self.note = kwargs.pop('note', None)
|
||||
if self.note is None:
|
||||
raise NewNoteError('The text note of the note is required.')
|
||||
|
@ -2524,6 +2523,8 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
|
|||
super().__init__(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
if 'Opinion' in kwargs:
|
||||
kwargs = kwargs['Opinion']
|
||||
self.opinion = kwargs.pop('opinion', None)
|
||||
if self.opinion is not None:
|
||||
self.opinion = int(self.opinion)
|
||||
|
@ -2557,7 +2558,8 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
|
|||
super().__init__(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
|
||||
|
||||
if 'Relationship' in kwargs:
|
||||
kwargs = kwargs['Relationship']
|
||||
self.related_object_type = kwargs.pop('related_object_type', None)
|
||||
if self.related_object_type is None:
|
||||
raise NewRelationshipError('The target object type for this relationship is required.')
|
||||
|
|
|
@ -25,7 +25,8 @@ try:
|
|||
MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag,
|
||||
MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting,
|
||||
MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster,
|
||||
MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist)
|
||||
MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist,
|
||||
MISPNote, MISPOpinion, MISPRelationship)
|
||||
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
|
||||
except ImportError:
|
||||
raise
|
||||
|
@ -3197,6 +3198,140 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.admin_misp_connector.delete_event(event)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
||||
def test_analyst_data_CRUD(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
try:
|
||||
fake_uuid = str(uuid4())
|
||||
new_note1 = MISPNote()
|
||||
new_note1.object_type = 'Event'
|
||||
new_note1.object_uuid = fake_uuid
|
||||
new_note1.note = 'Fake note'
|
||||
new_note1 = self.user_misp_connector.add_note(new_note1)
|
||||
# The Note should be linked even for non-existing data
|
||||
self.assertTrue(new_note1.object_uuid == fake_uuid)
|
||||
|
||||
new_note1.note = "Updated Note"
|
||||
new_note1 = self.user_misp_connector.update_note(new_note1)
|
||||
# The Note should be updatable
|
||||
self.assertTrue(new_note1.note == "Updated Note")
|
||||
|
||||
# The Note should be able to get an Opinion
|
||||
new_opinion = new_note1.add_opinion(42, 'Test Opinion')
|
||||
new_note1 = self.user_misp_connector.update_note(new_note1)
|
||||
# Fetch newly added node
|
||||
new_note1 = self.user_misp_connector.get_note(new_note1)
|
||||
# The Opinion shoud be able to be created via the Note
|
||||
self.assertTrue(new_note1.Opinion[0].opinion == new_opinion.opinion)
|
||||
|
||||
response = self.user_misp_connector.delete_note(new_note1)
|
||||
# The Note should be deletable
|
||||
self.assertTrue(response['success'])
|
||||
self.assertEqual(response['message'], f'Note deleted.')
|
||||
# The Opinion should not be deleted
|
||||
opinion_resp = self.user_misp_connector.get_opinion(new_opinion)
|
||||
self.assertTrue(opinion_resp.opinion == new_opinion.opinion)
|
||||
|
||||
new_note: MISPNote = event.add_note(note='Test Note', language='en')
|
||||
new_note.distribution = 1 # Community
|
||||
event = self.user_misp_connector.add_event(event)
|
||||
# The note should be linked by Event UUID
|
||||
self.assertEqual(new_note.object_type, 'Event')
|
||||
self.assertTrue(new_note.object_uuid == event.uuid)
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note should be present on the event
|
||||
self.assertTrue(event.Note[0].object_uuid == event.uuid)
|
||||
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
try:
|
||||
self.admin_misp_connector.delete_opinion(new_opinion)
|
||||
self.admin_misp_connector.delete_note(new_note)
|
||||
self.admin_misp_connector.delete_note(new_note1) # Should already be deleted
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_analyst_data_ACL(self) -> None:
|
||||
event = self.create_simple_event()
|
||||
event.distribution = 2
|
||||
sg = MISPSharingGroup()
|
||||
sg.name = 'Testcases SG'
|
||||
sg.releasability = 'Testing'
|
||||
sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True)
|
||||
# Chec that sharing group was created
|
||||
self.assertEqual(sharing_group.name, 'Testcases SG')
|
||||
|
||||
try:
|
||||
new_note: MISPNote = event.add_note(note='Test Note', language='en')
|
||||
new_note.distribution = 0 # Org only
|
||||
event = self.admin_misp_connector.add_event(event)
|
||||
|
||||
# The note should be linked by Event UUID
|
||||
self.assertEqual(new_note.object_type, 'Event')
|
||||
self.assertEqual(event.uuid, new_note.object_uuid)
|
||||
|
||||
event = self.admin_misp_connector.get_event(event)
|
||||
# The note should be visible for the creator
|
||||
self.assertEqual(len(event.Note), 1)
|
||||
self.assertTrue(new_note.note == "Test Note")
|
||||
|
||||
resp = self.user_misp_connector.get_note(new_note)
|
||||
# The note should not be visible to another org
|
||||
self.assertTrue(len(resp), 0)
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note attached to the event should not be visible for another org than the creator
|
||||
self.assertEqual(len(event.Note), 0)
|
||||
|
||||
new_note = self.admin_misp_connector.get_note(new_note)
|
||||
new_note.distribution = 4
|
||||
new_note.sharing_group_id = sharing_group.id
|
||||
new_note = self.admin_misp_connector.update_note(new_note)
|
||||
self.assertEqual(int(new_note.sharing_group_id), int(sharing_group.id))
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note attached to the event should not be visible for another org not part of the sharing group
|
||||
self.assertEqual(len(event.Note), 0)
|
||||
|
||||
# Add org to the sharing group
|
||||
r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group,
|
||||
self.test_org, extend=True)
|
||||
self.assertEqual(r['name'], 'Organisation added to the sharing group.')
|
||||
|
||||
event = self.user_misp_connector.get_event(event)
|
||||
# The Note attached to the event should now be visible
|
||||
self.assertEqual(len(event.Note), 1)
|
||||
|
||||
new_note.note = "Updated Note"
|
||||
resp = self.user_misp_connector.update_note(new_note)
|
||||
# The Note should not be updatable by another organisation
|
||||
self.assertTrue(resp['errors'])
|
||||
|
||||
resp = self.user_misp_connector.delete_note(new_note)
|
||||
# The Note should not be deletable by another organisation
|
||||
self.assertTrue(resp['errors'])
|
||||
|
||||
organisation = MISPOrganisation()
|
||||
organisation.name = 'Fake Org'
|
||||
fake_org = self.admin_misp_connector.add_organisation(organisation, pythonify=True)
|
||||
new_note_2 = new_note.add_note('Test Note 2')
|
||||
new_note_2.orgc_uuid = fake_org.uuid
|
||||
new_note_2 = self.user_misp_connector.add_note(new_note_2)
|
||||
# Regular user should not be able to create a note on behalf of another organisation
|
||||
self.assertFalse(new_note_2.orgc_uuid == fake_org.uuid)
|
||||
# Note should have the orgc set to the use's organisation for non-privileged users
|
||||
self.assertTrue(new_note_2.orgc_uuid == self.test_org.uuid)
|
||||
|
||||
finally:
|
||||
self.admin_misp_connector.delete_event(event)
|
||||
try:
|
||||
pass
|
||||
self.admin_misp_connector.delete_sharing_group(sharing_group.id)
|
||||
self.admin_misp_connector.delete_organisation(fake_org)
|
||||
self.admin_misp_connector.delete_note(new_note)
|
||||
except:
|
||||
pass
|
||||
|
||||
@unittest.skip("Internal use only")
|
||||
def missing_methods(self) -> None:
|
||||
skip = [
|
||||
|
|
Loading…
Reference in New Issue