diff --git a/tests/modify_config.php b/tests/modify_config.php index 25cc18b9d..d5119bf03 100644 --- a/tests/modify_config.php +++ b/tests/modify_config.php @@ -10,9 +10,10 @@ if (!isset($argv[2])) { if (!in_array($argv[1], ['modify', 'replace'], true)) { fail(1, "Invalid argument '{$argv[1]}', it must be 'modify' or 'replace'."); } -$newConfig = json_decode($argv[2], true); -if ($newConfig === null) { - fail(2, "Could not decode new config, it is not JSON: " . json_last_error_msg()); +try { + $newConfig = json_decode($argv[2], true, JSON_THROW_ON_ERROR); +} catch (Exception $e) { + fail(2, "Could not decode new config, it is not JSON: " . $e->getMessage()); } if (!is_array($newConfig)) { fail(2, "Provided new config is not array, `" . gettype($newConfig) . "` given."); @@ -41,4 +42,4 @@ if ($argv[1] === 'modify') { file_put_contents($configFile, " MISPEvent: caller_name = inspect.stack()[1].function @@ -52,13 +51,19 @@ def request(pymisp: PyMISP, request_type: str, url: str, data: dict = {}) -> dic return pymisp._check_json_response(response) +def publish_immediately(pymisp: PyMISP, event: Union[MISPEvent, int, str, uuid.UUID], with_email: bool = False): + event_id = get_uuid_or_id_from_abstract_misp(event) + action = "alert" if with_email else "publish" + return check_response(request(pymisp, 'POST', f'events/{action}/{event_id}/disable_background_processing:1')) + + class MISPSetting: def __init__(self, admin_connector: PyMISP, new_setting: dict): self.admin_connector = admin_connector self.new_setting = new_setting def __enter__(self): - self.original = self.__run("modify", json.dumps(self.new_setting)) + self.original = self.__run("modify", json.dumps(self.new_setting).encode("utf-8")) # Try to reset config cache self.admin_connector.get_server_setting("MISP.live") @@ -68,12 +73,12 @@ class MISPSetting: self.admin_connector.get_server_setting("MISP.live") @staticmethod - def __run(command: str, data: str) -> str: + def __run(command: str, data: bytes) -> bytes: dir_path = os.path.dirname(os.path.realpath(__file__)) r = subprocess.run(["php", dir_path + "/modify_config.php", command, data], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if r.returncode != 0: raise Exception([r.returncode, r.stdout, r.stderr]) - return r.stdout.decode("utf-8") + return r.stdout class TestComprehensive(unittest.TestCase): @@ -465,8 +470,6 @@ class TestComprehensive(unittest.TestCase): check_response(self.admin_misp_connector.delete_event(event)) def test_publish_alert_filter(self): - check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 0, force=True)) - first = create_simple_event() first.add_tag('test_publish_filter') first.threat_level_id = ThreatLevel.medium @@ -499,7 +502,7 @@ class TestComprehensive(unittest.TestCase): # Publish events for event in (first, second, third, four): - check_response(self.admin_misp_connector.publish(event, alert=True)) + publish_immediately(self.admin_misp_connector, event, with_email=True) # Email notification should be send just to first event mail_logs = self.admin_misp_connector.search_logs(model='User', action='email') @@ -516,8 +519,6 @@ class TestComprehensive(unittest.TestCase): check_response(self.admin_misp_connector.update_user(self.admin_misp_connector._current_user)) # Delete filter self.admin_misp_connector.delete_user_setting('publish_alert_filter') - # Reenable background jobs - check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 1, force=True)) # Delete events for event in (first, second, third, four): check_response(self.admin_misp_connector.delete_event(event)) @@ -923,11 +924,12 @@ class TestComprehensive(unittest.TestCase): def test_search_snort_suricata(self): event = create_simple_event() event.add_attribute('ip-src', '8.8.8.8', to_ids=True) + event.add_attribute('snort', 'alert tcp 192.168.1.0/24 any -> 131.171.127.1 25 (content: "hacking"; msg: "malicious packet"; sid:2000001;)', to_ids=True) event = self.user_misp_connector.add_event(event) check_response(event) - self.admin_misp_connector.publish(event, alert=False) - time.sleep(6) + publish_immediately(self.admin_misp_connector, event) + snort = self._search_event({'returnFormat': 'snort', 'eventid': event.id}) self.assertIsInstance(snort, str) self.assertIn('8.8.8.8', snort)