mirror of https://github.com/MISP/misp-modules
792 lines
38 KiB
Python
792 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import unittest
|
|
import requests
|
|
import base64
|
|
import json
|
|
import os
|
|
import io
|
|
import re
|
|
import zipfile
|
|
from hashlib import sha256
|
|
from email.mime.application import MIMEApplication
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.header import Header
|
|
from pathlib import Path
|
|
import importlib.util
|
|
import sys
|
|
|
|
|
|
class TestModules(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.maxDiff = None
|
|
self.headers = {'Content-Type': 'application/json'}
|
|
self.url = "http://127.0.0.1:6666/"
|
|
|
|
def test_introspection(self):
|
|
response = requests.get(self.url + "modules")
|
|
print(response.json())
|
|
response.connection.close()
|
|
|
|
def test_introspection_module_init(self):
|
|
"""checks if all modules are offered through the misp-modules service"""
|
|
try:
|
|
response = requests.get(self.url + "modules")
|
|
modules_api = [module["name"] for module in response.json()]
|
|
issues_found = []
|
|
root_path = Path(__file__).resolve().parent.parent
|
|
modules_path = root_path / 'misp_modules' / 'modules'
|
|
for d in os.listdir(modules_path):
|
|
if d.startswith('__'):
|
|
continue
|
|
mod_d_path = modules_path / d
|
|
module_files = [file[:-3] for file in os.listdir(mod_d_path) if file.endswith(".py") if file not in ['__init__.py', 'testimport.py']]
|
|
for module in module_files:
|
|
if module not in modules_api:
|
|
issues_found.append(f"Missing module {module} in {d}/__init__.py.")
|
|
|
|
error_message = '\n- '.join(issues_found)
|
|
self.assertEqual(issues_found, [], f"Found issues: \n{error_message}")
|
|
finally:
|
|
response.connection.close()
|
|
|
|
def test_introspection_module_structure(self):
|
|
moduleinfo_template = {
|
|
'version': '1.0',
|
|
'author': '',
|
|
'module-type': [],
|
|
'name': '',
|
|
'description': '',
|
|
'logo': '',
|
|
'requirements': [],
|
|
'features': '',
|
|
'references': [],
|
|
'input': '',
|
|
'output': ''
|
|
}
|
|
root_path = Path(__file__).resolve().parent.parent
|
|
modules_path = root_path / 'misp_modules' / 'modules'
|
|
issues_found = []
|
|
|
|
for d in os.listdir(modules_path):
|
|
if d.startswith('__'):
|
|
continue
|
|
|
|
d_module = importlib.import_module(f"misp_modules.modules.{d}")
|
|
for module_name in d_module.__all__:
|
|
try:
|
|
module_package_name = f"misp_modules.modules.{d}.{module_name}"
|
|
module = importlib.import_module(module_package_name)
|
|
moduleinfo = module.version()
|
|
for k in moduleinfo_template.keys():
|
|
if k not in moduleinfo:
|
|
issues_found.append(f"Module {d}.{module_name}: Key {k} not in moduleinfo.")
|
|
# sys.path.remove(str(m.parent))
|
|
except Exception as e:
|
|
issues_found.append(f"Error loading {module_name}: {e}")
|
|
continue
|
|
|
|
sys.path.remove(str(root_path / 'misp_modules' / 'lib'))
|
|
error_message = '\n- '.join(issues_found)
|
|
self.assertEqual(issues_found, [], f"Found issues: \n{error_message}")
|
|
|
|
def test_cve(self):
|
|
with open('tests/bodycve.json', 'r') as f:
|
|
response = requests.post(self.url + "query", data=f.read())
|
|
expected_response = {
|
|
'results': [
|
|
{
|
|
'types': ['text'],
|
|
'values': 'Stack-based buffer overflow in Microsoft Office XP SP3, Office 2003 SP3, Office 2007 SP2, Office 2010, Office 2004 and 2008 for Mac, Office for Mac 2011, and Open XML File Format Converter for Mac allows remote attackers to execute arbitrary code via crafted RTF data, aka "RTF Stack Buffer Overflow Vulnerability."'
|
|
}
|
|
]
|
|
}
|
|
self.assertDictEqual(response.json(), expected_response)
|
|
print(response.json())
|
|
response.connection.close()
|
|
|
|
def test_invalid_cve(self):
|
|
response = requests.post(self.url + "query", data='{"module": "cve", "vulnerability": "CVE-INVALID"}')
|
|
expected_response = {
|
|
'results': [
|
|
{
|
|
'types': ['text'],
|
|
'values': 'Non existing CVE'
|
|
}
|
|
]
|
|
}
|
|
self.assertDictEqual(response.json(), expected_response)
|
|
print(response.json())
|
|
response.connection.close()
|
|
|
|
def test_dns(self):
|
|
with open('tests/body.json', 'r') as f:
|
|
response = requests.post(self.url + "query", data=f.read())
|
|
print(response.json())
|
|
response.connection.close()
|
|
with open('tests/body_timeout.json', 'r') as f:
|
|
response = requests.post(self.url + "query", data=f.read())
|
|
print(response.json())
|
|
response.connection.close()
|
|
|
|
def test_openioc(self):
|
|
with open("tests/openioc.xml", "rb") as f:
|
|
content = base64.b64encode(f.read())
|
|
data = json.dumps({"module": "openiocimport",
|
|
"data": content.decode(),
|
|
})
|
|
response = requests.post(self.url + "query", data=data).json()
|
|
print(response)
|
|
|
|
print("OpenIOC :: {}".format(response))
|
|
values = [x["values"][0] for x in response["results"]]
|
|
assert ("mrxcls.sys" in values)
|
|
assert ("mdmcpq3.PNF" in values)
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_headers(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
results = response.json()['results']
|
|
values = [x["values"] for x in results]
|
|
types = {}
|
|
for i in results:
|
|
types.setdefault(i["type"], 0)
|
|
types[i["type"]] += 1
|
|
# Check that there are the appropriate number of items
|
|
# Check that all the items were correct
|
|
self.assertEqual(types['target-email'], 1)
|
|
self.assertIn('test@domain.com', values)
|
|
self.assertEqual(types['email-dst-display-name'], 4)
|
|
self.assertIn('Last One', values)
|
|
self.assertIn('Other Friend', values)
|
|
self.assertIn('Second Person', values)
|
|
self.assertIn('Testy Testerson', values)
|
|
self.assertEqual(types['email-dst'], 4)
|
|
self.assertIn('test@domain.com', values)
|
|
self.assertIn('second@domain.com', values)
|
|
self.assertIn('other@friend.net', values)
|
|
self.assertIn('last_one@finally.com', values)
|
|
self.assertEqual(types['email-src-display-name'], 2)
|
|
self.assertIn("Innocent Person", values)
|
|
self.assertEqual(types['email-src'], 2)
|
|
self.assertIn("evil_spoofer@example.com", values)
|
|
self.assertIn("IgnoreMeImInnocent@sender.com", values)
|
|
self.assertEqual(types['email-thread-index'], 1)
|
|
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
|
|
self.assertEqual(types['email-message-id'], 1)
|
|
self.assertIn("<4988EF2D.40804@example.com>", values)
|
|
self.assertEqual(types['email-subject'], 1)
|
|
self.assertIn("Example Message", values)
|
|
self.assertEqual(types['email-header'], 1)
|
|
self.assertEqual(types['email-x-mailer'], 1)
|
|
self.assertIn("mlx 5.1.7", values)
|
|
self.assertEqual(types['email-reply-to'], 1)
|
|
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_basic(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/EICAR.com", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'com')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()['results']]
|
|
self.assertIn('EICAR.com', values)
|
|
for i in response.json()['results']:
|
|
if i["type"] == 'email-attachment':
|
|
self.assertEqual(i["values"], "EICAR.com")
|
|
if i['type'] == 'malware-sample':
|
|
attch_data = base64.b64decode(i["data"])
|
|
self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_unpack(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/EICAR.com.zip", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('EICAR.com', values)
|
|
self.assertIn('EICAR.com.zip', values)
|
|
for i in response.json()['results']:
|
|
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
|
|
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
|
|
with zf.open("EICAR.com") as ec:
|
|
attch_data = ec.read()
|
|
self.assertEqual(attch_data,
|
|
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
|
|
attch_data = base64.b64decode(i["data"])
|
|
self.assertEqual(attch_data,
|
|
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_dont_unpack_compressed_doc_attachments(self):
|
|
"""Ensures that compressed
|
|
"""
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/test_files/test.docx", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="test.docx")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('test.docx', values)
|
|
types = {}
|
|
for i in response.json()['results']:
|
|
types.setdefault(i["type"], 0)
|
|
types[i["type"]] += 1
|
|
# Check that there is only one attachment in the bundle
|
|
self.assertEqual(types['malware-sample'], 1)
|
|
for i in response.json()['results']:
|
|
if i['type'] == 'malware-sample' and i["values"] == 'test.docx':
|
|
attch_data = base64.b64decode(i["data"])
|
|
filesum = sha256()
|
|
filesum.update(attch_data)
|
|
self.assertEqual(filesum.hexdigest(),
|
|
'098da5381a90d4a51e6b844c18a0fecf2e364813c2f8b317cfdc51c21f2506a5')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_unpack_with_password(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": 'true',
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/infected.zip", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('EICAR.com', values)
|
|
self.assertIn('EICAR.com.zip', values)
|
|
for i in response.json()['results']:
|
|
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
|
|
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
|
|
# Make sure password was set and still in place
|
|
self.assertRaises(RuntimeError, zf.open, "EICAR.com")
|
|
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
|
|
attch_data = base64.b64decode(i["data"])
|
|
self.assertEqual(attch_data,
|
|
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_password_in_body(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": 'true',
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a -> STRINGS <- test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/short_password.zip", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('EICAR.com', values)
|
|
for i in response.json()['results']:
|
|
if i["values"] == 'EICAR.com':
|
|
attch_data = base64.b64decode(i["data"]).decode()
|
|
self.assertEqual(attch_data,
|
|
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_password_in_body_quotes(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": 'true',
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is "a long password".
|
|
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/longer_password.zip", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('EICAR.com', values)
|
|
for i in response.json()['results']:
|
|
# Check that it could be extracted.
|
|
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
|
|
attch_data = base64.b64decode(i["data"]).decode()
|
|
self.assertEqual(attch_data,
|
|
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_password_in_html_body(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": 'true',
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
|
|
That is all.
|
|
"""
|
|
html = """\
|
|
<html>
|
|
<head></head>
|
|
<body>
|
|
<p>Hi!<br>
|
|
This is the real password?<br>
|
|
It is "a long password".
|
|
</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
message.attach(MIMEText(html, 'html'))
|
|
with open("tests/longer_password.zip", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('EICAR.com', values)
|
|
for i in response.json()['results']:
|
|
# Check that it could be extracted.
|
|
if i["values"] == 'EICAR.com':
|
|
attch_data = base64.b64decode(i["data"]).decode()
|
|
self.assertEqual(attch_data,
|
|
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_body_encoding(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
filenames = os.listdir("tests/test_files/encodings")
|
|
for fn in filenames:
|
|
message = get_base_email()
|
|
encoding = os.path.splitext(fn)
|
|
with open("tests/test_files/encodings/{0}".format(fn), "r", encoding=encoding[0]) as fp:
|
|
# Encoding is used as the name of the file
|
|
text = fp.read()
|
|
message.attach(MIMEText(text, 'html', encoding[0]))
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data).json()
|
|
self.assertNotIn('error', response, response.get('error', ""))
|
|
self.assertIn('results', response, "No server results found.")
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_header_proper_encoding(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
filenames = os.listdir("tests/test_files/encodings")
|
|
for encoding in ['utf-8', 'utf-16', 'utf-32']:
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
for hdr, hdr_val in message.items():
|
|
msg = message
|
|
encoded_header = hdr_val.encode(encoding)
|
|
msg.replace_header(hdr, Header(encoded_header, encoding))
|
|
query['data'] = decode_email(msg)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
results = response.json()['results']
|
|
values = []
|
|
for x in results:
|
|
# Remove BOM from UTF-16 strings
|
|
if re.search('\ufeff', x["values"]):
|
|
values.append(re.sub('\ufeff', "", x["values"]))
|
|
else:
|
|
values.append(x["values"])
|
|
types = {}
|
|
for i in results:
|
|
types.setdefault(i["type"], 0)
|
|
types[i["type"]] += 1
|
|
# Check that all the items were correct
|
|
self.assertEqual(types['target-email'], 1)
|
|
self.assertIn('test@domain.com', values)
|
|
self.assertEqual(types['email-dst-display-name'], 4)
|
|
self.assertIn('Last One', values)
|
|
self.assertIn('Other Friend', values)
|
|
self.assertIn('Second Person', values)
|
|
self.assertIn('Testy Testerson', values)
|
|
self.assertEqual(types['email-dst'], 4)
|
|
self.assertIn('test@domain.com', values)
|
|
self.assertIn('second@domain.com', values)
|
|
self.assertIn('other@friend.net', values)
|
|
self.assertIn('last_one@finally.com', values)
|
|
self.assertEqual(types['email-src-display-name'], 2)
|
|
self.assertIn("Innocent Person", values)
|
|
self.assertEqual(types['email-src'], 2)
|
|
self.assertIn("evil_spoofer@example.com", values)
|
|
self.assertIn("IgnoreMeImInnocent@sender.com", values)
|
|
self.assertEqual(types['email-thread-index'], 1)
|
|
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
|
|
self.assertEqual(types['email-message-id'], 1)
|
|
self.assertIn("<4988EF2D.40804@example.com>", values)
|
|
self.assertEqual(types['email-subject'], 1)
|
|
self.assertIn("Example Message", values)
|
|
self.assertEqual(types['email-header'], 1)
|
|
self.assertEqual(types['email-x-mailer'], 1)
|
|
self.assertIn("mlx 5.1.7", values)
|
|
self.assertEqual(types['email-reply-to'], 1)
|
|
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
|
|
|
|
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_header_malformed_encoding(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
filenames = os.listdir("tests/test_files/encodings")
|
|
for encoding in ['utf-8', 'utf-16', 'utf-32']:
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
for hdr, hdr_val in message.items():
|
|
msg = message
|
|
encoded_header = hdr_val.encode(encoding)
|
|
pat = re.compile(hdr_val.encode())
|
|
message_bytes = pat.sub(encoded_header, msg.as_bytes())
|
|
message64 = base64.b64encode(message_bytes).decode()
|
|
query['data'] = message64
|
|
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
results = response.json()['results']
|
|
values = []
|
|
for x in results:
|
|
# Remove BOM from UTF-16 strings
|
|
if re.search('\ufeff', x["values"]):
|
|
values.append(re.sub('\ufeff', "", x["values"]))
|
|
else:
|
|
values.append(x["values"])
|
|
types = {}
|
|
for i in results:
|
|
types.setdefault(i["type"], 0)
|
|
types[i["type"]] += 1
|
|
# Check that all the items were correct
|
|
self.assertEqual(types['target-email'], 1)
|
|
self.assertIn('test@domain.com', values)
|
|
self.assertEqual(types['email-dst-display-name'], 4)
|
|
self.assertIn('Last One', values)
|
|
self.assertIn('Other Friend', values)
|
|
self.assertIn('Second Person', values)
|
|
self.assertIn('Testy Testerson', values)
|
|
self.assertEqual(types['email-dst'], 4)
|
|
self.assertIn('test@domain.com', values)
|
|
self.assertIn('second@domain.com', values)
|
|
self.assertIn('other@friend.net', values)
|
|
self.assertIn('last_one@finally.com', values)
|
|
self.assertEqual(types['email-src-display-name'], 2)
|
|
self.assertIn("Innocent Person", values)
|
|
self.assertEqual(types['email-src'], 2)
|
|
self.assertIn("evil_spoofer@example.com", values)
|
|
self.assertIn("IgnoreMeImInnocent@sender.com", values)
|
|
self.assertEqual(types['email-thread-index'], 1)
|
|
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
|
|
self.assertEqual(types['email-message-id'], 1)
|
|
self.assertIn("<4988EF2D.40804@example.com>", values)
|
|
self.assertEqual(types['email-subject'], 1)
|
|
self.assertIn("Example Message", values)
|
|
self.assertEqual(types['email-header'], 1)
|
|
self.assertEqual(types['email-x-mailer'], 1)
|
|
self.assertIn("mlx 5.1.7", values)
|
|
self.assertEqual(types['email-reply-to'], 1)
|
|
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
|
|
|
|
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_header_CJK_encoding(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
# filenames = os.listdir("tests/test_files/encodings")
|
|
# for encoding in ['utf-8', 'utf-16', 'utf-32']:
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
|
|
jisx213 = Header(japanese_charset, 'euc_jisx0213')
|
|
message.replace_header("Subject", jisx213)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
# Parse Response
|
|
RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
|
|
for i in response.json()['results']:
|
|
if i['type'] == 'email-subject':
|
|
RFC_encoding_error = "The subject was not decoded from RFC2047 format."
|
|
self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error)
|
|
self.assertEqual(japanese_charset, i['values'], "Subject not properly decoded")
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_malformed_header_CJK_encoding(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
# filenames = os.listdir("tests/test_files/encodings")
|
|
# for encoding in ['utf-8', 'utf-16', 'utf-32']:
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
|
|
japanese_bytes = japanese_charset.encode()
|
|
message.replace_header('Subject', "{{REPLACE}}")
|
|
pat = re.compile(b'{{REPLACE}}')
|
|
message_bytes = pat.sub(japanese_bytes, message.as_bytes())
|
|
message64 = base64.b64encode(message_bytes).decode()
|
|
query['data'] = message64
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
# Parse Response
|
|
RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
|
|
for i in response.json()['results']:
|
|
if i['type'] == 'email-subject':
|
|
RFC_encoding_error = "The subject was not decoded from RFC2047 format."
|
|
self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error)
|
|
self.assertEqual(japanese_charset, i['values'], "Subject not properly decoded")
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_malformed_header_emoji_encoding(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
# filenames = os.listdir("tests/test_files/encodings")
|
|
# for encoding in ['utf-8', 'utf-16', 'utf-32']:
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
emoji_string = "Emoji Test 👍 checking this"
|
|
emoji_bytes = emoji_string.encode()
|
|
message.replace_header('Subject', "{{EMOJI}}")
|
|
pat = re.compile(b'{{EMOJI}}')
|
|
message_bytes = pat.sub(emoji_bytes, message.as_bytes())
|
|
message64 = base64.b64encode(message_bytes).decode()
|
|
query['data'] = message64
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
# Parse Response
|
|
RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?="
|
|
for i in response.json()['results']:
|
|
if i['type'] == 'email-subject':
|
|
RFC_encoding_error = "The subject was not decoded from RFC2047 format."
|
|
self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error)
|
|
self.assertEqual(emoji_string, i['values'], "Subject not properly decoded")
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_emoji_filename(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/EICAR.com", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'com')
|
|
eicar_mime.add_header('Content-Disposition',
|
|
'attachment',
|
|
filename="Emoji Test 👍 checking this")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()['results']]
|
|
self.assertIn("Emoji Test 👍 checking this", values)
|
|
for i in response.json()['results']:
|
|
if i["type"] == 'email-attachment':
|
|
self.assertEqual(i["values"], "Emoji Test 👍 checking this")
|
|
if i['type'] == 'malware-sample':
|
|
attch_data = base64.b64decode(i["data"])
|
|
self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_attachment_password_in_subject(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": "true",
|
|
"guess_zip_attachment_passwords": 'true',
|
|
"extract_urls": None}
|
|
message = get_base_email()
|
|
message.replace_header("Subject", 'I contain the -> "a long password" <- that is the password')
|
|
text = """I am a test e-mail
|
|
the password is NOT "this string".
|
|
|
|
That is all.
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
with open("tests/longer_password.zip", "rb") as fp:
|
|
eicar_mime = MIMEApplication(fp.read(), 'zip')
|
|
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
|
|
message.attach(eicar_mime)
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn('EICAR.com', values)
|
|
self.assertIn('I contain the -> "a long password" <- that is the password', values)
|
|
for i in response.json()['results']:
|
|
# Check that it could be extracted.
|
|
if i["values"] == 'EICAR.com':
|
|
attch_data = base64.b64decode(i["data"]).decode()
|
|
self.assertEqual(attch_data,
|
|
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
|
|
|
@unittest.skip("Need Rewrite")
|
|
def test_email_extract_html_body_urls(self):
|
|
query = {"module": "email_import"}
|
|
query["config"] = {"unzip_attachments": None,
|
|
"guess_zip_attachment_passwords": None,
|
|
"extract_urls": "true"}
|
|
message = get_base_email()
|
|
text = """I am a test e-mail
|
|
|
|
That is all.
|
|
"""
|
|
html = """\
|
|
<html>
|
|
<head></head>
|
|
<body>
|
|
<p>Hi!<br>
|
|
<p>MISP modules are autonomous modules that can be used for expansion and other services in <a href="https://github.com/MISP/MISP">MISP</a>.</p>
|
|
<p>The modules are written in Python 3 following a simple API interface. The objective is to ease the extensions of MISP functionalities
|
|
without modifying core components. The API is available via a simple REST API which is independent from MISP installation or configuration.</p>
|
|
<p>MISP modules support is included in MISP starting from version 2.4.28.</p>
|
|
<p>For more information: <a href="https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf">Extending MISP with Python modules</a> slides from MISP training.</p>
|
|
</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
message.attach(MIMEText(text, 'plain'))
|
|
message.attach(MIMEText(html, 'html'))
|
|
query['data'] = decode_email(message)
|
|
data = json.dumps(query)
|
|
response = requests.post(self.url + "query", data=data)
|
|
# print(response.json())
|
|
values = [x["values"] for x in response.json()["results"]]
|
|
self.assertIn("https://github.com/MISP/MISP", values)
|
|
self.assertIn("https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf", values)
|
|
|
|
# def test_domaintools(self):
|
|
# query = {'config': {'username': 'test_user', 'api_key': 'test_key'}, 'module': 'domaintools', 'domain': 'domaintools.com'}
|
|
# try:
|
|
# response = requests.post(self.url + "query", data=json.dumps(query)).json()
|
|
# except:
|
|
# pass
|
|
# response = requests.post(self.url + "query", data=json.dumps(query)).json()
|
|
# print(response)
|
|
|
|
|
|
def decode_email(message):
|
|
message64 = base64.b64encode(message.as_bytes()).decode()
|
|
return message64
|
|
|
|
|
|
def get_base_email():
|
|
headers = {"Received": "via dmail-2008.19 for +INBOX; Tue, 3 Feb 2009 19:29:12 -0600 (CST)",
|
|
"Received": "from abc.luxsci.com ([10.10.10.10]) by xyz.luxsci.com (8.13.7/8.13.7) with ESMTP id n141TCa7022588 for <test@domain.com>; Tue, 3 Feb 2009 19:29:12 -0600",
|
|
"Received": "from [192.168.0.3] (verizon.net [44.44.44.44]) (user=test@sender.com mech=PLAIN bits=2) by abc.luxsci.com (8.13.7/8.13.7) with ESMTP id n141SAfo021855 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=NOT) for <test@domain.com>; Tue, 3 Feb 2009 19:28:10 -0600",
|
|
"X-Received": "by 192.168.0.45 with SMTP id q4mr156123401yw1g.911.1912342394963; Tue, 3 Feb 2009 19:32:15 -0600 (PST)",
|
|
"Message-ID": "<4988EF2D.40804@example.com>",
|
|
"Date": "Tue, 03 Feb 2009 20:28:13 -0500",
|
|
"From": '"Innocent Person" <IgnoreMeImInnocent@sender.com>',
|
|
"User-Agent": 'Thunderbird 2.0.0.19 (Windows/20081209)',
|
|
"Sender": '"Malicious MailAgent" <mailagent@example.com>',
|
|
"References": "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>",
|
|
"In-Reply-To": "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>",
|
|
"Accept-Language": 'en-US',
|
|
"X-Mailer": 'mlx 5.1.7',
|
|
"Return-Path": "evil_spoofer@example.com",
|
|
"Thread-Topic": 'This is a thread.',
|
|
"Thread-Index": 'AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==',
|
|
"Content-Language": 'en-US',
|
|
"To": '"Testy Testerson" <test@domain.com>',
|
|
"Cc": '"Second Person" <second@domain.com>, "Other Friend" <other@friend.net>, "Last One" <last_one@finally.com>',
|
|
"Subject": 'Example Message',
|
|
"MIME-Version": '1.0'}
|
|
msg = MIMEMultipart()
|
|
for key, val in headers.items():
|
|
msg.add_header(key, val)
|
|
return msg
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|