new: [test] Check object correlation

pull/8641/head
Jakub Onderka 2022-10-05 14:50:06 +02:00
parent b494d0db5f
commit 70556e5911
1 changed files with 27 additions and 1 deletions

View File

@ -16,6 +16,7 @@ logger = logging.getLogger('pymisp')
from pymisp import PyMISP, MISPOrganisation, MISPUser, MISPRole, MISPSharingGroup, MISPEvent, MISPLog, MISPSighting, Distribution, ThreatLevel, Analysis, MISPEventReport, MISPServerError from pymisp import PyMISP, MISPOrganisation, MISPUser, MISPRole, MISPSharingGroup, MISPEvent, MISPLog, MISPSighting, Distribution, ThreatLevel, Analysis, MISPEventReport, MISPServerError
from pymisp.tools import DomainIPObject
# Load access information for env variables # Load access information for env variables
url = "http://" + os.environ["HOST"] url = "http://" + os.environ["HOST"]
@ -536,9 +537,34 @@ class TestComprehensive(unittest.TestCase):
for event in (first, second): for event in (first, second):
check_response(self.admin_misp_connector.delete_event(event)) check_response(self.admin_misp_connector.delete_event(event))
def test_correlations_object(self):
first = create_simple_event()
dom_ip_obj = DomainIPObject({'ip': ['10.0.0.1']})
first.add_object(dom_ip_obj)
first = check_response(self.admin_misp_connector.add_event(first))
second = create_simple_event()
dom_ip_obj = DomainIPObject({'ip': ['10.0.0.1']})
second.add_object(dom_ip_obj)
second = check_response(self.admin_misp_connector.add_event(second))
# Reload to get event data with related events
first = check_response(self.admin_misp_connector.get_event(first))
try:
self.assertEqual(1, len(first.RelatedEvent), first.RelatedEvent)
self.assertEqual(1, len(second.RelatedEvent), second.RelatedEvent)
except:
raise
finally:
# Delete events
for event in (first, second):
check_response(self.admin_misp_connector.delete_event(event))
def test_correlations_noacl(self): def test_correlations_noacl(self):
with MISPSetting(self.admin_misp_connector, {"MISP.correlation_engine": "NoAcl"}): with MISPSetting(self.admin_misp_connector, {"MISP.correlation_engine": "NoAcl"}):
self.test_correlations() self.test_correlations()
self.test_correlations_object()
def test_advanced_correlations(self): def test_advanced_correlations(self):
with MISPSetting(self.admin_misp_connector, {"MISP.enable_advanced_correlations": True}): with MISPSetting(self.admin_misp_connector, {"MISP.enable_advanced_correlations": True}):
@ -898,7 +924,6 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(event) self.admin_misp_connector.delete_event(event)
def _search(self, query: dict): def _search(self, query: dict):
response = self.admin_misp_connector._prepare_request('POST', 'events/restSearch', data=query) response = self.admin_misp_connector._prepare_request('POST', 'events/restSearch', data=query)
response = self.admin_misp_connector._check_response(response) response = self.admin_misp_connector._check_response(response)
@ -911,5 +936,6 @@ class TestComprehensive(unittest.TestCase):
check_response(response) check_response(response)
return response return response
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()