diff --git a/tests/testlive_security.py b/tests/testlive_security.py index 254e9fe6c..8237c87dc 100644 --- a/tests/testlive_security.py +++ b/tests/testlive_security.py @@ -5,6 +5,7 @@ import time import json import datetime import unittest +from unittest.util import safe_repr from typing import Union, List, Optional import urllib3 # type: ignore import logging @@ -39,6 +40,7 @@ class ROLE(Enum): ADMIN = 1 ORG_ADMIN = 2 USER = 3 + PUBLISHER = 4 SYNC_USER = 5 @@ -387,7 +389,7 @@ class TestSecurity(unittest.TestCase): self.__delete_advanced_authkey(auth_key["id"]) - self.__assertErrorResponse(logged_in.get_user()) + self.assertErrorResponse(logged_in.get_user()) def test_advanced_authkeys_deleted_keep_session(self): with self.__setting({ @@ -453,7 +455,7 @@ class TestSecurity(unittest.TestCase): # Reset auth key for different user new_auth_key = send(logged_in, "POST", "users/resetauthkey/1", check_errors=False) - self.__assertErrorResponse(new_auth_key) + self.assertErrorResponse(new_auth_key) # Try to login again logged_in = PyMISP(url, auth_key["authkey_raw"]) @@ -805,7 +807,7 @@ class TestSecurity(unittest.TestCase): user.org_id = self.test_org.id user.role_id = 3 created_user = self.org_admin_misp_connector.add_user(user) - self.__assertErrorResponse(created_user) + self.assertErrorResponse(created_user) def test_change_user_org_by_org_admin_different_org(self): updated_user = self.org_admin_misp_connector.update_user({'org_id': 1}, self.test_usr) @@ -1168,6 +1170,68 @@ class TestSecurity(unittest.TestCase): self.assertIn("X-Username", response.headers) self.assertEqual(f"{self.test_usr.email}/API/{auth_key['id']}", response.headers["X-Username"]) + def test_event_publish_no_perm(self): + test_usr = self.__login(self.test_usr) + + created_event = test_usr.add_event(self.__generate_event()) + self.assertSuccessfulResponse(created_event, "User should be able to create event") + + access_event = test_usr.get_event(created_event) + self.assertSuccessfulResponse(access_event, "User should be able to access that event") + + published = test_usr.publish(access_event) + self.assertErrorResponse(published, "User should not be able to publish that event without perm_publish permission") + + published = test_usr.publish(access_event, alert=True) + self.assertErrorResponse(published, "User should not be able to publish that event without perm_publish permission") + + self.assertSuccessfulResponse(test_usr.delete_event(access_event), "User should be able to delete his event") + + def test_event_publish_with_perm(self): + publisher_user = self.__create_user(self.test_org.id, ROLE.PUBLISHER) + logged_in = self.__login(publisher_user) + + created_event = logged_in.add_event(self.__generate_event()) + self.assertSuccessfulResponse(created_event, "User should be able to create event") + + access_event = logged_in.get_event(created_event) + self.assertSuccessfulResponse(access_event, "User should be able to access that event") + + published = logged_in.publish(access_event) + self.assertSuccessfulResponse(published, "User should be able to publish that event without perm_publish permission") + + published = logged_in.publish(access_event, alert=True) + self.assertSuccessfulResponse(published, "User should be able to publish (alert) that event without perm_publish permission") + + self.assertSuccessfulResponse(logged_in.delete_event(access_event), "User should be able to delete his event") + + # Cleanup + self.admin_misp_connector.delete_user(publisher_user) + + def test_event_publish_different_org(self): + different_org = self.__create_org() + publisher_user = self.__create_user(different_org.id, ROLE.PUBLISHER) + logged_in = self.__login(publisher_user) + + test_usr = self.__login(self.test_usr) + + created_event = test_usr.add_event(self.__generate_event()) + self.assertSuccessfulResponse(created_event, "User should be able to create event") + + access_event = test_usr.get_event(created_event) + self.assertSuccessfulResponse(access_event, "User should be able to access that event") + + published = logged_in.publish(created_event) + self.assertErrorResponse(published, "User from different org should not be able to publish that event") + + published = logged_in.publish(created_event, alert=True) + self.assertErrorResponse(published, "User from different org should not be able to publish that event") + + # Cleanup + test_usr.delete_event(created_event) + self.admin_misp_connector.delete_user(publisher_user) + self.admin_misp_connector.delete_organisation(different_org) + def test_sg_index_user_cannot_see(self): org = self.__create_org() hidden_sg = self.__create_sharing_group() @@ -1221,9 +1285,9 @@ class TestSecurity(unittest.TestCase): with self.assertRaises(Exception): send(logged_in, "POST", f"/sharingGroups/edit/{hidden_sg.uuid}", {"name": "New name2"}) - self.__assertErrorResponse(logged_in.add_org_to_sharing_group(hidden_sg, self.test_org.uuid)) - self.__assertErrorResponse(logged_in.remove_org_from_sharing_group(hidden_sg, org.uuid)) - self.__assertErrorResponse(logged_in.delete_sharing_group(hidden_sg)) + self.assertErrorResponse(logged_in.add_org_to_sharing_group(hidden_sg, self.test_org.uuid)) + self.assertErrorResponse(logged_in.remove_org_from_sharing_group(hidden_sg, org.uuid)) + self.assertErrorResponse(logged_in.delete_sharing_group(hidden_sg)) self.admin_misp_connector.delete_sharing_group(hidden_sg) self.admin_misp_connector.delete_organisation(org) @@ -1260,9 +1324,9 @@ class TestSecurity(unittest.TestCase): send(logged_in, "POST", f"/sharingGroups/edit/{sg.uuid}", {"name": "New name2"}) self.assertEqual(sg.name, send(logged_in, "GET", f"/sharingGroups/view/{sg.id}")["SharingGroup"]["name"]) - self.__assertErrorResponse(logged_in.add_org_to_sharing_group(sg, self.test_org.uuid)) - self.__assertErrorResponse(logged_in.remove_org_from_sharing_group(sg, org.uuid)) - self.__assertErrorResponse(logged_in.delete_sharing_group(sg)) + self.assertErrorResponse(logged_in.add_org_to_sharing_group(sg, self.test_org.uuid)) + self.assertErrorResponse(logged_in.remove_org_from_sharing_group(sg, org.uuid)) + self.assertErrorResponse(logged_in.delete_sharing_group(sg)) self.admin_misp_connector.delete_sharing_group(sg) self.admin_misp_connector.delete_user(sync_user) @@ -1283,7 +1347,7 @@ class TestSecurity(unittest.TestCase): after_edit = send(logged_in, "POST", f"/sharingGroups/edit/{sg.uuid}", {"name": "New name2"}) self.assertEqual("New name2", after_edit["SharingGroup"]["name"]) - self.__assertErrorResponse(logged_in.delete_sharing_group(sg)) + self.assertErrorResponse(logged_in.delete_sharing_group(sg)) self.admin_misp_connector.delete_sharing_group(sg) self.admin_misp_connector.delete_user(sync_user) @@ -1305,8 +1369,8 @@ class TestSecurity(unittest.TestCase): org = self.__create_org() with self.__setting("Security.hide_organisation_index_from_users", True): logged_in = PyMISP(url, self.test_usr.authkey) - self.__assertErrorResponse(logged_in.get_organisation(org.id)) - self.__assertErrorResponse(logged_in.get_organisation(org.uuid)) + self.assertErrorResponse(logged_in.get_organisation(org.id)) + self.assertErrorResponse(logged_in.get_organisation(org.uuid)) self.admin_misp_connector.delete_organisation(org) @@ -1335,7 +1399,7 @@ class TestSecurity(unittest.TestCase): with self.__setting("Security.hide_organisation_index_from_users", True): logged_in = PyMISP(url, self.test_usr.authkey) for key in (org.id, org.uuid, org.name): - self.__assertErrorResponse(logged_in.get_organisation(key)) + self.assertErrorResponse(logged_in.get_organisation(key)) self.admin_misp_connector.delete_event(event) self.admin_misp_connector.delete_user(user) @@ -1511,6 +1575,13 @@ class TestSecurity(unittest.TestCase): assert user_id == auth_key["user_id"], "Key was created for different user" return auth_key + def __login(self, user: MISPUser) -> PyMISP: + logged_in = PyMISP(url, user.authkey) + self.assertEqual(logged_in._current_user.id, user.id, "Logged in by different user") + if int(user.role_id) == ROLE.PUBLISHER.value: + self.assertTrue(logged_in._current_role.perm_publish, "Publisher user should have permission to publish events") + return logged_in + def __login_by_advanced_authkey(self, auth_key: dict) -> PyMISP: logged_in = PyMISP(url, auth_key["authkey_raw"]) self.assertEqual(logged_in._current_user.id, auth_key["user_id"], "Logged in by different user") @@ -1524,6 +1595,18 @@ class TestSecurity(unittest.TestCase): check_response(response) return response + def assertSuccessfulResponse(self, response, msg=None): + self.assertIsInstance(response, dict) + if "errors" in response: + msg = self._formatMessage(msg, safe_repr(response["errors"])) + self.fail(msg) + + def assertErrorResponse(self, response, msg=None): + self.assertIsInstance(response, dict) + if "errors" not in response: + msg = self._formatMessage(msg, safe_repr(response)) + self.fail(msg) + def __setting(self, key, value=None) -> MISPSetting: if not isinstance(key, dict): new_setting = {key: value} @@ -1531,10 +1614,6 @@ class TestSecurity(unittest.TestCase): new_setting = key return MISPSetting(self.admin_misp_connector, new_setting) - def __assertErrorResponse(self, response): - if "errors" not in response: - self.fail(response) - def __default_shibb_config(self) -> dict: return { "ApacheShibbAuth": {