# ASCII whitespace only, i.e. the same characters bytes.strip() removes.
_HEADER_WHITESPACE = " \t\n\r\x0b\x0c"

# Codecs tried, in order, for bytes whose charset could not be identified.
# These codecs consume a leading byte order mark (BOM) on their own, so no
# manual BOM removal is needed. utf-32 is tried before utf-16 so that it gets
# the chance to raise before the more permissive utf-16 is attempted.
_UNKNOWN_8BIT_CANDIDATES = ("utf-8-sig", "utf-32", "utf-16")


def _decode_header_chunk(raw, charset):
    """Turn one ``(data, charset)`` pair from ``decode_header`` into ``str``."""
    if isinstance(raw, str):
        return raw
    if charset is None:
        # Plain text sitting next to an encoded word. decode_header produces
        # these bytes with raw-unicode-escape, which equals Latin-1 for every
        # code point below 256, so this decode can never fail.
        return raw.decode("latin-1")
    if charset == "unknown-8bit":
        candidates = _UNKNOWN_8BIT_CANDIDATES
    elif charset == "utf-8":
        # utf-8-sig drops a leading BOM only; it never touches the payload.
        candidates = ("utf-8-sig",)
    else:
        # utf-16 / utf-32 honour and remove their own BOM while decoding.
        candidates = (charset,)
    for codec in candidates:
        try:
            return raw.decode(codec)
        except (LookupError, UnicodeDecodeError):
            # LookupError: the charset label is not a known text codec.
            continue
    # Nothing worked: hand the analyst the exact bytes as escaped text rather
    # than a mangled guess (and without the b'...' wrapper str(bytes) adds).
    return raw.decode("ascii", "backslashreplace")


def get_decoded_header(header, value):
    """Decode the RFC 2047 encoded words of a header value into text.

    Every chunk reported by ``email.header.decode_header`` is decoded and the
    results are joined, so headers that mix plain text with encoded words
    (``Re: =?utf-8?q?...?=`` or ``=?utf-8?q?Name?= <user@example.com>``) keep
    all of their content.

    Args:
        header: Name of the header. Unused; kept so existing callers that
            pass ``(key, value)`` keep working.
        value: The header value, either a ``str`` or an
            ``email.header.Header`` instance.

    Returns:
        The decoded header as ``str`` with surrounding ASCII whitespace
        removed, or ``None`` when ``value`` holds no encoded data and there
        is therefore nothing to replace.
    """
    chunks = decode_header(value)
    # decode_header only yields bytes when something was actually encoded.
    if not any(isinstance(raw, bytes) for raw, _ in chunks):
        return None

    pieces = []
    previous_charset = None
    for raw, charset in chunks:
        text = _decode_header_chunk(raw, charset)
        if not text:
            continue
        # decode_header drops the folding whitespace that separated plain
        # text from an encoded word on the next line; put one space back so
        # the two do not run together.
        if (pieces
                and (previous_charset is None) != (charset is None)
                and not pieces[-1][-1].isspace()
                and not text[0].isspace()):
            pieces.append(" ")
        pieces.append(text)
        previous_charset = charset

    # Strip after decoding: stripping the raw bytes could cut a multi-byte
    # code unit (e.g. a UTF-16 unit ending in 0x20) in half.
    return "".join(pieces).strip(_HEADER_WHITESPACE)
MIMEText(hdr_val.encode(encoding), 'plain', encoding) - msg[hdr] = Header(hdr_val, encoding) + encoded_header = hdr_val.encode(encoding) + msg.replace_header(hdr, Header(encoded_header, encoding)) query['data'] = decode_email(msg) data = json.dumps(query) response = requests.post(self.url + "query", data=data) + results = response.json()['results'] + values = [] + for x in results: + # Remove BOM from UTF-16 strings + if re.search('\ufeff', x["values"]): + values.append(re.sub('\ufeff', "", x["values"])) + else: + values.append(x["values"]) + types = {} + for i in results: + types.setdefault(i["type"], 0) + types[i["type"]] += 1 + # Check that all the items were correct + self.assertEqual(types['target-email'], 1) + self.assertIn('test@domain.com', values) + self.assertEqual(types['email-dst-display-name'], 4) + self.assertIn('Last One', values) + self.assertIn('Other Friend', values) + self.assertIn('Second Person', values) + self.assertIn('Testy Testerson', values) + self.assertEqual(types['email-dst'], 4) + self.assertIn('test@domain.com', values) + self.assertIn('second@domain.com', values) + self.assertIn('other@friend.net', values) + self.assertIn('last_one@finally.com', values) + self.assertEqual(types['email-src-display-name'], 2) + self.assertIn("Innocent Person", values) + self.assertEqual(types['email-src'], 2) + self.assertIn("evil_spoofer@example.com", values) + self.assertIn("IgnoreMeImInnocent@sender.com", values) + self.assertEqual(types['email-thread-index'], 1) + self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values) + self.assertEqual(types['email-message-id'], 1) + self.assertIn("<4988EF2D.40804@example.com>", values) + self.assertEqual(types['email-subject'], 1) + self.assertIn("Example Message", values) + self.assertEqual(types['email-header'], 1) + self.assertEqual(types['email-x-mailer'], 1) + self.assertIn("mlx 5.1.7", values) + self.assertEqual(types['email-reply-to'], 1) + self.assertIn("", values) + + 
self.assertIn("", values) + + def test_email_header_malformed_encoding(self): + query = {"module":"email_import"} + query["config"] = {"unzip_attachments": None, + "guess_zip_attachment_passwords": None, + "extract_urls": None} + filenames = os.listdir("tests/test_files/encodings") + for encoding in ['utf-8', 'utf-16', 'utf-32']: + message = get_base_email() + text = """I am a test e-mail + the password is NOT "this string". + That is all. + """ + message.attach(MIMEText(text, 'plain')) + for hdr, hdr_val in message.items(): + msg = message + encoded_header = hdr_val.encode(encoding) + pat = re.compile(hdr_val.encode()) + message_bytes = pat.sub(encoded_header, msg.as_bytes()) + message64 = base64.b64encode(message_bytes).decode() + query['data'] = message64 + + data = json.dumps(query) + response = requests.post(self.url + "query", data=data) + results = response.json()['results'] + values = [] + for x in results: + # Remove BOM from UTF-16 strings + if re.search('\ufeff', x["values"]): + values.append(re.sub('\ufeff', "", x["values"])) + else: + values.append(x["values"]) + types = {} + for i in results: + types.setdefault(i["type"], 0) + types[i["type"]] += 1 + # Check that all the items were correct + self.assertEqual(types['target-email'], 1) + self.assertIn('test@domain.com', values) + self.assertEqual(types['email-dst-display-name'], 4) + self.assertIn('Last One', values) + self.assertIn('Other Friend', values) + self.assertIn('Second Person', values) + self.assertIn('Testy Testerson', values) + self.assertEqual(types['email-dst'], 4) + self.assertIn('test@domain.com', values) + self.assertIn('second@domain.com', values) + self.assertIn('other@friend.net', values) + self.assertIn('last_one@finally.com', values) + self.assertEqual(types['email-src-display-name'], 2) + self.assertIn("Innocent Person", values) + self.assertEqual(types['email-src'], 2) + self.assertIn("evil_spoofer@example.com", values) + self.assertIn("IgnoreMeImInnocent@sender.com", values) + 
self.assertEqual(types['email-thread-index'], 1) + self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values) + self.assertEqual(types['email-message-id'], 1) + self.assertIn("<4988EF2D.40804@example.com>", values) + self.assertEqual(types['email-subject'], 1) + self.assertIn("Example Message", values) + self.assertEqual(types['email-header'], 1) + self.assertEqual(types['email-x-mailer'], 1) + self.assertIn("mlx 5.1.7", values) + self.assertEqual(types['email-reply-to'], 1) + self.assertIn("", values) + + self.assertIn("", values) + + def test_email_header_CJK_encoding(self): + query = {"module":"email_import"} + query["config"] = {"unzip_attachments": None, + "guess_zip_attachment_passwords": None, + "extract_urls": None} + # filenames = os.listdir("tests/test_files/encodings") + # for encoding in ['utf-8', 'utf-16', 'utf-32']: + message = get_base_email() + text = """I am a test e-mail + the password is NOT "this string". + That is all. + """ + message.attach(MIMEText(text, 'plain')) + japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合" + jisx213 = Header(japanese_charset, 'euc_jisx0213') + message.replace_header("Subject", jisx213) + query['data'] = decode_email(message) + data = json.dumps(query) + response = requests.post(self.url + "query", data=data) + # Parse Response + RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?=' + for i in response.json()['results']: + if i['type'] == 'email-subject': + RFC_encoding_error = "The subject was not decoded from RFC2047 format." 
+ self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error) + self.assertEqual(japanese_charset, i['values'], "Subject not properly decoded") + + def test_email_malformed_header_CJK_encoding(self): + query = {"module":"email_import"} + query["config"] = {"unzip_attachments": None, + "guess_zip_attachment_passwords": None, + "extract_urls": None} + # filenames = os.listdir("tests/test_files/encodings") + # for encoding in ['utf-8', 'utf-16', 'utf-32']: + message = get_base_email() + text = """I am a test e-mail + the password is NOT "this string". + That is all. + """ + message.attach(MIMEText(text, 'plain')) + japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合" + japanese_bytes = japanese_charset.encode() + message.replace_header('Subject', "{{REPLACE}}") + pat = re.compile(b'{{REPLACE}}') + message_bytes = pat.sub(japanese_bytes, message.as_bytes()) + message64 = base64.b64encode(message_bytes).decode() + query['data'] = message64 + data = json.dumps(query) + response = requests.post(self.url + "query", data=data) + # Parse Response + RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?=' + for i in response.json()['results']: + if i['type'] == 'email-subject': + RFC_encoding_error = "The subject was not decoded from RFC2047 format." + self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error) + self.assertEqual(japanese_charset, i['values'], "Subject not properly decoded") + + def test_email_malformed_header_emoji_encoding(self): + query = {"module":"email_import"} + query["config"] = {"unzip_attachments": None, + "guess_zip_attachment_passwords": None, + "extract_urls": None} + # filenames = os.listdir("tests/test_files/encodings") + # for encoding in ['utf-8', 'utf-16', 'utf-32']: + message = get_base_email() + text = """I am a test e-mail + the password is NOT "this string". + That is all. 
+ """ + message.attach(MIMEText(text, 'plain')) + emoji_string = "Emoji Test 👍 checking this" + emoji_bytes = emoji_string.encode() + message.replace_header('Subject', "{{EMOJI}}") + pat = re.compile(b'{{EMOJI}}') + message_bytes = pat.sub(emoji_bytes, message.as_bytes()) + message64 = base64.b64encode(message_bytes).decode() + query['data'] = message64 + data = json.dumps(query) + response = requests.post(self.url + "query", data=data) + # Parse Response + RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?=" + for i in response.json()['results']: + if i['type'] == 'email-subject': + RFC_encoding_error = "The subject was not decoded from RFC2047 format." + self.assertNotEqual(RFC_format, i['values'], RFC_encoding_error) + self.assertEqual(emoji_string, i['values'], "Subject not properly decoded") + + def test_email_attachment_emoji_filename(self): + query = {"module": "email_import"} + query["config"] = {"unzip_attachments": None, + "guess_zip_attachment_passwords": None, + "extract_urls": None} + message = get_base_email() + text = """I am a test e-mail""" + message.attach(MIMEText(text, 'plain')) + with open("tests/EICAR.com", "rb") as fp: + eicar_mime = MIMEApplication(fp.read(), 'com') + eicar_mime.add_header('Content-Disposition', + 'attachment', + filename="Emoji Test 👍 checking this") + message.attach(eicar_mime) + query['data'] = decode_email(message) + data = json.dumps(query) + response = requests.post(self.url + "query", data=data) + values = [x["values"] for x in response.json()['results']] + self.assertIn("Emoji Test 👍 checking this", values) + for i in response.json()['results']: + if i["type"] == 'email-attachment': + self.assertEqual(i["values"], "Emoji Test 👍 checking this") + if i['type'] == 'malware-sample': + attch_data = base64.b64decode(i["data"]) + self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-') + def test_email_attachment_password_in_subject(self): query = {"module": 
"email_import"}