diff --git a/tests/test_eventreport.py b/tests/test_eventreport.py index 078225b..7ce302b 100644 --- a/tests/test_eventreport.py +++ b/tests/test_eventreport.py @@ -42,6 +42,7 @@ except ImportError as e: urllib3.disable_warnings() fast_mode = True +background_job_time = 6 def setup_report_env(func): @functools.wraps(func) @@ -53,8 +54,8 @@ def setup_report_env(func): event.threat_level_id = ThreatLevel.low event.analysis = Analysis.completed event.set_date("2017-12-31") - self.event = self.popKey(self.admin_misp_connector.add_event(event)) - self.publish_event() + self.event = self.org_user_misp_connector.add_event(event, pythonify=True) + self.publish_event(self.org_user_misp_connector) self.testReport['event_id'] = self.event['id'] func(self,*args,**kwargs) finally: @@ -87,17 +88,29 @@ class TestEventReportComprehensive(unittest.TestCase): if not fast_mode: r = cls.admin_misp_connector.update_misp() print(r) - # Creates an org + # Creates an org 1 organisation = MISPOrganisation() - organisation.name = 'Test Org' - cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) + organisation.name = 'Test Org 1' + cls.test_org1 = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) + # Creates an org 2 + organisation = MISPOrganisation() + organisation.name = 'Test Org 2' + cls.test_org2 = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) # Creates a user user = MISPUser() user.email = 'testusr@user.local' - user.org_id = cls.test_org.id + user.org_id = cls.test_org1.id cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True) cls.user_misp_connector = PyMISP(url, cls.test_usr.authkey, verifycert, debug=True) cls.user_misp_connector.toggle_global_pythonify() + # Creates an org user + user = MISPUser() + user.email = 'testorgusr@user.local' + user.org_id = cls.test_org2.id + user.role_id = 2 + cls.org_usr = cls.admin_misp_connector.add_user(user, pythonify=True) + cls.org_user_misp_connector = PyMISP(url, cls.org_usr.authkey, verifycert, debug=True) + cls.org_user_misp_connector.toggle_global_pythonify() cls.event = None cls.sharing_group = None @@ -110,10 +123,12 @@ class TestEventReportComprehensive(unittest.TestCase): @classmethod def tearDownClass(cls): - # Delete user + # Delete users cls.admin_misp_connector.delete_user(cls.test_usr) + cls.admin_misp_connector.delete_user(cls.org_usr) # Delete org - cls.admin_misp_connector.delete_organisation(cls.test_org) + cls.admin_misp_connector.delete_organisation(cls.test_org1) + cls.admin_misp_connector.delete_organisation(cls.test_org2) def compare_report(self, base_report, report): to_check = ['uuid', 'name', 'content', 'distribution', 'sharing_group_id'] @@ -243,7 +258,7 @@ class TestEventReportComprehensive(unittest.TestCase): def add_report(self, report, connector = None): relative_path = f"eventReports/add/{self.event['id']}" if connector is None: - newReport = self.admin_misp_connector.direct_call(relative_path, data=report) + newReport = self.org_user_misp_connector.direct_call(relative_path, data=report) else: newReport = connector.direct_call(relative_path, data=report) newReport = self.popKey(newReport) @@ -255,7 +270,7 @@ class TestEventReportComprehensive(unittest.TestCase): report['content'] = 'Report content - content changed' report['distribution'] = str(Distribution.inherit.value) if connector is None: - returnedReport = self.admin_misp_connector.direct_call(relative_path, data=report) + returnedReport = self.org_user_misp_connector.direct_call(relative_path, data=report) else: returnedReport = connector.direct_call(relative_path, data=report) returnedReport = self.popKey(returnedReport) @@ -264,7 +279,7 @@ class TestEventReportComprehensive(unittest.TestCase): def soft_delete_report(self, report, connector = None): relative_path = f"eventReports/delete/{report['id']}" if connector is None: - report = self.admin_misp_connector.direct_call(relative_path, data={}) + report = self.org_user_misp_connector.direct_call(relative_path, data={}) else: report = connector.direct_call(relative_path) report = self.popKey(report) @@ -273,7 +288,7 @@ class TestEventReportComprehensive(unittest.TestCase): def hard_delete_report(self, report, connector = None): relative_path = f"eventReports/delete/{report['id']}/1" if connector is None: - report = self.admin_misp_connector.direct_call(relative_path, data={}) + report = self.org_user_misp_connector.direct_call(relative_path, data={}) else: report = connector.direct_call(relative_path) report = self.popKey(report) @@ -282,7 +297,7 @@ class TestEventReportComprehensive(unittest.TestCase): def restore_report(self, report, connector = None): relative_path = f"eventReports/restore/{report['id']}" if connector is None: - report = self.admin_misp_connector.direct_call(relative_path, data={}) + report = self.org_user_misp_connector.direct_call(relative_path, data={}) else: report = connector.direct_call(relative_path) report = self.popKey(report) @@ -291,7 +306,7 @@ class TestEventReportComprehensive(unittest.TestCase): def get_report(self, reportID, connector = None): relative_path = f"eventReports/view/{reportID}" if connector is None: - report = self.admin_misp_connector.direct_call(relative_path) + report = self.org_user_misp_connector.direct_call(relative_path) else: report = connector.direct_call(relative_path) report = self.popKey(report) @@ -302,15 +317,22 @@ class TestEventReportComprehensive(unittest.TestCase): if eventID is not None: relative_path += f"/event_id:{eventID}" if connector is None: - report = self.admin_misp_connector.direct_call(relative_path) + report = self.org_user_misp_connector.direct_call(relative_path) else: report = connector.direct_call(relative_path) report = self.popKey(report) return report - def publish_event(self): - result = self.admin_misp_connector.publish(self.event) - self.event = self.popKey(result, keyname='Event') + def publish_event(self, connector = None): + if connector is None: + self.org_user_misp_connector.publish(self.event) + else: + connector.publish(self.event) + time.sleep(background_job_time) + self.event = self.get_event(connector = connector) - def get_event(self): - return self.popKey(self.admin_misp_connector.get_event(self.event['id'], pythonify=True), keyname='Event') + def get_event(self, connector = None): + if connector is None: + return self.org_user_misp_connector.get_event(self.event.id, pythonify=True) + else: + return connector.get_event(self.event.id, pythonify=True)