chg: [test] Add snort attribute to test

pull/9521/head
Jakub Onderka 2024-01-27 14:58:34 +01:00
parent ba1387edb5
commit 58928a4497
2 changed files with 25 additions and 22 deletions

View File

@ -10,9 +10,10 @@ if (!isset($argv[2])) {
if (!in_array($argv[1], ['modify', 'replace'], true)) {
fail(1, "Invalid argument '{$argv[1]}', it must be 'modify' or 'replace'.");
}
$newConfig = json_decode($argv[2], true);
if ($newConfig === null) {
fail(2, "Could not decode new config, it is not JSON: " . json_last_error_msg());
try {
$newConfig = json_decode($argv[2], true, JSON_THROW_ON_ERROR);
} catch (Exception $e) {
fail(2, "Could not decode new config, it is not JSON: " . $e->getMessage());
}
if (!is_array($newConfig)) {
fail(2, "Provided new config is not array, `" . gettype($newConfig) . "` given.");
@ -41,4 +42,4 @@ if ($argv[1] === 'modify') {
file_put_contents($configFile, "<?php\n\$config = " . var_export($merged, true) . ';', LOCK_EX);
// Returns config file before modification
echo json_encode($config);
echo json_encode($config, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);

View File

@ -2,6 +2,7 @@
import os
import json
import uuid
import logging
import inspect
import subprocess
import unittest
@ -11,21 +12,19 @@ from xml.etree import ElementTree as ET
from io import BytesIO
import urllib3 # type: ignore
from datetime import datetime, timedelta
import logging
logging.disable(logging.CRITICAL)
logger = logging.getLogger('pymisp')
from typing import Union
from pymisp import PyMISP, MISPOrganisation, MISPUser, MISPRole, MISPSharingGroup, MISPEvent, MISPLog, MISPSighting, Distribution, ThreatLevel, Analysis, MISPEventReport, MISPServerError
from pymisp.tools import DomainIPObject
from pymisp.api import get_uuid_or_id_from_abstract_misp
logging.disable(logging.CRITICAL)
logger = logging.getLogger('pymisp')
urllib3.disable_warnings()
# Load access information for env variables
url = "http://" + os.environ["HOST"]
key = os.environ["AUTH"]
urllib3.disable_warnings()
def create_simple_event() -> MISPEvent:
caller_name = inspect.stack()[1].function
@ -52,13 +51,19 @@ def request(pymisp: PyMISP, request_type: str, url: str, data: dict = {}) -> dic
return pymisp._check_json_response(response)
def publish_immediately(pymisp: PyMISP, event: Union[MISPEvent, int, str, uuid.UUID], with_email: bool = False):
event_id = get_uuid_or_id_from_abstract_misp(event)
action = "alert" if with_email else "publish"
return check_response(request(pymisp, 'POST', f'events/{action}/{event_id}/disable_background_processing:1'))
class MISPSetting:
def __init__(self, admin_connector: PyMISP, new_setting: dict):
self.admin_connector = admin_connector
self.new_setting = new_setting
def __enter__(self):
self.original = self.__run("modify", json.dumps(self.new_setting))
self.original = self.__run("modify", json.dumps(self.new_setting).encode("utf-8"))
# Try to reset config cache
self.admin_connector.get_server_setting("MISP.live")
@ -68,12 +73,12 @@ class MISPSetting:
self.admin_connector.get_server_setting("MISP.live")
@staticmethod
def __run(command: str, data: str) -> str:
def __run(command: str, data: bytes) -> bytes:
dir_path = os.path.dirname(os.path.realpath(__file__))
r = subprocess.run(["php", dir_path + "/modify_config.php", command, data], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if r.returncode != 0:
raise Exception([r.returncode, r.stdout, r.stderr])
return r.stdout.decode("utf-8")
return r.stdout
class TestComprehensive(unittest.TestCase):
@ -465,8 +470,6 @@ class TestComprehensive(unittest.TestCase):
check_response(self.admin_misp_connector.delete_event(event))
def test_publish_alert_filter(self):
check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 0, force=True))
first = create_simple_event()
first.add_tag('test_publish_filter')
first.threat_level_id = ThreatLevel.medium
@ -499,7 +502,7 @@ class TestComprehensive(unittest.TestCase):
# Publish events
for event in (first, second, third, four):
check_response(self.admin_misp_connector.publish(event, alert=True))
publish_immediately(self.admin_misp_connector, event, with_email=True)
# Email notification should be send just to first event
mail_logs = self.admin_misp_connector.search_logs(model='User', action='email')
@ -516,8 +519,6 @@ class TestComprehensive(unittest.TestCase):
check_response(self.admin_misp_connector.update_user(self.admin_misp_connector._current_user))
# Delete filter
self.admin_misp_connector.delete_user_setting('publish_alert_filter')
# Reenable background jobs
check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 1, force=True))
# Delete events
for event in (first, second, third, four):
check_response(self.admin_misp_connector.delete_event(event))
@ -923,11 +924,12 @@ class TestComprehensive(unittest.TestCase):
def test_search_snort_suricata(self):
event = create_simple_event()
event.add_attribute('ip-src', '8.8.8.8', to_ids=True)
event.add_attribute('snort', 'alert tcp 192.168.1.0/24 any -> 131.171.127.1 25 (content: "hacking"; msg: "malicious packet"; sid:2000001;)', to_ids=True)
event = self.user_misp_connector.add_event(event)
check_response(event)
self.admin_misp_connector.publish(event, alert=False)
time.sleep(6)
publish_immediately(self.admin_misp_connector, event)
snort = self._search_event({'returnFormat': 'snort', 'eventid': event.id})
self.assertIsInstance(snort, str)
self.assertIn('8.8.8.8', snort)