new: [settings] Allow to use ThreatLevel.name for alert filter

pull/7890/head
Jakub Onderka 2021-10-25 19:24:18 +02:00
parent a7c44ba29b
commit fbbd51385f
4 changed files with 81 additions and 18 deletions

View File

@ -130,6 +130,13 @@ jobs:
# Get authkey
sudo usermod -a -G www-data $USER
# Ensure the perms of config files
sudo chown -R $USER:www-data `pwd`/app/Config
sudo chmod -R 777 `pwd`/app/Config
sudo -E su $USER -c 'app/Console/cake Admin setSetting "MISP.server_settings_skip_backup_rotate" 1'
sudo chown -R $USER:www-data `pwd`/app/Config
sudo chmod -R 777 `pwd`/app/Config
- name: DB Update
run: |
sudo -E su $USER -c 'app/Console/cake Admin setSetting "MISP.osuser" $USER'
@ -237,13 +244,6 @@ jobs:
run: |
export PATH=$HOME/.local/env:$PATH # enable poetry binary
# Ensure the perms of config files
sudo chown -R $USER:www-data `pwd`/app/Config
sudo chmod -R 777 `pwd`/app/Config
sudo -E su $USER -c 'app/Console/cake Admin setSetting "MISP.server_settings_skip_backup_rotate" 1'
sudo chown -R $USER:www-data `pwd`/app/Config
sudo chmod -R 777 `pwd`/app/Config
pushd tests
./curl_tests_GH.sh $AUTH $HOST
popd

View File

@ -3183,7 +3183,6 @@ class Event extends AppModel
);
$userCount = count($usersWithAccess);
$this->UserSetting = ClassRegistry::init('UserSetting');
$metadataOnly = Configure::read('MISP.event_alert_metadata_only') || Configure::read('MISP.publish_alerts_summary_only');
foreach ($usersWithAccess as $k => $user) {
// Fetch event for user that will receive alert e-mail to respect all ACLs
@ -3196,7 +3195,7 @@ class Event extends AppModel
'metadata' => $metadataOnly,
])[0];
if ($this->UserSetting->checkPublishFilter($user, $eventForUser)) {
if ($this->User->UserSetting->checkPublishFilter($user, $eventForUser)) {
$body = $this->prepareAlertEmail($eventForUser, $user, $oldpublish);
$this->User->sendEmail(['User' => $user], $body, false, null);
}

View File

@ -301,7 +301,7 @@ class UserSetting extends AppModel
}
}
/*
/**
* Checks if an event matches the given rule
* valid filters:
* - AttributeTag.name
@ -313,6 +313,11 @@ class UserSetting extends AppModel
* Values passed can be used for direct string comparisons or alternatively
* as substring matches by encapsulating the string in a pair of "%" characters
* Each rule can take a list of values
*
* @param string $rule
* @param array|string $lookup_values
* @param array $event
* @return bool
*/
private function __checkEvent($rule, $lookup_values, $event)
{
@ -347,7 +352,7 @@ class UserSetting extends AppModel
$extracted_value = mb_strtolower($extracted_value);
foreach ($lookup_values as $lookup_value) {
$lookup_value_trimmed = trim($lookup_value, "%");
if (strlen($lookup_value_trimmed) != strlen($lookup_value)) {
if (strlen($lookup_value_trimmed) !== strlen($lookup_value)) {
if (strpos($extracted_value, $lookup_value_trimmed) !== false) {
return true;
}

View File

@ -19,18 +19,19 @@ urllib3.disable_warnings()
def create_simple_event():
mispevent = MISPEvent()
mispevent.info = 'This is a super simple test'
mispevent.distribution = Distribution.your_organisation_only
mispevent.threat_level_id = ThreatLevel.low
mispevent.analysis = Analysis.completed
mispevent.add_attribute('text', str(uuid.uuid4()))
return mispevent
event = MISPEvent()
event.info = 'This is a super simple test'
event.distribution = Distribution.your_organisation_only
event.threat_level_id = ThreatLevel.low
event.analysis = Analysis.completed
event.add_attribute('text', str(uuid.uuid4()))
return event
def check_response(response):
if isinstance(response, dict) and "errors" in response:
raise Exception(response["errors"])
return response
class TestComprehensive(unittest.TestCase):
@ -397,6 +398,64 @@ class TestComprehensive(unittest.TestCase):
check_response(self.admin_misp_connector.delete_event(event))
def test_publish_alert_filter(self):
check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 0, force=True))
first = create_simple_event()
first.add_tag('test_publish_filter')
first.threat_level_id = ThreatLevel.medium
second = create_simple_event()
second.add_tag('test_publish_filter')
second.threat_level_id = ThreatLevel.high
third = create_simple_event()
third.add_tag('test_publish_filter')
third.threat_level_id = ThreatLevel.low
four = create_simple_event()
four.threat_level_id = ThreatLevel.high
try:
# Enable autoalert on admin
self.admin_misp_connector._current_user.autoalert = True
check_response(self.admin_misp_connector.update_user(self.admin_misp_connector._current_user))
# Set publish_alert_filter tag to `test_publish_filter`
setting_value = {'AND': {'Tag.name': 'test_publish_filter', 'ThreatLevel.name': ['High', 'Medium']}}
check_response(self.admin_misp_connector.set_user_setting('publish_alert_filter', setting_value))
# Add events
first = check_response(self.admin_misp_connector.add_event(first))
second = check_response(self.admin_misp_connector.add_event(second))
third = check_response(self.admin_misp_connector.add_event(third))
four = check_response(self.admin_misp_connector.add_event(four))
# Publish events
for event in (first, second, third, four):
check_response(self.admin_misp_connector.publish(event, alert=True))
# Email notification should be send just to first event
mail_logs = self.admin_misp_connector.search_logs(model='User', action='email')
log_titles = [log.title for log in mail_logs]
self.assertIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(first.id) + ' - Medium - TLP:AMBER".', log_titles)
self.assertIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(second.id) + ' - High - TLP:AMBER".', log_titles)
self.assertNotIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(third.id) + ' - Low - TLP:AMBER".', log_titles)
self.assertNotIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(four.id) + ' - High - TLP:AMBER".', log_titles)
finally:
# Disable autoalert
self.admin_misp_connector._current_user.autoalert = False
check_response(self.admin_misp_connector.update_user(self.admin_misp_connector._current_user))
# Delete filter
self.admin_misp_connector.delete_user_setting('publish_alert_filter')
# Reenable background jobs
check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 1, force=True))
# Delete events
for event in (first, second, third, four):
check_response(self.admin_misp_connector.delete_event(event))
def test_remove_orphaned_correlations(self):
result = self.admin_misp_connector._check_json_response(self.admin_misp_connector._prepare_request('GET', 'servers/removeOrphanedCorrelations'))
check_response(result)