MISP/tests/testlive_comprehensive_loca...

1144 lines
50 KiB
Python

#!/usr/bin/env python3
import os
import json
import uuid
import logging
import inspect
import subprocess
import unittest
import requests
import time
from xml.etree import ElementTree as ET
from io import BytesIO
import urllib3 # type: ignore
from datetime import datetime, timedelta
from typing import Union
from pymisp import PyMISP, MISPOrganisation, MISPUser, MISPRole, MISPSharingGroup, MISPEvent, MISPLog, MISPSighting, Distribution, ThreatLevel, Analysis, MISPEventReport, MISPServerError
from pymisp.tools import DomainIPObject
from pymisp.api import get_uuid_or_id_from_abstract_misp
logging.disable(logging.CRITICAL)
logger = logging.getLogger('pymisp')
urllib3.disable_warnings()
# Load access information for env variables
url = "http://" + os.environ["HOST"]
key = os.environ["AUTH"]
def create_simple_event() -> MISPEvent:
caller_name = inspect.stack()[1].function
event_uuid = str(uuid.uuid4())
event = MISPEvent()
event.uuid = event_uuid
event.info = 'This is a super simple test ({}, {})'.format(event_uuid.split('-')[0], caller_name)
event.distribution = Distribution.your_organisation_only
event.threat_level_id = ThreatLevel.low
event.analysis = Analysis.completed
event.add_attribute('text', event_uuid)
return event
def check_response(response):
if isinstance(response, dict) and "errors" in response:
raise Exception(response["errors"])
return response
def request(pymisp: PyMISP, request_type: str, url: str, data: dict = {}) -> dict:
response = pymisp._prepare_request(request_type, url, data)
return pymisp._check_response(response)
def publish_immediately(pymisp: PyMISP, event: Union[MISPEvent, int, str, uuid.UUID], with_email: bool = False):
event_id = get_uuid_or_id_from_abstract_misp(event)
action = "alert" if with_email else "publish"
return check_response(request(pymisp, 'POST', f'events/{action}/{event_id}/disable_background_processing:1'))
class MISPSetting:
def __init__(self, admin_connector: PyMISP, new_setting: dict):
self.admin_connector = admin_connector
self.new_setting = new_setting
def __enter__(self):
self.original = self.__run("modify", json.dumps(self.new_setting).encode("utf-8"))
# Try to reset config cache
self.admin_connector.get_server_setting("MISP.live")
def __exit__(self, exc_type, exc_val, exc_tb):
self.__run("replace", self.original)
# Try to reset config cache
self.admin_connector.get_server_setting("MISP.live")
@staticmethod
def __run(command: str, data: bytes) -> bytes:
dir_path = os.path.dirname(os.path.realpath(__file__))
r = subprocess.run(["php", dir_path + "/modify_config.php", command, data], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if r.returncode != 0:
raise Exception([r.returncode, r.stdout, r.stderr])
return r.stdout
class TestComprehensive(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.maxDiff = None
# Connect as admin
cls.admin_misp_connector = PyMISP(url, key, ssl=False, debug=False)
cls.admin_misp_connector.set_server_setting('debug', 1, force=True)
# Creates an org
organisation = MISPOrganisation()
organisation.name = 'Test Org'
cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True)
# Set the default role (id 3 on the VM)
cls.admin_misp_connector.set_default_role(3)
# Creates a user
user = MISPUser()
user.email = 'testusr@user.local'
user.org_id = cls.test_org.id
cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True)
cls.user_misp_connector = PyMISP(url, cls.test_usr.authkey, ssl=False, debug=True)
@classmethod
def tearDownClass(cls):
# Delete user
cls.admin_misp_connector.delete_user(cls.test_usr)
# Delete org
cls.admin_misp_connector.delete_organisation(cls.test_org)
def setUp(self):
self.user_misp_connector.global_pythonify = True
self.admin_misp_connector.global_pythonify = True
@unittest.skip("FIXME: That index will be empty.")
def test_search_index(self):
# Search all events
index = self.user_misp_connector.search_index()
self.assertGreater(len(index), 0)
# Search published
index_published = self.user_misp_connector.search_index(published=True)
if len(index_published):
print(index_published)
self.assertEqual(len(index_published), 0, "No event should be published.")
# Create test event
event = create_simple_event()
event = self.user_misp_connector.add_event(event)
check_response(event)
# Search by org name
index_org = self.user_misp_connector.search_index(org="Test Org")
self.assertGreater(len(index_org), 0)
# Search by org name with different case
index_org_lower = self.user_misp_connector.search_index(org="test org")
self.assertGreater(len(index_org_lower), 0)
# Search by org uuid
index_org_uuid = self.user_misp_connector.search_index(org=self.test_org.uuid)
self.assertGreater(len(index_org_uuid), 0)
# Search by org ID
index_org_id = self.user_misp_connector.search_index(org=self.test_org.id)
self.assertGreater(len(index_org_id), 0)
self.assertEqual(len(index_org), len(index_org_lower))
self.assertEqual(len(index_org), len(index_org_uuid))
self.assertEqual(len(index_org), len(index_org_id))
self.user_misp_connector.delete_event(event)
def test_search_index_by_info(self):
event = create_simple_event()
event.info = uuid.uuid4()
# No event should exist
index = self.user_misp_connector.search_index(eventinfo=event.info)
self.assertEqual(len(index), 0, "No event should exists")
event = self.user_misp_connector.add_event(event)
check_response(event)
# One event should exist
index = self.user_misp_connector.search_index(eventinfo=event.info)
self.assertEqual(len(index), 1)
self.assertEqual(index[0].uuid, event.uuid)
index = self.user_misp_connector.search_index(eventinfo="!" + str(event.info))
for index_event in index:
self.assertNotEqual(event.uuid, index_event.uuid, index)
self.user_misp_connector.delete_event(event)
def test_search_index_by_all(self):
event = create_simple_event()
index = self.user_misp_connector.search_index(all=event.attributes[0].value)
self.assertEqual(len(index), 0, "No event should exists")
event = self.user_misp_connector.add_event(event)
check_response(event)
index = self.user_misp_connector.search_index(all=event.attributes[0].value)
self.assertEqual(len(index), 1, "One event should exists")
self.assertEqual(index[0].uuid, event.uuid)
index = self.user_misp_connector.search_index(all=event.attributes[0].value.upper())
self.assertEqual(len(index), 1, "One event should exists")
self.assertEqual(index[0].uuid, event.uuid)
self.user_misp_connector.delete_event(event)
def test_search_index_by_attribute(self):
event = create_simple_event()
index = self.user_misp_connector.search_index(attribute=event.attributes[0].value)
self.assertEqual(len(index), 0, "No event should exists")
event = self.user_misp_connector.add_event(event)
check_response(event)
index = self.user_misp_connector.search_index(attribute=event.attributes[0].value)
self.assertEqual(len(index), 1, "One event should exists")
self.assertEqual(index[0].uuid, event.uuid)
index = self.user_misp_connector.search_index(attribute=event.attributes[0].value.upper())
self.assertEqual(len(index), 1, "One event should exists")
self.assertEqual(index[0].uuid, event.uuid)
self.user_misp_connector.delete_event(event)
def test_search_index_by_tag(self):
tags = self.user_misp_connector.search_tags("tlp:red", True)
self.assertEqual(len(tags), 1, tags) # tlp:red tag doesn't exists
index = self.user_misp_connector.search_index(tags="tlp:red")
self.assertEqual(len(index), 0, "No event should exists")
index = self.user_misp_connector.search_index(tags=tags[0].id)
self.assertEqual(len(index), 0, "No event should exists")
event = create_simple_event()
event.add_tag("tlp:red")
event = self.user_misp_connector.add_event(event)
check_response(event)
index = self.user_misp_connector.search_index(tags="tlp:red")
self.assertEqual(len(index), 1, "One event should exists")
index = self.user_misp_connector.search_index(tags="tlp:red|not_exists")
self.assertEqual(len(index), 1, "One event should exists")
index = self.user_misp_connector.search_index(tags=["tlp:red", "not_exists"])
self.assertEqual(len(index), 1, "One event should exists")
index = self.user_misp_connector.search_index(tags=tags[0].id)
self.assertEqual(len(index), 1, "One event should exists")
index = self.user_misp_connector.search_index(tags="!tlp:red")
for index_event in index:
self.assertNotEqual(event.uuid, index_event.uuid, index)
index = self.user_misp_connector.search_index(tags="!" + str(tags[0].id))
for index_event in index:
self.assertNotEqual(event.uuid, index_event.uuid, index)
self.user_misp_connector.delete_event(event)
def test_search_index_by_email(self):
index = self.user_misp_connector.search_index(email=self.test_usr.email)
self.assertEqual(len(index), 0, index)
event = create_simple_event()
event = self.user_misp_connector.add_event(event)
check_response(event)
index = self.user_misp_connector.search_index(email=self.test_usr.email)
self.assertEqual(len(index), 1, "One event should exists")
self.user_misp_connector.delete_event(event)
def test_search_index_by_email_admin(self):
index = self.admin_misp_connector.search_index(email="no_existing_exmail@example.com")
self.assertEqual(len(index), 0, index)
index = self.admin_misp_connector.search_index(email=self.test_usr.email)
self.assertEqual(len(index), 0, index)
event = create_simple_event()
event = self.user_misp_connector.add_event(event)
check_response(event)
index = self.admin_misp_connector.search_index(email=self.test_usr.email)
self.assertEqual(len(index), 1, index)
# Search by partial match
index = self.admin_misp_connector.search_index(email="testusr@user")
self.assertEqual(len(index), 1, index)
self.user_misp_connector.delete_event(event)
def test_search_index_by_eventid(self):
# Search by non exists uuid
index = self.admin_misp_connector.search_index(eventid=uuid.uuid4())
self.assertEqual(len(index), 0, index)
# Search by non exists id
index = self.admin_misp_connector.search_index(eventid=9999)
self.assertEqual(len(index), 0, index)
event = create_simple_event()
event = self.user_misp_connector.add_event(event)
check_response(event)
index = self.admin_misp_connector.search_index(eventid=event.id)
self.assertEqual(len(index), 1, index)
index = self.admin_misp_connector.search_index(eventid=event.uuid)
self.assertEqual(len(index), 1, index)
self.user_misp_connector.delete_event(event)
@unittest.skip("FIXME: That index will be empty.")
def test_search_index_minimal(self):
# pythonify is not supported for minimal results
self.user_misp_connector.global_pythonify = False
minimal = self.user_misp_connector.search_index(minimal=True)
self.assertGreater(len(minimal), 0)
minimal_event = minimal[0]
self.assertIn("id", minimal_event)
self.assertIn("timestamp", minimal_event)
self.assertIn("sighting_timestamp", minimal_event)
self.assertIn("published", minimal_event)
self.assertIn("uuid", minimal_event)
self.assertIn("orgc_uuid", minimal_event)
for event in minimal:
self.assertFalse(event["published"], "No event should be published.")
def test_search_index_minimal_published(self):
# pythonify is not supported for minimal results
self.user_misp_connector.global_pythonify = False
index = self.user_misp_connector.search_index(minimal=True, published=True)
self.assertEqual(len(index), 0, "No event should be published.")
index = self.user_misp_connector.search_index(minimal=True)
not_published = self.user_misp_connector.search_index(minimal=True, published=0)
both_2 = self.user_misp_connector.search_index(minimal=True, published=2)
both_array = self.user_misp_connector.search_index(minimal=True, published=[0, 1])
self.assertEqual(len(index), len(not_published))
self.assertEqual(len(index), len(both_2))
self.assertEqual(len(index), len(both_array))
def test_search_index_minimal_by_org(self):
# pythonify is not supported for minimal results
self.user_misp_connector.global_pythonify = False
# Create test event
event = create_simple_event()
event = self.user_misp_connector.add_event(event, pythonify=True)
check_response(event)
# Search by org name
minimal_org = self.user_misp_connector.search_index(minimal=True, org="Test Org")
self.assertGreater(len(minimal_org), 0)
for event in minimal_org:
self.assertEqual(event["orgc_uuid"], self.test_org.uuid)
# Search by org name with different case
minimal_org_lower = self.user_misp_connector.search_index(minimal=True, org="test org")
self.assertGreater(len(minimal_org), 0)
for event in minimal_org:
self.assertEqual(event["orgc_uuid"], self.test_org.uuid)
# Search by non exists org name
minimal_org_non_existing = self.user_misp_connector.search_index(minimal=True, org="Test Org that doesn't exists")
self.assertEqual(len(minimal_org_non_existing), 0)
# Search by org uuid
minimal_org_uuid = self.user_misp_connector.search_index(minimal=True, org=self.test_org.uuid)
self.assertGreater(len(minimal_org), 0)
for event in minimal_org:
self.assertEqual(event["orgc_uuid"], self.test_org.uuid)
# Search by non-existing uuid
minimal_org_uuid_non_existing = self.user_misp_connector.search_index(minimal=True, org=uuid.uuid4())
self.assertEqual(len(minimal_org_uuid_non_existing), 0)
# Search by org ID
minimal_org_id = self.user_misp_connector.search_index(minimal=True, org=self.test_org.id)
self.assertGreater(len(minimal_org), 0)
for event in minimal_org:
self.assertEqual(event["orgc_uuid"], self.test_org.uuid)
self.assertEqual(len(minimal_org), len(minimal_org_lower))
self.assertEqual(len(minimal_org), len(minimal_org_uuid))
self.assertEqual(len(minimal_org), len(minimal_org_id))
# Search not by org
minimal_org_not = self.user_misp_connector.search_index(minimal=True, org="!Test Org")
for event in minimal_org_not:
self.assertNotEqual(event["orgc_uuid"], self.test_org.uuid)
minimal_org_lower_not = self.user_misp_connector.search_index(minimal=True, org="!test org")
for event in minimal_org_lower_not:
self.assertNotEqual(event["orgc_uuid"], self.test_org.uuid)
minimal_org_uuid_not = self.user_misp_connector.search_index(minimal=True, org="!" + self.test_org.uuid)
for event in minimal_org_uuid_not:
self.assertNotEqual(event["orgc_uuid"], self.test_org.uuid)
minimal_org_id_not = self.user_misp_connector.search_index(minimal=True, org="!" + self.test_org.id)
for event in minimal_org_id_not:
self.assertNotEqual(event["orgc_uuid"], self.test_org.uuid)
self.assertEqual(len(minimal_org_not), len(minimal_org_lower_not))
self.assertEqual(len(minimal_org_not), len(minimal_org_uuid_not))
self.assertEqual(len(minimal_org_not), len(minimal_org_id_not))
self.user_misp_connector.delete_event(event)
def test_delete_event_blocklist(self):
check_response(self.admin_misp_connector.set_server_setting('MISP.enableEventBlocklisting', 1))
# Create test event
event = create_simple_event()
event = self.user_misp_connector.add_event(event)
check_response(event)
# Delete event
check_response(self.user_misp_connector.delete_event(event))
check_response(self.admin_misp_connector.set_server_setting('MISP.enableEventBlocklisting', 0))
def test_deleted_attributes(self):
# Create test event
event = create_simple_event()
event.add_attribute('text', "deleted", deleted=True)
event.add_attribute('text', "not-deleted")
event = self.user_misp_connector.add_event(event)
check_response(event)
# Not deleted
fetched_event = self.user_misp_connector.get_event(event)
check_response(fetched_event)
self.assertEqual(len(fetched_event.attributes), 2, fetched_event)
# Not deleted
fetched_event = self.user_misp_connector.get_event(event, deleted=0)
check_response(fetched_event)
self.assertEqual(len(fetched_event.attributes), 2, fetched_event)
# Include deleted
fetched_event = self.user_misp_connector.get_event(event, deleted=1)
check_response(fetched_event)
self.assertEqual(len(fetched_event.attributes), 3, fetched_event)
# Deleted only
fetched_event = self.user_misp_connector.get_event(event, deleted=2)
check_response(fetched_event)
self.assertEqual(len(fetched_event.attributes), 1, fetched_event)
# Both
fetched_event = self.user_misp_connector.get_event(event, deleted=[0, 1])
check_response(fetched_event)
self.assertEqual(len(fetched_event.attributes), 3, fetched_event)
check_response(self.user_misp_connector.delete_event(event))
def test_view_event_exclude_local_tags(self):
event = create_simple_event()
event.add_tag({"name": "local", "local": 1})
event.add_tag({"name": "global", "local": 0})
event.attributes[0].add_tag({"name": "local", "local": 1})
event.attributes[0].add_tag({"name": "global", "local": 0})
event = self.admin_misp_connector.add_event(event)
check_response(event)
event_with_local_tags = self.admin_misp_connector.get_event(event)
check_response(event_with_local_tags)
self.assertEqual(len(event_with_local_tags.tags), 2)
self.assertEqual(len(event_with_local_tags.attributes[0].tags), 2)
event_without_local_tags = self.admin_misp_connector._check_response(self.admin_misp_connector._prepare_request('GET', f'events/view/{event.id}/excludeLocalTags:1'))
check_response(event_without_local_tags)
self.assertEqual(event_without_local_tags["Event"]["Tag"][0]["local"], 0, event_without_local_tags)
self.assertEqual(event_without_local_tags["Event"]["Attribute"][0]["Tag"][0]["local"], 0, event_without_local_tags)
check_response(self.admin_misp_connector.delete_event(event))
def test_publish_alert_filter(self):
first = create_simple_event()
first.add_tag('test_publish_filter')
first.threat_level_id = ThreatLevel.medium
second = create_simple_event()
second.add_tag('test_publish_filter')
second.threat_level_id = ThreatLevel.high
third = create_simple_event()
third.add_tag('test_publish_filter')
third.threat_level_id = ThreatLevel.low
four = create_simple_event()
four.threat_level_id = ThreatLevel.high
try:
# Enable autoalert on admin
self.admin_misp_connector._current_user.autoalert = True
check_response(self.admin_misp_connector.update_user(self.admin_misp_connector._current_user))
# Set publish_alert_filter tag to `test_publish_filter`
setting_value = {'AND': {'Tag.name': 'test_publish_filter', 'ThreatLevel.name': ['High', 'Medium']}}
check_response(self.admin_misp_connector.set_user_setting('publish_alert_filter', setting_value))
# Add events
first = check_response(self.admin_misp_connector.add_event(first))
second = check_response(self.admin_misp_connector.add_event(second))
third = check_response(self.admin_misp_connector.add_event(third))
four = check_response(self.admin_misp_connector.add_event(four))
# Publish events
for event in (first, second, third, four):
publish_immediately(self.admin_misp_connector, event, with_email=True)
# Email notification should be send just to first event
mail_logs = self.admin_misp_connector.search_logs(model='User', action='email')
log_titles = [log.title for log in mail_logs]
self.assertIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(first.id) + ' - Medium - TLP:AMBER".', log_titles)
self.assertIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(second.id) + ' - High - TLP:AMBER".', log_titles)
self.assertNotIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(third.id) + ' - Low - TLP:AMBER".', log_titles)
self.assertNotIn('Email to admin@admin.test sent, titled "[ORGNAME MISP] Event ' + str(four.id) + ' - High - TLP:AMBER".', log_titles)
finally:
# Disable autoalert
self.admin_misp_connector._current_user.autoalert = False
check_response(self.admin_misp_connector.update_user(self.admin_misp_connector._current_user))
# Delete filter
self.admin_misp_connector.delete_user_setting('publish_alert_filter')
# Delete events
for event in (first, second, third, four):
check_response(self.admin_misp_connector.delete_event(event))
def test_correlations(self):
first = create_simple_event()
first.add_attribute("ip-src", "10.0.0.1")
first = check_response(self.admin_misp_connector.add_event(first))
second = create_simple_event()
second.add_attribute("ip-src", "10.0.0.1")
second = check_response(self.admin_misp_connector.add_event(second))
# Reload to get event data with related events
first = check_response(self.admin_misp_connector.get_event(first))
try:
self.assertEqual(1, len(first.RelatedEvent), first.RelatedEvent)
self.assertEqual(1, len(second.RelatedEvent), second.RelatedEvent)
except:
raise
finally:
# Delete events
for event in (first, second):
check_response(self.admin_misp_connector.delete_event(event))
def test_correlations_object(self):
first = create_simple_event()
dom_ip_obj = DomainIPObject({'ip': ['10.0.0.1']})
first.add_object(dom_ip_obj)
first = check_response(self.admin_misp_connector.add_event(first))
second = create_simple_event()
dom_ip_obj = DomainIPObject({'ip': ['10.0.0.1']})
second.add_object(dom_ip_obj)
second = check_response(self.admin_misp_connector.add_event(second))
# Reload to get event data with related events
first = check_response(self.admin_misp_connector.get_event(first))
try:
self.assertEqual(1, len(first.RelatedEvent), first.RelatedEvent)
self.assertEqual(1, len(second.RelatedEvent), second.RelatedEvent)
except:
raise
finally:
# Delete events
for event in (first, second):
check_response(self.admin_misp_connector.delete_event(event))
def test_correlations_noacl(self):
with MISPSetting(self.admin_misp_connector, {"MISP.correlation_engine": "NoAcl"}):
self.test_correlations()
self.test_correlations_object()
self.test_recorrelate()
def test_advanced_correlations(self):
with MISPSetting(self.admin_misp_connector, {"MISP.enable_advanced_correlations": True}):
first = create_simple_event()
first.add_attribute("ip-src", "10.0.0.0/8")
first = check_response(self.admin_misp_connector.add_event(first))
second = create_simple_event()
second.add_attribute("ip-src", "10.0.0.1")
second = check_response(self.admin_misp_connector.add_event(second))
# Reload to get event data with related events
first = check_response(self.admin_misp_connector.get_event(first))
try:
self.assertEqual(1, len(first.RelatedEvent), first.RelatedEvent)
self.assertEqual(1, len(second.RelatedEvent), second.RelatedEvent)
except:
raise
finally:
# Delete events
for event in (first, second):
check_response(self.admin_misp_connector.delete_event(event))
def test_remove_orphaned_correlations(self):
result = self.admin_misp_connector._check_response(self.admin_misp_connector._prepare_request('GET', 'servers/removeOrphanedCorrelations'))
check_response(result)
self.assertIn("message", result)
def test_recorrelate(self):
first = create_simple_event()
dom_ip_obj = DomainIPObject({'ip': ['10.0.0.1']})
first.add_object(dom_ip_obj)
first = check_response(self.admin_misp_connector.add_event(first))
second = create_simple_event()
dom_ip_obj = DomainIPObject({'ip': ['10.0.0.1']})
second.add_object(dom_ip_obj)
second = check_response(self.admin_misp_connector.add_event(second))
check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 0, force=True))
result = self.admin_misp_connector._check_response(self.admin_misp_connector._prepare_request('POST', 'attributes/generateCorrelation'))
check_response(result)
self.assertIn("message", result)
check_response(self.admin_misp_connector.set_server_setting('MISP.background_jobs', 1, force=True))
first = check_response(self.admin_misp_connector.get_event(first))
second = check_response(self.admin_misp_connector.get_event(second))
try:
self.assertEqual(1, len(first.RelatedEvent), first.RelatedEvent)
self.assertEqual(1, len(second.RelatedEvent), second.RelatedEvent)
except:
raise
finally:
# Delete events
for event in (first, second):
check_response(self.admin_misp_connector.delete_event(event))
def test_restsearch_event_by_tags(self):
first = create_simple_event()
first.add_tag('test_search_tag')
first.add_tag('test_search_tag_third')
first.add_tag('test_search_tag_both')
first = self.admin_misp_connector.add_event(first)
check_response(first)
second = create_simple_event()
second.add_tag('test_search_tag_second')
second.add_tag('test_search_tag_both')
second = self.admin_misp_connector.add_event(second)
check_response(second)
search_result = self.admin_misp_connector.search(metadata=True, tags=["non_exists_tag"])
self.assertEqual(0, len(search_result))
search_result = self.admin_misp_connector.search(metadata=True, tags=["test_search_tag"])
self.assertEqual(1, len(search_result))
self.assertEqual(first.id, search_result[0].id)
search_result = self.admin_misp_connector.search(metadata=True, tags="test_search_tag")
self.assertEqual(1, len(search_result))
self.assertEqual(first.id, search_result[0].id)
# Like style match
search_result = self.admin_misp_connector.search(metadata=True, tags=["test_search_tag%"])
self.assertEqual(2, len(search_result))
search_result = self.admin_misp_connector.search(metadata=True, tags=["test_search_tag_second"])
self.assertEqual(1, len(search_result))
self.assertEqual(second.id, search_result[0].id)
search_result = self.admin_misp_connector.search(metadata=True, tags=["!test_search_tag"])
search_result_ids = [event.id for event in search_result]
self.assertNotIn(first.id, search_result_ids)
self.assertIn(second.id, search_result_ids)
search_result = self.admin_misp_connector.search(metadata=True, tags={"NOT": ["test_search_tag"]})
search_result_ids = [event.id for event in search_result]
self.assertNotIn(first.id, search_result_ids)
self.assertIn(second.id, search_result_ids)
search_result = self.admin_misp_connector.search(metadata=True, tags={"NOT": "test_search_tag"})
search_result_ids = [event.id for event in search_result]
self.assertNotIn(first.id, search_result_ids)
self.assertIn(second.id, search_result_ids)
search_result = self.admin_misp_connector.search(metadata=True, tags=["test_search_tag", "test_search_tag_second"])
self.assertEqual(2, len(search_result))
search_result = self.admin_misp_connector.search(metadata=True, tags={"AND": ["test_search_tag", "test_search_tag_third"]})
self.assertEqual(1, len(search_result))
self.assertEqual(first.id, search_result[0].id)
search_result = self.admin_misp_connector.search(metadata=True, tags={"AND": ["test_search_tag", "test_search_tag_both"]})
search_result_ids = [event.id for event in search_result]
self.assertEqual(1, len(search_result_ids))
self.assertIn(first.id, search_result_ids)
check_response(self.admin_misp_connector.delete_event(first))
check_response(self.admin_misp_connector.delete_event(second))
def test_log_new_audit(self):
check_response(self.admin_misp_connector.set_server_setting('MISP.log_new_audit', 1, force=True))
event = create_simple_event()
event.add_tag('test_log_new_audit_tag')
event = check_response(self.admin_misp_connector.add_event(event))
check_response(self.admin_misp_connector.delete_event(event))
check_response(self.admin_misp_connector.set_server_setting('MISP.log_new_audit', 0, force=True))
audit_logs = self.admin_misp_connector._check_response(self.admin_misp_connector._prepare_request('GET', 'admin/audit_logs/index'))
check_response(audit_logs)
self.assertGreater(len(audit_logs), 0)
def test_add_tag_to_attachment(self):
event = create_simple_event()
with open(__file__, 'rb') as f:
event.add_attribute('attachment', value='testfile.py', data=BytesIO(f.read()))
event = check_response(self.admin_misp_connector.add_event(event))
attribute_uuids = [attribute.uuid for attribute in event.attributes if attribute.type == 'attachment']
self.assertEqual(1, len(attribute_uuids))
check_response(self.admin_misp_connector.tag(attribute_uuids[0], 'generic_tag_test'))
check_response(self.admin_misp_connector.delete_event(event))
def test_add_duplicate_tags(self):
event = create_simple_event()
event = check_response(self.admin_misp_connector.add_event(event))
# Just first tag should be added
check_response(self.admin_misp_connector.tag(event.uuid, 'generic_tag_test', local=True))
check_response(self.admin_misp_connector.tag(event.uuid, 'generic_tag_test', local=False))
fetched_event = check_response(self.admin_misp_connector.get_event(event))
self.assertEqual(1, len(fetched_event.tags), fetched_event.tags)
self.assertTrue(fetched_event.tags[0].local, fetched_event.tags[0])
def test_export(self):
event = create_simple_event()
event.add_attribute("ip-src", "1.2.4.5", to_ids=True)
event = check_response(self.admin_misp_connector.add_event(event))
result = self._search_event({'returnFormat': "openioc", 'eventid': event.id, "published": [0, 1]})
ET.fromstring(result) # check if result is valid XML
self.assertTrue("1.2.4.5" in result, result)
result = self._search_event({'returnFormat': "yara", 'eventid': event.id, "published": [0, 1]})
self.assertTrue("1.2.4.5" in result, result)
self.assertTrue("GENERATED" in result, result)
self.assertTrue("AS-IS" in result, result)
result = self._search_event({'returnFormat': "yara-json", 'eventid': event.id, "published": [0, 1]})
self.assertIn("generated", result)
self.assertEqual(len(result["generated"]), 1, result)
self.assertIn("as-is", result)
# RPZ
result = self._search_event({'returnFormat': "rpz", 'eventid': event.id, "published": [0, 1]})
self.assertTrue("32.5.4.2.1" in result, result)
result = self._search_attribute({'returnFormat': "rpz", 'eventid': event.id, "published": [0, 1]})
self.assertTrue("32.5.4.2.1" in result, result)
check_response(self.admin_misp_connector.delete_event(event))
def test_event_report_empty_name(self):
event = create_simple_event()
new_event_report = MISPEventReport()
new_event_report.name = ""
new_event_report.content = "# Example report markdown"
new_event_report.distribution = 5 # Inherit
try:
event = check_response(self.user_misp_connector.add_event(event))
new_event_report = self.user_misp_connector.add_event_report(event.id, new_event_report)
self.assertIn("errors", new_event_report)
finally:
self.user_misp_connector.delete_event(event)
def test_new_audit(self):
with MISPSetting(self.admin_misp_connector, {"MISP.log_new_audit": True}):
event = create_simple_event()
event = check_response(self.user_misp_connector.add_event(event))
self.user_misp_connector.delete_event(event)
def test_csp_report(self):
response = self.admin_misp_connector._prepare_request('POST', 'servers/cspReport', data={
"csp-report": {
"test": "test",
}
})
self.assertEqual(204, response.status_code)
def test_redacted_setting(self):
response = self.admin_misp_connector.get_server_setting('Security.salt')
self.assertEqual(403, response["errors"][0])
response = self.admin_misp_connector._prepare_request('GET', 'servers/serverSettingsEdit/Security.salt')
response = self.admin_misp_connector._check_response(response)
self.assertEqual(403, response["errors"][0])
def test_custom_warninglist(self):
warninglist = {
"Warninglist": {
"name": "Test",
"description": "Test",
"type": "cidr",
"category": "false_positive",
"matching_attributes": ["ip-src", "ip-dst"],
"entries": "1.2.3.4",
}
}
wl = request(self.admin_misp_connector, 'POST', 'warninglists/add', data=warninglist)
check_response(wl)
exported = request(self.admin_misp_connector, 'GET', f'warninglists/export/{wl["Warninglist"]["id"]}')
self.assertIn('name', exported)
self.assertEqual('Test', exported['name'])
check_response(self.admin_misp_connector.enable_warninglist(wl["Warninglist"]["id"]))
response = self.admin_misp_connector.values_in_warninglist("1.2.3.4")
self.assertEqual(wl["Warninglist"]["id"], response["1.2.3.4"][0]["id"])
warninglist["Warninglist"]["entries"] = "1.2.3.4\n2.3.4.5"
response = request(self.admin_misp_connector, 'POST', f'warninglists/edit/{wl["Warninglist"]["id"]}', data=warninglist)
check_response(response)
response = self.admin_misp_connector.values_in_warninglist("2.3.4.5")
self.assertEqual(wl["Warninglist"]["id"], response["2.3.4.5"][0]["id"])
warninglist["Warninglist"]["entries"] = "2.3.4.5"
response = request(self.admin_misp_connector, 'POST', f'warninglists/edit/{wl["Warninglist"]["id"]}', data=warninglist)
check_response(response)
response = self.admin_misp_connector.values_in_warninglist("1.2.3.4")
self.assertEqual(0, len(response))
response = self.admin_misp_connector.values_in_warninglist("2.3.4.5")
self.assertEqual(wl["Warninglist"]["id"], response["2.3.4.5"][0]["id"])
check_response(self.admin_misp_connector.disable_warninglist(wl["Warninglist"]["id"]))
response = self.admin_misp_connector.values_in_warninglist("2.3.4.5")
self.assertEqual(0, len(response))
# Update by importing
response = request(self.admin_misp_connector, 'POST', f'warninglists/import', exported)
check_response(response)
response = request(self.admin_misp_connector, 'POST', f'warninglists/delete/{wl["Warninglist"]["id"]}')
check_response(response)
# Create new warninglist by importing under different name
exported["name"] = "Test2"
response = request(self.admin_misp_connector, 'POST', f'warninglists/import', exported)
check_response(response)
response = request(self.admin_misp_connector, 'POST', f'warninglists/delete/{response["id"]}')
check_response(response)
def test_protected_event(self):
event = create_simple_event()
event = check_response(self.admin_misp_connector.add_event(event))
response = request(self.admin_misp_connector, 'POST', f'events/protect/{event.id}')
check_response(response)
response = request(self.admin_misp_connector, 'POST', f'events/unprotect/{event.uuid}')
check_response(response)
response = request(self.admin_misp_connector, 'POST', f'events/protect/{event.uuid}')
check_response(response)
response = self.admin_misp_connector._prepare_request('GET', f'events/view/{event.id}')
self.assertIn('x-pgp-signature', response.headers)
self.assertTrue(len(response.headers['x-pgp-signature']) > 0, response.headers['x-pgp-signature'])
def test_get_all_apis(self):
response = self.admin_misp_connector._prepare_request('GET', 'api/getAllApis.json')
self.assertEqual(200, response.status_code, response)
response.json()
def test_taxonomy_export(self):
response = self.admin_misp_connector._prepare_request('GET', 'taxonomies/export/1')
self.assertEqual(200, response.status_code, response)
response.json()
def test_etag(self):
headers = {
'Authorization': key.strip(),
'Accept': 'application/json',
'User-Agent': 'PyMISP',
'If-None-Match': '',
}
response = requests.get(self.admin_misp_connector.root_url + '/attributes/describeTypes.json', headers=headers)
self.assertEqual(200, response.status_code)
self.assertIn('Etag', response.headers)
self.assertTrue(len(response.headers['Etag']) > 0, response.headers['Etag'])
headers['If-None-Match'] = response.headers['Etag']
response = requests.get(self.admin_misp_connector.root_url + '/attributes/describeTypes.json', headers=headers)
self.assertEqual(304, response.status_code, response.headers)
def test_event_alert_default_enabled(self):
user = MISPUser()
user.email = 'testusr_alert_disabled@user.local'
user.org_id = self.test_org.id
created_user = check_response(self.admin_misp_connector.add_user(user))
self.assertFalse(created_user.autoalert, created_user)
self.admin_misp_connector.delete_user(created_user)
with MISPSetting(self.admin_misp_connector, {"MISP.default_publish_alert": True}):
user = MISPUser()
user.email = 'testusr_alert_enabled@user.local'
user.org_id = self.test_org.id
created_user = check_response(self.admin_misp_connector.add_user(user))
self.assertTrue(created_user.autoalert, created_user)
self.admin_misp_connector.delete_user(created_user)
def test_attribute_search(self):
request(self.admin_misp_connector, "GET", "/attributes/search/value:8.8.8.8.json")
def test_search_snort_suricata(self):
event = create_simple_event()
event.add_attribute('ip-src', '8.8.8.8', to_ids=True)
event.add_attribute('snort', 'alert tcp 192.168.1.0/24 any -> 131.171.127.1 25 (content: "hacking"; msg: "malicious packet"; sid:2000001;)', to_ids=True)
# Snort rule without msg, test for #9515
event.add_attribute('snort', 'alert tcp 192.168.1.0/24 any -> 131.171.127.1 25 (content: "hacking"; sid:2000001;)', to_ids=True)
event = check_response(self.user_misp_connector.add_event(event))
publish_immediately(self.admin_misp_connector, event)
snort = self._search_event({'returnFormat': 'snort', 'eventid': event.id})
self.assertIsInstance(snort, str)
self.assertIn('8.8.8.8', snort)
suricata = self._search_event({'returnFormat': 'suricata', 'eventid': event.id})
self.assertIsInstance(suricata, str)
self.assertIn('8.8.8.8', suricata)
self.admin_misp_connector.delete_event(event)
def test_restsearch_composite_attribute(self):
event = create_simple_event()
attribute_1 = event.add_attribute('ip-src|port', '10.0.0.1|8080')
attribute_2 = event.add_attribute('ip-src|port', '10.0.0.2|8080')
event = self.user_misp_connector.add_event(event)
check_response(event)
search_result = self._search_attribute({'value': '10.0.0.1', 'eventid': event.id})
self.assertEqual(search_result['Attribute'][0]['uuid'], attribute_1.uuid)
self.assertEqual(len(search_result['Attribute']), 1)
search_result = self._search_attribute({'value': '8080', 'eventid': event.id})
self.assertEqual(len(search_result['Attribute']), 2)
search_result = self._search_attribute({'value1': '10.0.0.1', 'eventid': event.id})
self.assertEqual(len(search_result['Attribute']), 1)
self.assertEqual(search_result['Attribute'][0]['uuid'], attribute_1.uuid)
search_result = self._search_attribute({'value1': '10.0.0.2', 'eventid': event.id})
self.assertEqual(len(search_result['Attribute']), 1)
self.assertEqual(search_result['Attribute'][0]['uuid'], attribute_2.uuid)
search_result = self._search_attribute({'value2': '8080', 'eventid': event.id})
self.assertEqual(len(search_result['Attribute']), 2)
search_result = self._search_attribute({'value1': '10.0.0.1', 'value2': '8080', 'eventid': event.id})
self.assertEqual(len(search_result['Attribute']), 1)
self.assertEqual(search_result['Attribute'][0]['uuid'], attribute_1.uuid)
self.admin_misp_connector.delete_event(event)
def test_restsearch_sightings(self):
# Create test event
event = create_simple_event()
event = self.admin_misp_connector.add_event(event)
check_response(event)
# Add sighting
sighting = MISPSighting()
sighting.value = 'test'
sighting.source = 'Testcases'
sighting.type = '1'
response = self.admin_misp_connector.add_sighting(sighting, event.attributes[0])
check_response(response)
self.assertEqual(response.source, 'Testcases')
# Try to find sighting by event UUID, this is the same type of request when doing sync
search_result = self._search_sighting('event', {
'returnFormat': 'json',
'last': 0,
'includeUuid': True,
'uuid': [event.uuid],
})
self.assertEqual(len(search_result), 1, search_result)
sighting = search_result[0]["Sighting"]
self.assertIn("attribute_uuid", sighting)
self.assertIn("event_uuid", sighting)
self.assertEqual(sighting["event_uuid"], event.uuid, search_result)
self.admin_misp_connector.delete_event(event)
def _search_event(self, query: dict):
response = self.admin_misp_connector._prepare_request('POST', 'events/restSearch', data=query)
response = self.admin_misp_connector._check_response(response)
check_response(response)
return response
def _search_attribute(self, query: dict):
response = self.admin_misp_connector._prepare_request('POST', 'attributes/restSearch', data=query)
response = self.admin_misp_connector._check_response(response)
check_response(response)
return response
def _search_sighting(self, context: str, query: dict):
response = self.admin_misp_connector._prepare_request('POST', f'sightings/restSearch/{context}', data=query)
response = self.admin_misp_connector._check_response(response)
check_response(response)
return response
class TestLastPwChange(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.admin_misp_connector = PyMISP(url, key)
organisation = MISPOrganisation()
organisation.name = 'Test org for last pw change tests'
cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True)
check_response(cls.test_org)
cls.test_org_id = cls.test_org.id
@classmethod
def tearDownClass(cls) -> None:
cls.admin_misp_connector.delete_organisation(cls.test_org)
def setUp(self) -> None:
self.admin_misp_connector = type(self).admin_misp_connector
# Create a user
user = MISPUser()
user.email = 'testusr_last_pw_change@user' + gen_random_id() + '.local' # make name always unique
user.org_id = type(self).test_org_id
user.role_id = 3 # User role
user.password = str(uuid.uuid4())
self.test_usr = self.admin_misp_connector.add_user(user, pythonify=True)
check_response(self.test_usr)
self.test_usr_misp_connector = PyMISP(url, self.test_usr.authkey)
def tearDown(self) -> None:
# Delete Authkey and user
body = {
"authkey_start": self.test_usr.authkey[0:4],
"authkey_end": self.test_usr.authkey[-4:],
"User.id": self.test_usr.id
}
auth_key = type(self).admin_misp_connector.direct_call('auth_keys', body)
check_response(auth_key)
if len(auth_key) == 1 and "AuthKey" in auth_key[0]:
type(self).admin_misp_connector.direct_call(f'auth_keys/delete/{auth_key[0]["AuthKey"]["id"]}', {})
type(self).admin_misp_connector.delete_user(self.test_usr)
def test_new_user_last_pw_change_is_date_created(self):
self.assertEqual(self.test_usr.last_pw_change, self.test_usr.date_created)
time.sleep(1)
def test_admin_edit_password_updates_last_pw_change(self):
old_last_pw_change = self.test_usr.last_pw_change
# edit user password
self.test_usr.password = uuid.uuid4()
time_just_before_update = datetime.now()
self.updated_test_usr = self.admin_misp_connector.update_user(self.test_usr, pythonify=True)
time_just_after_update = datetime.now()
check_response(self.updated_test_usr)
self.check_last_pw_change_timestamp(old_last_pw_change, time_just_before_update, time_just_after_update)
time.sleep(1)
def test_user_change_password_updates_last_pw_change(self):
old_last_pw_change = self.test_usr.last_pw_change
# edit user password
time_just_before_update = datetime.now()
change_password_result = self.test_usr_misp_connector.change_user_password(uuid.uuid4())
time_just_after_update = datetime.now()
check_response(change_password_result)
self.updated_test_usr = self.test_usr_misp_connector.get_user(pythonify=True)
check_response(self.updated_test_usr)
self.check_last_pw_change_timestamp(old_last_pw_change, time_just_before_update, time_just_after_update)
time.sleep(1)
def test_reset_user_password_updates_last_pw_change(self):
old_last_pw_change = self.test_usr.last_pw_change
# reset user password
time_just_before_update = datetime.now()
self.admin_misp_connector.direct_call(f'users/initiatePasswordReset/{self.test_usr.id}', {})
time.sleep(1)
time_just_after_update = datetime.now()
self.updated_test_usr = self.test_usr_misp_connector.get_user(pythonify=True)
check_response(self.updated_test_usr)
self.check_last_pw_change_timestamp(old_last_pw_change, time_just_before_update, time_just_after_update)
time.sleep(1)
def last_pw_change_almost_equal_to_date_modified(self):
date_modified = datetime.fromtimestamp(int(self.updated_test_usr.date_modified))
last_pw_change = datetime.fromtimestamp(int(self.updated_test_usr.last_pw_change))
return date_modified - last_pw_change < timedelta(milliseconds=5)
def last_pw_change_time_is_in_expected_range(self, time_just_before_update, time_just_after_update):
timediff_last_pw_change_now = datetime.fromtimestamp(int(self.updated_test_usr.last_pw_change)) - time_just_before_update
max_accepted_timediff = time_just_after_update - time_just_before_update
return timediff_last_pw_change_now <= max_accepted_timediff
def check_last_pw_change_timestamp(self, old_last_pw_change, time_just_before_update, time_just_after_update):
# check if new last_pw_change timestamp looks okay, starting with fact that it should be newer than previous one
# self.assertGreater(self.updated_test_usr.last_pw_change, old_last_pw_change)
# last pw change should be set to timestamp sometime between time_just_before_update and time_just_after_update
self.assertTrue(self.last_pw_change_time_is_in_expected_range(time_just_before_update, time_just_after_update))
# last_pw_change should be relatively close to date_modified
self.assertTrue(self.last_pw_change_almost_equal_to_date_modified())
def gen_random_id() -> str:
return str(uuid.uuid4()).split("-")[0]
if __name__ == '__main__':
unittest.main()