new: [test] Security test for publishing events

pull/7539/head
Jakub Onderka 2021-07-02 11:16:18 +02:00
parent fa681622d7
commit 77f99f7c3e
1 changed files with 96 additions and 17 deletions

View File

@ -5,6 +5,7 @@ import time
import json
import datetime
import unittest
from unittest.util import safe_repr
from typing import Union, List, Optional
import urllib3 # type: ignore
import logging
@ -39,6 +40,7 @@ class ROLE(Enum):
ADMIN = 1
ORG_ADMIN = 2
USER = 3
PUBLISHER = 4
SYNC_USER = 5
@ -387,7 +389,7 @@ class TestSecurity(unittest.TestCase):
self.__delete_advanced_authkey(auth_key["id"])
self.__assertErrorResponse(logged_in.get_user())
self.assertErrorResponse(logged_in.get_user())
def test_advanced_authkeys_deleted_keep_session(self):
with self.__setting({
@ -453,7 +455,7 @@ class TestSecurity(unittest.TestCase):
# Reset auth key for different user
new_auth_key = send(logged_in, "POST", "users/resetauthkey/1", check_errors=False)
self.__assertErrorResponse(new_auth_key)
self.assertErrorResponse(new_auth_key)
# Try to login again
logged_in = PyMISP(url, auth_key["authkey_raw"])
@ -805,7 +807,7 @@ class TestSecurity(unittest.TestCase):
user.org_id = self.test_org.id
user.role_id = 3
created_user = self.org_admin_misp_connector.add_user(user)
self.__assertErrorResponse(created_user)
self.assertErrorResponse(created_user)
def test_change_user_org_by_org_admin_different_org(self):
updated_user = self.org_admin_misp_connector.update_user({'org_id': 1}, self.test_usr)
@ -1168,6 +1170,68 @@ class TestSecurity(unittest.TestCase):
self.assertIn("X-Username", response.headers)
self.assertEqual(f"{self.test_usr.email}/API/{auth_key['id']}", response.headers["X-Username"])
def test_event_publish_no_perm(self):
test_usr = self.__login(self.test_usr)
created_event = test_usr.add_event(self.__generate_event())
self.assertSuccessfulResponse(created_event, "User should be able to create event")
access_event = test_usr.get_event(created_event)
self.assertSuccessfulResponse(access_event, "User should be able to access that event")
published = test_usr.publish(access_event)
self.assertErrorResponse(published, "User should not be able to publish that event without perm_publish permission")
published = test_usr.publish(access_event, alert=True)
self.assertErrorResponse(published, "User should not be able to publish that event without perm_publish permission")
self.assertSuccessfulResponse(test_usr.delete_event(access_event), "User should be able to delete his event")
def test_event_publish_with_perm(self):
publisher_user = self.__create_user(self.test_org.id, ROLE.PUBLISHER)
logged_in = self.__login(publisher_user)
created_event = logged_in.add_event(self.__generate_event())
self.assertSuccessfulResponse(created_event, "User should be able to create event")
access_event = logged_in.get_event(created_event)
self.assertSuccessfulResponse(access_event, "User should be able to access that event")
published = logged_in.publish(access_event)
self.assertSuccessfulResponse(published, "User should be able to publish that event without perm_publish permission")
published = logged_in.publish(access_event, alert=True)
self.assertSuccessfulResponse(published, "User should be able to publish (alert) that event without perm_publish permission")
self.assertSuccessfulResponse(logged_in.delete_event(access_event), "User should be able to delete his event")
# Cleanup
self.admin_misp_connector.delete_user(publisher_user)
def test_event_publish_different_org(self):
different_org = self.__create_org()
publisher_user = self.__create_user(different_org.id, ROLE.PUBLISHER)
logged_in = self.__login(publisher_user)
test_usr = self.__login(self.test_usr)
created_event = test_usr.add_event(self.__generate_event())
self.assertSuccessfulResponse(created_event, "User should be able to create event")
access_event = test_usr.get_event(created_event)
self.assertSuccessfulResponse(access_event, "User should be able to access that event")
published = logged_in.publish(created_event)
self.assertErrorResponse(published, "User from different org should not be able to publish that event")
published = logged_in.publish(created_event, alert=True)
self.assertErrorResponse(published, "User from different org should not be able to publish that event")
# Cleanup
test_usr.delete_event(created_event)
self.admin_misp_connector.delete_user(publisher_user)
self.admin_misp_connector.delete_organisation(different_org)
def test_sg_index_user_cannot_see(self):
org = self.__create_org()
hidden_sg = self.__create_sharing_group()
@ -1221,9 +1285,9 @@ class TestSecurity(unittest.TestCase):
with self.assertRaises(Exception):
send(logged_in, "POST", f"/sharingGroups/edit/{hidden_sg.uuid}", {"name": "New name2"})
self.__assertErrorResponse(logged_in.add_org_to_sharing_group(hidden_sg, self.test_org.uuid))
self.__assertErrorResponse(logged_in.remove_org_from_sharing_group(hidden_sg, org.uuid))
self.__assertErrorResponse(logged_in.delete_sharing_group(hidden_sg))
self.assertErrorResponse(logged_in.add_org_to_sharing_group(hidden_sg, self.test_org.uuid))
self.assertErrorResponse(logged_in.remove_org_from_sharing_group(hidden_sg, org.uuid))
self.assertErrorResponse(logged_in.delete_sharing_group(hidden_sg))
self.admin_misp_connector.delete_sharing_group(hidden_sg)
self.admin_misp_connector.delete_organisation(org)
@ -1260,9 +1324,9 @@ class TestSecurity(unittest.TestCase):
send(logged_in, "POST", f"/sharingGroups/edit/{sg.uuid}", {"name": "New name2"})
self.assertEqual(sg.name, send(logged_in, "GET", f"/sharingGroups/view/{sg.id}")["SharingGroup"]["name"])
self.__assertErrorResponse(logged_in.add_org_to_sharing_group(sg, self.test_org.uuid))
self.__assertErrorResponse(logged_in.remove_org_from_sharing_group(sg, org.uuid))
self.__assertErrorResponse(logged_in.delete_sharing_group(sg))
self.assertErrorResponse(logged_in.add_org_to_sharing_group(sg, self.test_org.uuid))
self.assertErrorResponse(logged_in.remove_org_from_sharing_group(sg, org.uuid))
self.assertErrorResponse(logged_in.delete_sharing_group(sg))
self.admin_misp_connector.delete_sharing_group(sg)
self.admin_misp_connector.delete_user(sync_user)
@ -1283,7 +1347,7 @@ class TestSecurity(unittest.TestCase):
after_edit = send(logged_in, "POST", f"/sharingGroups/edit/{sg.uuid}", {"name": "New name2"})
self.assertEqual("New name2", after_edit["SharingGroup"]["name"])
self.__assertErrorResponse(logged_in.delete_sharing_group(sg))
self.assertErrorResponse(logged_in.delete_sharing_group(sg))
self.admin_misp_connector.delete_sharing_group(sg)
self.admin_misp_connector.delete_user(sync_user)
@ -1305,8 +1369,8 @@ class TestSecurity(unittest.TestCase):
org = self.__create_org()
with self.__setting("Security.hide_organisation_index_from_users", True):
logged_in = PyMISP(url, self.test_usr.authkey)
self.__assertErrorResponse(logged_in.get_organisation(org.id))
self.__assertErrorResponse(logged_in.get_organisation(org.uuid))
self.assertErrorResponse(logged_in.get_organisation(org.id))
self.assertErrorResponse(logged_in.get_organisation(org.uuid))
self.admin_misp_connector.delete_organisation(org)
@ -1335,7 +1399,7 @@ class TestSecurity(unittest.TestCase):
with self.__setting("Security.hide_organisation_index_from_users", True):
logged_in = PyMISP(url, self.test_usr.authkey)
for key in (org.id, org.uuid, org.name):
self.__assertErrorResponse(logged_in.get_organisation(key))
self.assertErrorResponse(logged_in.get_organisation(key))
self.admin_misp_connector.delete_event(event)
self.admin_misp_connector.delete_user(user)
@ -1511,6 +1575,13 @@ class TestSecurity(unittest.TestCase):
assert user_id == auth_key["user_id"], "Key was created for different user"
return auth_key
def __login(self, user: MISPUser) -> PyMISP:
logged_in = PyMISP(url, user.authkey)
self.assertEqual(logged_in._current_user.id, user.id, "Logged in by different user")
if int(user.role_id) == ROLE.PUBLISHER.value:
self.assertTrue(logged_in._current_role.perm_publish, "Publisher user should have permission to publish events")
return logged_in
def __login_by_advanced_authkey(self, auth_key: dict) -> PyMISP:
logged_in = PyMISP(url, auth_key["authkey_raw"])
self.assertEqual(logged_in._current_user.id, auth_key["user_id"], "Logged in by different user")
@ -1524,6 +1595,18 @@ class TestSecurity(unittest.TestCase):
check_response(response)
return response
def assertSuccessfulResponse(self, response, msg=None):
self.assertIsInstance(response, dict)
if "errors" in response:
msg = self._formatMessage(msg, safe_repr(response["errors"]))
self.fail(msg)
def assertErrorResponse(self, response, msg=None):
self.assertIsInstance(response, dict)
if "errors" not in response:
msg = self._formatMessage(msg, safe_repr(response))
self.fail(msg)
def __setting(self, key, value=None) -> MISPSetting:
if not isinstance(key, dict):
new_setting = {key: value}
@ -1531,10 +1614,6 @@ class TestSecurity(unittest.TestCase):
new_setting = key
return MISPSetting(self.admin_misp_connector, new_setting)
def __assertErrorResponse(self, response):
if "errors" not in response:
self.fail(response)
def __default_shibb_config(self) -> dict:
return {
"ApacheShibbAuth": {