misp-modules/tests/test.py

792 lines
38 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import unittest
import requests
import base64
import json
import os
import io
import re
import zipfile
from hashlib import sha256
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from pathlib import Path
import importlib.util
import sys
class TestModules(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.headers = {'Content-Type': 'application/json'}
self.url = "http://127.0.0.1:6666/"
def test_introspection(self):
response = requests.get(self.url + "modules")
print(response.json())
response.connection.close()
def test_introspection_module_init(self):
"""checks if all modules are offered through the misp-modules service"""
try:
response = requests.get(self.url + "modules")
modules_api = [module["name"] for module in response.json()]
issues_found = []
root_path = Path(__file__).resolve().parent.parent
modules_path = root_path / 'misp_modules' / 'modules'
for d in os.listdir(modules_path):
if d.startswith('__'):
continue
mod_d_path = modules_path / d
module_files = [file[:-3] for file in os.listdir(mod_d_path) if file.endswith(".py") if file not in ['__init__.py', 'testimport.py']]
for module in module_files:
if module not in modules_api:
issues_found.append(f"Missing module {module} in {d}/__init__.py.")
error_message = '\n- '.join(issues_found)
self.assertEqual(issues_found, [], f"Found issues: \n{error_message}")
finally:
response.connection.close()
def test_introspection_module_structure(self):
moduleinfo_template = {
'version': '1.0',
'author': '',
'module-type': [],
'name': '',
'description': '',
'logo': '',
'requirements': [],
'features': '',
'references': [],
'input': '',
'output': ''
}
root_path = Path(__file__).resolve().parent.parent
modules_path = root_path / 'misp_modules' / 'modules'
issues_found = []
for d in os.listdir(modules_path):
if d.startswith('__'):
continue
d_module = importlib.import_module(f"misp_modules.modules.{d}")
for module_name in d_module.__all__:
try:
module_package_name = f"misp_modules.modules.{d}.{module_name}"
module = importlib.import_module(module_package_name)
moduleinfo = module.version()
for k in moduleinfo_template.keys():
if k not in moduleinfo:
issues_found.append(f"Module {d}.{module_name}: Key {k} not in moduleinfo.")
# sys.path.remove(str(m.parent))
except Exception as e:
issues_found.append(f"Error loading {module_name}: {e}")
continue
sys.path.remove(str(root_path / 'misp_modules' / 'lib'))
error_message = '\n- '.join(issues_found)
self.assertEqual(issues_found, [], f"Found issues: \n{error_message}")
def test_cve(self):
with open('tests/bodycve.json', 'r') as f:
response = requests.post(self.url + "query", data=f.read())
expected_response = {
'results': [
{
'types': ['text'],
'values': 'Stack-based buffer overflow in Microsoft Office XP SP3, Office 2003 SP3, Office 2007 SP2, Office 2010, Office 2004 and 2008 for Mac, Office for Mac 2011, and Open XML File Format Converter for Mac allows remote attackers to execute arbitrary code via crafted RTF data, aka "RTF Stack Buffer Overflow Vulnerability."'
}
]
}
self.assertDictEqual(response.json(), expected_response)
print(response.json())
response.connection.close()
def test_invalid_cve(self):
response = requests.post(self.url + "query", data='{"module": "cve", "vulnerability": "CVE-INVALID"}')
expected_response = {
'results': [
{
'types': ['text'],
'values': 'Non existing CVE'
}
]
}
self.assertDictEqual(response.json(), expected_response)
print(response.json())
response.connection.close()
def test_dns(self):
with open('tests/body.json', 'r') as f:
response = requests.post(self.url + "query", data=f.read())
print(response.json())
response.connection.close()
with open('tests/body_timeout.json', 'r') as f:
response = requests.post(self.url + "query", data=f.read())
print(response.json())
response.connection.close()
def test_openioc(self):
with open("tests/openioc.xml", "rb") as f:
content = base64.b64encode(f.read())
data = json.dumps({"module": "openiocimport",
"data": content.decode(),
})
response = requests.post(self.url + "query", data=data).json()
print(response)
print("OpenIOC :: {}".format(response))
values = [x["values"][0] for x in response["results"]]
assert ("mrxcls.sys" in values)
assert ("mdmcpq3.PNF" in values)
@unittest.skip("Need Rewrite")
def test_email_headers(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail"""
message.attach(MIMEText(text, 'plain'))
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
results = response.json()['results']
values = [x["values"] for x in results]
types = {}
for i in results:
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that there are the appropriate number of items
# Check that all the items were correct
self.assertEqual(types['target-email'], 1)
self.assertIn('test@domain.com', values)
self.assertEqual(types['email-dst-display-name'], 4)
self.assertIn('Last One', values)
self.assertIn('Other Friend', values)
self.assertIn('Second Person', values)
self.assertIn('Testy Testerson', values)
self.assertEqual(types['email-dst'], 4)
self.assertIn('test@domain.com', values)
self.assertIn('second@domain.com', values)
self.assertIn('other@friend.net', values)
self.assertIn('last_one@finally.com', values)
self.assertEqual(types['email-src-display-name'], 2)
self.assertIn("Innocent Person", values)
self.assertEqual(types['email-src'], 2)
self.assertIn("evil_spoofer@example.com", values)
self.assertIn("IgnoreMeImInnocent@sender.com", values)
self.assertEqual(types['email-thread-index'], 1)
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
self.assertEqual(types['email-message-id'], 1)
self.assertIn("<4988EF2D.40804@example.com>", values)
self.assertEqual(types['email-subject'], 1)
self.assertIn("Example Message", values)
self.assertEqual(types['email-header'], 1)
self.assertEqual(types['email-x-mailer'], 1)
self.assertIn("mlx 5.1.7", values)
self.assertEqual(types['email-reply-to'], 1)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
@unittest.skip("Need Rewrite")
def test_email_attachment_basic(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail"""
message.attach(MIMEText(text, 'plain'))
with open("tests/EICAR.com", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'com')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()['results']]
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
if i["type"] == 'email-attachment':
self.assertEqual(i["values"], "EICAR.com")
if i['type'] == 'malware-sample':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_attachment_unpack(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": None,
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail"""
message.attach(MIMEText(text, 'plain'))
with open("tests/EICAR.com.zip", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
self.assertIn('EICAR.com.zip', values)
for i in response.json()['results']:
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
with zf.open("EICAR.com") as ec:
attch_data = ec.read()
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_dont_unpack_compressed_doc_attachments(self):
"""Ensures that compressed
"""
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": None,
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail"""
message.attach(MIMEText(text, 'plain'))
with open("tests/test_files/test.docx", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="test.docx")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('test.docx', values)
types = {}
for i in response.json()['results']:
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that there is only one attachment in the bundle
self.assertEqual(types['malware-sample'], 1)
for i in response.json()['results']:
if i['type'] == 'malware-sample' and i["values"] == 'test.docx':
attch_data = base64.b64decode(i["data"])
filesum = sha256()
filesum.update(attch_data)
self.assertEqual(filesum.hexdigest(),
'098da5381a90d4a51e6b844c18a0fecf2e364813c2f8b317cfdc51c21f2506a5')
@unittest.skip("Need Rewrite")
def test_email_attachment_unpack_with_password(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": 'true',
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail"""
message.attach(MIMEText(text, 'plain'))
with open("tests/infected.zip", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
self.assertIn('EICAR.com.zip', values)
for i in response.json()['results']:
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
# Make sure password was set and still in place
self.assertRaises(RuntimeError, zf.open, "EICAR.com")
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_attachment_password_in_body(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": 'true',
"extract_urls": None}
message = get_base_email()
text = """I am a -> STRINGS <- test e-mail"""
message.attach(MIMEText(text, 'plain'))
with open("tests/short_password.zip", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
if i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"]).decode()
self.assertEqual(attch_data,
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_attachment_password_in_body_quotes(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": 'true',
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail
the password is "a long password".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
with open("tests/longer_password.zip", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
# Check that it could be extracted.
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"]).decode()
self.assertEqual(attch_data,
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_attachment_password_in_html_body(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": 'true',
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
html = """\
<html>
<head></head>
<body>
<p>Hi!<br>
This is the real password?<br>
It is "a long password".
</p>
</body>
</html>
"""
message.attach(MIMEText(text, 'plain'))
message.attach(MIMEText(html, 'html'))
with open("tests/longer_password.zip", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
# Check that it could be extracted.
if i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"]).decode()
self.assertEqual(attch_data,
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_body_encoding(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
filenames = os.listdir("tests/test_files/encodings")
for fn in filenames:
message = get_base_email()
encoding = os.path.splitext(fn)
with open("tests/test_files/encodings/{0}".format(fn), "r", encoding=encoding[0]) as fp:
# Encoding is used as the name of the file
text = fp.read()
message.attach(MIMEText(text, 'html', encoding[0]))
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data).json()
self.assertNotIn('error', response, response.get('error', ""))
self.assertIn('results', response, "No server results found.")
@unittest.skip("Need Rewrite")
def test_email_header_proper_encoding(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
filenames = os.listdir("tests/test_files/encodings")
for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
for hdr, hdr_val in message.items():
msg = message
encoded_header = hdr_val.encode(encoding)
msg.replace_header(hdr, Header(encoded_header, encoding))
query['data'] = decode_email(msg)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
results = response.json()['results']
values = []
for x in results:
# Remove BOM from UTF-16 strings
if re.search('\ufeff', x["values"]):
values.append(re.sub('\ufeff', "", x["values"]))
else:
values.append(x["values"])
types = {}
for i in results:
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that all the items were correct
self.assertEqual(types['target-email'], 1)
self.assertIn('test@domain.com', values)
self.assertEqual(types['email-dst-display-name'], 4)
self.assertIn('Last One', values)
self.assertIn('Other Friend', values)
self.assertIn('Second Person', values)
self.assertIn('Testy Testerson', values)
self.assertEqual(types['email-dst'], 4)
self.assertIn('test@domain.com', values)
self.assertIn('second@domain.com', values)
self.assertIn('other@friend.net', values)
self.assertIn('last_one@finally.com', values)
self.assertEqual(types['email-src-display-name'], 2)
self.assertIn("Innocent Person", values)
self.assertEqual(types['email-src'], 2)
self.assertIn("evil_spoofer@example.com", values)
self.assertIn("IgnoreMeImInnocent@sender.com", values)
self.assertEqual(types['email-thread-index'], 1)
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
self.assertEqual(types['email-message-id'], 1)
self.assertIn("<4988EF2D.40804@example.com>", values)
self.assertEqual(types['email-subject'], 1)
self.assertIn("Example Message", values)
self.assertEqual(types['email-header'], 1)
self.assertEqual(types['email-x-mailer'], 1)
self.assertIn("mlx 5.1.7", values)
self.assertEqual(types['email-reply-to'], 1)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
@unittest.skip("Need Rewrite")
def test_email_header_malformed_encoding(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
filenames = os.listdir("tests/test_files/encodings")
for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
for hdr, hdr_val in message.items():
msg = message
encoded_header = hdr_val.encode(encoding)
pat = re.compile(hdr_val.encode())
message_bytes = pat.sub(encoded_header, msg.as_bytes())
message64 = base64.b64encode(message_bytes).decode()
query['data'] = message64
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
results = response.json()['results']
values = []
for x in results:
# Remove BOM from UTF-16 strings
if re.search('\ufeff', x["values"]):
values.append(re.sub('\ufeff', "", x["values"]))
else:
values.append(x["values"])
types = {}
for i in results:
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that all the items were correct
self.assertEqual(types['target-email'], 1)
self.assertIn('test@domain.com', values)
self.assertEqual(types['email-dst-display-name'], 4)
self.assertIn('Last One', values)
self.assertIn('Other Friend', values)
self.assertIn('Second Person', values)
self.assertIn('Testy Testerson', values)
self.assertEqual(types['email-dst'], 4)
self.assertIn('test@domain.com', values)
self.assertIn('second@domain.com', values)
self.assertIn('other@friend.net', values)
self.assertIn('last_one@finally.com', values)
self.assertEqual(types['email-src-display-name'], 2)
self.assertIn("Innocent Person", values)
self.assertEqual(types['email-src'], 2)
self.assertIn("evil_spoofer@example.com", values)
self.assertIn("IgnoreMeImInnocent@sender.com", values)
self.assertEqual(types['email-thread-index'], 1)
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
self.assertEqual(types['email-message-id'], 1)
self.assertIn("<4988EF2D.40804@example.com>", values)
self.assertEqual(types['email-subject'], 1)
self.assertIn("Example Message", values)
self.assertEqual(types['email-header'], 1)
self.assertEqual(types['email-x-mailer'], 1)
self.assertIn("mlx 5.1.7", values)
self.assertEqual(types['email-reply-to'], 1)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
@unittest.skip("Need Rewrite")
def test_email_header_CJK_encoding(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
# filenames = os.listdir("tests/test_files/encodings")
# for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
jisx213 = Header(japanese_charset, 'euc_jisx0213')
message.replace_header("Subject", jisx213)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
# Parse Response
RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
for i in response.json()['results']:
if i['type'] == 'email-subject':
RFC_encoding_error = "The subject was not decoded from RFC2047 format."
self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error)
self.assertEqual(japanese_charset, i['values'], "Subject not properly decoded")
@unittest.skip("Need Rewrite")
def test_email_malformed_header_CJK_encoding(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
# filenames = os.listdir("tests/test_files/encodings")
# for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
japanese_bytes = japanese_charset.encode()
message.replace_header('Subject', "{{REPLACE}}")
pat = re.compile(b'{{REPLACE}}')
message_bytes = pat.sub(japanese_bytes, message.as_bytes())
message64 = base64.b64encode(message_bytes).decode()
query['data'] = message64
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
# Parse Response
RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
for i in response.json()['results']:
if i['type'] == 'email-subject':
RFC_encoding_error = "The subject was not decoded from RFC2047 format."
self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error)
self.assertEqual(japanese_charset, i['values'], "Subject not properly decoded")
@unittest.skip("Need Rewrite")
def test_email_malformed_header_emoji_encoding(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
# filenames = os.listdir("tests/test_files/encodings")
# for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
emoji_string = "Emoji Test 👍 checking this"
emoji_bytes = emoji_string.encode()
message.replace_header('Subject', "{{EMOJI}}")
pat = re.compile(b'{{EMOJI}}')
message_bytes = pat.sub(emoji_bytes, message.as_bytes())
message64 = base64.b64encode(message_bytes).decode()
query['data'] = message64
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
# Parse Response
RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?="
for i in response.json()['results']:
if i['type'] == 'email-subject':
RFC_encoding_error = "The subject was not decoded from RFC2047 format."
self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error)
self.assertEqual(emoji_string, i['values'], "Subject not properly decoded")
@unittest.skip("Need Rewrite")
def test_email_attachment_emoji_filename(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
message = get_base_email()
text = """I am a test e-mail"""
message.attach(MIMEText(text, 'plain'))
with open("tests/EICAR.com", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'com')
eicar_mime.add_header('Content-Disposition',
'attachment',
filename="Emoji Test 👍 checking this")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()['results']]
self.assertIn("Emoji Test 👍 checking this", values)
for i in response.json()['results']:
if i["type"] == 'email-attachment':
self.assertEqual(i["values"], "Emoji Test 👍 checking this")
if i['type'] == 'malware-sample':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_attachment_password_in_subject(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",
"guess_zip_attachment_passwords": 'true',
"extract_urls": None}
message = get_base_email()
message.replace_header("Subject", 'I contain the -> "a long password" <- that is the password')
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
with open("tests/longer_password.zip", "rb") as fp:
eicar_mime = MIMEApplication(fp.read(), 'zip')
eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
message.attach(eicar_mime)
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
self.assertIn('I contain the -> "a long password" <- that is the password', values)
for i in response.json()['results']:
# Check that it could be extracted.
if i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"]).decode()
self.assertEqual(attch_data,
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@unittest.skip("Need Rewrite")
def test_email_extract_html_body_urls(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": "true"}
message = get_base_email()
text = """I am a test e-mail
That is all.
"""
html = """\
<html>
<head></head>
<body>
<p>Hi!<br>
<p>MISP modules are autonomous modules that can be used for expansion and other services in <a href="https://github.com/MISP/MISP">MISP</a>.</p>
<p>The modules are written in Python 3 following a simple API interface. The objective is to ease the extensions of MISP functionalities
without modifying core components. The API is available via a simple REST API which is independent from MISP installation or configuration.</p>
<p>MISP modules support is included in MISP starting from version 2.4.28.</p>
<p>For more information: <a href="https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf">Extending MISP with Python modules</a> slides from MISP training.</p>
</p>
</body>
</html>
"""
message.attach(MIMEText(text, 'plain'))
message.attach(MIMEText(html, 'html'))
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
# print(response.json())
values = [x["values"] for x in response.json()["results"]]
self.assertIn("https://github.com/MISP/MISP", values)
self.assertIn("https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf", values)
# def test_domaintools(self):
# query = {'config': {'username': 'test_user', 'api_key': 'test_key'}, 'module': 'domaintools', 'domain': 'domaintools.com'}
# try:
# response = requests.post(self.url + "query", data=json.dumps(query)).json()
# except:
# pass
# response = requests.post(self.url + "query", data=json.dumps(query)).json()
# print(response)
def decode_email(message):
message64 = base64.b64encode(message.as_bytes()).decode()
return message64
def get_base_email():
headers = {"Received": "via dmail-2008.19 for +INBOX; Tue, 3 Feb 2009 19:29:12 -0600 (CST)",
"Received": "from abc.luxsci.com ([10.10.10.10]) by xyz.luxsci.com (8.13.7/8.13.7) with ESMTP id n141TCa7022588 for <test@domain.com>; Tue, 3 Feb 2009 19:29:12 -0600",
"Received": "from [192.168.0.3] (verizon.net [44.44.44.44]) (user=test@sender.com mech=PLAIN bits=2) by abc.luxsci.com (8.13.7/8.13.7) with ESMTP id n141SAfo021855 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=NOT) for <test@domain.com>; Tue, 3 Feb 2009 19:28:10 -0600",
"X-Received": "by 192.168.0.45 with SMTP id q4mr156123401yw1g.911.1912342394963; Tue, 3 Feb 2009 19:32:15 -0600 (PST)",
"Message-ID": "<4988EF2D.40804@example.com>",
"Date": "Tue, 03 Feb 2009 20:28:13 -0500",
"From": '"Innocent Person" <IgnoreMeImInnocent@sender.com>',
"User-Agent": 'Thunderbird 2.0.0.19 (Windows/20081209)',
"Sender": '"Malicious MailAgent" <mailagent@example.com>',
"References": "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>",
"In-Reply-To": "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>",
"Accept-Language": 'en-US',
"X-Mailer": 'mlx 5.1.7',
"Return-Path": "evil_spoofer@example.com",
"Thread-Topic": 'This is a thread.',
"Thread-Index": 'AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==',
"Content-Language": 'en-US',
"To": '"Testy Testerson" <test@domain.com>',
"Cc": '"Second Person" <second@domain.com>, "Other Friend" <other@friend.net>, "Last One" <last_one@finally.com>',
"Subject": 'Example Message',
"MIME-Version": '1.0'}
msg = MIMEMultipart()
for key, val in headers.items():
msg.add_header(key, val)
return msg
if __name__ == '__main__':
unittest.main()