Merge pull request #129 from seamustuohy/utf_hate

Added support for malformed internationalized email headers
pull/133/head
Raphaël Vinot 2017-07-18 10:06:08 +02:00 committed by GitHub
commit 4c2cda9903
5 changed files with 478 additions and 3 deletions

View File

@ -5,11 +5,14 @@ import json
import base64
import io
import zipfile
import codecs
import re
from email import message_from_bytes
from email.utils import parseaddr
from email.iterators import typed_subpart_iterator
from email.parser import Parser
from html.parser import HTMLParser
from email.header import decode_header
misperrors = {'error': 'Error'}
userConfig = {}
@ -38,7 +41,14 @@ def handler(q=False):
request = json.loads(q)
# request data is always base 64 byte encoded
data = base64.b64decode(request["data"])
message = message_from_bytes(data)
# Double decode to force headers to be re-parsed with proper encoding
message = Parser().parsestr(message_from_bytes(data).as_string())
# Decode any encoded headers to get at proper string
for key, val in message.items():
replacement = get_decoded_header(key, val)
if replacement is not None:
message.replace_header(key, replacement)
# Extract all header information
all_headers = ""
@ -340,6 +350,36 @@ def get_charset(message, default="ascii"):
return default
def _decode_header_chunk(raw, charset):
    """Decode one ``decode_header()`` chunk to text.

    Returns the decoded string, or ``None`` when no candidate codec can
    decode the bytes (including when the declared charset is unknown).
    """
    if charset is None:
        # Literal text found between encoded-words. latin-1 maps every byte
        # to a character, so this can never raise.
        return raw.decode('latin-1')

    if charset == 'unknown-8bit':
        # Undeclared 8-bit data: guess among the UTF family.
        # utf-32 is tried before utf-16 because it is the stricter decoder.
        codec_names = ('utf-8', 'utf-32', 'utf-16')
    else:
        codec_names = (charset,)

    # Stray whitespace around the payload breaks multi-byte alignment, so the
    # stripped bytes are tried first. The untouched bytes are the fallback for
    # the case where stripping itself cut a UTF-16/32 code unit in half.
    stripped = raw.strip()
    candidates = (stripped,) if stripped == raw else (stripped, raw)
    for candidate in candidates:
        for codec_name in codec_names:
            try:
                return candidate.decode(codec_name)
            except (UnicodeDecodeError, LookupError):
                continue
    return None


def get_decoded_header(header, value):
    """Decode an RFC 2047 encoded header value into a plain string.

    Every chunk returned by ``decode_header()`` is decoded and the results
    are concatenated, so headers that mix plain text and encoded-words
    (``Re: =?utf-8?q?...?=``) are returned in full.

    Args:
        header: Name of the header. Not used by the decoding itself; kept so
            the signature stays unchanged for callers.
        value: The header value to decode.

    Returns:
        ``None`` when ``value`` contains no encoded-word (nothing to decode).
        Otherwise the decoded text, with byte order marks removed and
        surrounding whitespace stripped. If any chunk cannot be decoded,
        ``str(value)`` is returned so the header stays in its RFC 2047 form:
        better to let the analyst decode it than to provide a mangled string.
    """
    chunks = decode_header(value)
    # decode_header() only yields bytes when it found an encoded-word; a str
    # chunk means the value is already plain text.
    if not chunks or not isinstance(chunks[0][0], bytes):
        return None

    pieces = []
    for raw, charset in chunks:
        text = _decode_header_chunk(raw, charset)
        if text is None:
            return str(value)
        pieces.append(text)

    # Drop byte order marks after decoding: removing the BOM at the byte level
    # can match across two adjacent code units and corrupt the text.
    return "".join(pieces).replace('\ufeff', '').strip()
def introspection():
modulesetup = {}
try:

View File

@ -5,13 +5,15 @@ import unittest
import requests
import base64
import json
import os
import io
import re
import zipfile
from hashlib import sha256
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
class TestModules(unittest.TestCase):
@ -315,7 +317,6 @@ class TestModules(unittest.TestCase):
query['data'] = decode_email(message)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
# print(response.json())
values = [x["values"] for x in response.json()["results"]]
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
@ -325,6 +326,271 @@ class TestModules(unittest.TestCase):
self.assertEqual(attch_data,
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
def test_email_body_encoding(self):
    """Attach each encoding fixture as an HTML body and check the module replies.

    Every file in ``tests/test_files/encodings`` is named after the codec it
    is stored in; the name (minus extension) is used both to read the file
    and as the charset of the attached MIME part.
    """
    query = {
        "module": "email_import",
        "config": {
            "unzip_attachments": None,
            "guess_zip_attachment_passwords": None,
            "extract_urls": None,
        },
    }
    for file_name in os.listdir("tests/test_files/encodings"):
        message = get_base_email()
        # The file's base name doubles as its encoding.
        charset = os.path.splitext(file_name)[0]
        fixture_path = "tests/test_files/encodings/{0}".format(file_name)
        with open(fixture_path, "r", encoding=charset) as fixture:
            body = fixture.read()
        message.attach(MIMEText(body, 'html', charset))
        query['data'] = decode_email(message)
        reply = requests.post(self.url + "query", data=json.dumps(query)).json()
        self.assertNotIn('error', reply, reply.get('error', ""))
        self.assertIn('results', reply, "No server results found.")
def test_email_header_proper_encoding(self):
# Replace the base email's headers with Header objects built from the value
# encoded as UTF-8, UTF-16 and UTF-32 in turn, send the message to the
# email_import module, and check the types and values of the results.
# NOTE(review): indentation is not visible in this view, so whether the
# request and the assertions run once per header or once per encoding cannot
# be determined from here -- confirm against the repository.
query = {"module":"email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
# NOTE(review): 'filenames' is not referenced again within this test.
filenames = os.listdir("tests/test_files/encodings")
for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
for hdr, hdr_val in message.items():
# msg is another name for message (no copy is made), so replace_header
# below modifies message itself.
msg = message
encoded_header = hdr_val.encode(encoding)
msg.replace_header(hdr, Header(encoded_header, encoding))
query['data'] = decode_email(msg)
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
results = response.json()['results']
# Collect every result value, with U+FEFF characters removed.
values = []
for x in results:
# Remove BOM from UTF-16 strings
if re.search('\ufeff', x["values"]):
values.append(re.sub('\ufeff', "", x["values"]))
else:
values.append(x["values"])
# Count how many results were returned for each attribute type.
types = {}
for i in results:
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that all the items were correct
self.assertEqual(types['target-email'], 1)
self.assertIn('test@domain.com', values)
self.assertEqual(types['email-dst-display-name'], 4)
self.assertIn('Last One', values)
self.assertIn('Other Friend', values)
self.assertIn('Second Person', values)
self.assertIn('Testy Testerson', values)
self.assertEqual(types['email-dst'], 4)
self.assertIn('test@domain.com', values)
self.assertIn('second@domain.com', values)
self.assertIn('other@friend.net', values)
self.assertIn('last_one@finally.com', values)
self.assertEqual(types['email-src-display-name'], 2)
self.assertIn("Innocent Person", values)
self.assertEqual(types['email-src'], 2)
self.assertIn("evil_spoofer@example.com", values)
self.assertIn("IgnoreMeImInnocent@sender.com", values)
self.assertEqual(types['email-thread-index'], 1)
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
self.assertEqual(types['email-message-id'], 1)
self.assertIn("<4988EF2D.40804@example.com>", values)
self.assertEqual(types['email-subject'], 1)
self.assertIn("Example Message", values)
self.assertEqual(types['email-header'], 1)
self.assertEqual(types['email-x-mailer'], 1)
self.assertIn("mlx 5.1.7", values)
self.assertEqual(types['email-reply-to'], 1)
# NOTE(review): the next two assertions are identical.
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
def test_email_header_malformed_encoding(self):
# Substitute header values in the serialized message with their raw UTF-8,
# UTF-16 or UTF-32 bytes (no RFC 2047 wrapping), send the result to the
# email_import module, and check the types and values of the results.
# NOTE(review): indentation is not visible in this view, so whether the
# request and the assertions run once per header or once per encoding cannot
# be determined from here -- confirm against the repository.
query = {"module":"email_import"}
query["config"] = {"unzip_attachments": None,
"guess_zip_attachment_passwords": None,
"extract_urls": None}
# NOTE(review): 'filenames' is not referenced again within this test.
filenames = os.listdir("tests/test_files/encodings")
for encoding in ['utf-8', 'utf-16', 'utf-32']:
message = get_base_email()
text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
message.attach(MIMEText(text, 'plain'))
for hdr, hdr_val in message.items():
# msg is another name for message (no copy is made).
msg = message
encoded_header = hdr_val.encode(encoding)
# NOTE(review): the header value is compiled as a regular expression without
# re.escape(), so characters such as '.' in it act as regex metacharacters.
pat = re.compile(hdr_val.encode())
# Each substitution starts again from msg.as_bytes(); message_bytes is
# reassigned, not accumulated.
message_bytes = pat.sub(encoded_header, msg.as_bytes())
# The module expects the message base64 encoded.
message64 = base64.b64encode(message_bytes).decode()
query['data'] = message64
data = json.dumps(query)
response = requests.post(self.url + "query", data=data)
results = response.json()['results']
# Collect every result value, with U+FEFF characters removed.
values = []
for x in results:
# Remove BOM from UTF-16 strings
if re.search('\ufeff', x["values"]):
values.append(re.sub('\ufeff', "", x["values"]))
else:
values.append(x["values"])
# Count how many results were returned for each attribute type.
types = {}
for i in results:
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that all the items were correct
self.assertEqual(types['target-email'], 1)
self.assertIn('test@domain.com', values)
self.assertEqual(types['email-dst-display-name'], 4)
self.assertIn('Last One', values)
self.assertIn('Other Friend', values)
self.assertIn('Second Person', values)
self.assertIn('Testy Testerson', values)
self.assertEqual(types['email-dst'], 4)
self.assertIn('test@domain.com', values)
self.assertIn('second@domain.com', values)
self.assertIn('other@friend.net', values)
self.assertIn('last_one@finally.com', values)
self.assertEqual(types['email-src-display-name'], 2)
self.assertIn("Innocent Person", values)
self.assertEqual(types['email-src'], 2)
self.assertIn("evil_spoofer@example.com", values)
self.assertIn("IgnoreMeImInnocent@sender.com", values)
self.assertEqual(types['email-thread-index'], 1)
self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
self.assertEqual(types['email-message-id'], 1)
self.assertIn("<4988EF2D.40804@example.com>", values)
self.assertEqual(types['email-subject'], 1)
self.assertIn("Example Message", values)
self.assertEqual(types['email-header'], 1)
self.assertEqual(types['email-x-mailer'], 1)
self.assertIn("mlx 5.1.7", values)
self.assertEqual(types['email-reply-to'], 1)
# NOTE(review): the next two assertions are identical.
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)
def test_email_header_CJK_encoding(self):
    """Check that a Subject encoded as euc_jisx0213 is returned decoded."""
    query = {
        "module": "email_import",
        "config": {
            "unzip_attachments": None,
            "guess_zip_attachment_passwords": None,
            "extract_urls": None,
        },
    }
    message = get_base_email()
    text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
    message.attach(MIMEText(text, 'plain'))
    japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
    message.replace_header("Subject", Header(japanese_charset, 'euc_jisx0213'))
    query['data'] = decode_email(message)
    response = requests.post(self.url + "query", data=json.dumps(query))

    # The subject must come back as text, not as this encoded-word form.
    RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
    RFC_encoding_error = "The subject was not decoded from RFC2047 format."
    for result in response.json()['results']:
        if result['type'] != 'email-subject':
            continue
        self.assertNotEqual(RFC_format, result['values'], RFC_encoding_error)
        self.assertEqual(japanese_charset, result['values'], "Subject not properly decoded")
def test_email_malformed_header_CJK_encoding(self):
    """Check that raw (un-encoded) Japanese bytes in the Subject are decoded."""
    query = {
        "module": "email_import",
        "config": {
            "unzip_attachments": None,
            "guess_zip_attachment_passwords": None,
            "extract_urls": None,
        },
    }
    message = get_base_email()
    text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
    message.attach(MIMEText(text, 'plain'))
    japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"

    # Put a placeholder in the Subject, then swap it for the raw encoded
    # bytes in the serialized message so no RFC 2047 wrapping is applied.
    message.replace_header('Subject', "{{REPLACE}}")
    message_bytes = re.sub(b'{{REPLACE}}', japanese_charset.encode(), message.as_bytes())
    query['data'] = base64.b64encode(message_bytes).decode()
    response = requests.post(self.url + "query", data=json.dumps(query))

    # The subject must come back as text, not as this encoded-word form.
    RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
    RFC_encoding_error = "The subject was not decoded from RFC2047 format."
    for result in response.json()['results']:
        if result['type'] != 'email-subject':
            continue
        self.assertNotEqual(RFC_format, result['values'], RFC_encoding_error)
        self.assertEqual(japanese_charset, result['values'], "Subject not properly decoded")
def test_email_malformed_header_emoji_encoding(self):
    """Check that raw (un-encoded) emoji bytes in the Subject are decoded."""
    query = {
        "module": "email_import",
        "config": {
            "unzip_attachments": None,
            "guess_zip_attachment_passwords": None,
            "extract_urls": None,
        },
    }
    message = get_base_email()
    text = """I am a test e-mail
the password is NOT "this string".
That is all.
"""
    message.attach(MIMEText(text, 'plain'))
    emoji_string = "Emoji Test 👍 checking this"

    # Put a placeholder in the Subject, then swap it for the raw encoded
    # bytes in the serialized message so no RFC 2047 wrapping is applied.
    message.replace_header('Subject', "{{EMOJI}}")
    message_bytes = re.sub(b'{{EMOJI}}', emoji_string.encode(), message.as_bytes())
    query['data'] = base64.b64encode(message_bytes).decode()
    response = requests.post(self.url + "query", data=json.dumps(query))

    # The subject must come back as text, not as this encoded-word form.
    RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?="
    RFC_encoding_error = "The subject was not decoded from RFC2047 format."
    for result in response.json()['results']:
        if result['type'] != 'email-subject':
            continue
        self.assertNotEqual(RFC_format, result['values'], RFC_encoding_error)
        self.assertEqual(emoji_string, result['values'], "Subject not properly decoded")
def test_email_attachment_emoji_filename(self):
    """Check that an attachment whose filename contains an emoji is reported intact."""
    query = {
        "module": "email_import",
        "config": {
            "unzip_attachments": None,
            "guess_zip_attachment_passwords": None,
            "extract_urls": None,
        },
    }
    emoji_name = "Emoji Test 👍 checking this"

    message = get_base_email()
    message.attach(MIMEText("""I am a test e-mail""", 'plain'))
    with open("tests/EICAR.com", "rb") as eicar_file:
        attachment = MIMEApplication(eicar_file.read(), 'com')
    attachment.add_header('Content-Disposition', 'attachment', filename=emoji_name)
    message.attach(attachment)

    query['data'] = decode_email(message)
    response = requests.post(self.url + "query", data=json.dumps(query))

    found_values = [entry["values"] for entry in response.json()['results']]
    self.assertIn(emoji_name, found_values)
    for entry in response.json()['results']:
        if entry["type"] == 'email-attachment':
            self.assertEqual(entry["values"], emoji_name)
        if entry['type'] == 'malware-sample':
            # The sample payload is returned base64 encoded.
            payload = base64.b64decode(entry["data"])
            self.assertEqual(payload, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
def test_email_attachment_password_in_subject(self):
query = {"module": "email_import"}
query["config"] = {"unzip_attachments": "true",

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,169 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">
<html lang="en-US">
<head>
<META http-equiv="Content-Type" content= "text/html; charset=UTF-8">
<META name="copyright" content="&copy; 2001-2008, Tex Texin">
<META http-equiv="Content-Language" content="en-US">
<META name="keywords" lang="en-US" content="Unicode, supplementary, business">
<META name="keywords" lang="en-US" content="UTF-8, Tex Texin, i18nGuy">
<META name="Author" content="Tex Texin">
<title>Unicode Plane 1 Supplementary Character Examples using UTF-8</title>
<meta http-equiv="Content-Style-Type" content="text/css" >
<link href="css/unicode-example.css" rel="stylesheet" type="text/css" >
<style type="text/css">
p {
width : 80%;
margin : 1em auto;
}
h3 {
text-align:center;
}
div.center {
font-size : 80%;
}
td.english {
font-size : 80%;
width : 12%;
padding : 2px 4px;
color : black;
background-color: #F8F8F8;
}
td.submitter {
font-size : 70%;
width : 20%;
padding : 2px 4px;
color : black;
background-color: #F8F8F8 ;
}
</style>
</head>
<body>
<h1 id="top">Example Unicode Usage For Business Applications</h1>
<h3>Demonstrating Unicode Plane 1 (Supplementary) Characters Encoded in UTF-8</h3>
<div class="center">
Also see: <a href="unicode/unicode-example-intro.html">Introduction to the Compelling Unicode Demo</a>.
<br>The original <a href="unicode-example.html">Compelling Unicode Demo</a> (BMP) page.
<br><a href="index.html">I18nGuy Home Page</a>
</div>
<p>The table on this page is identical to the table on
<a href="unicode-example-plane1.html">Unicode Plane 1 Characters Encoded as Numeric Character References (NCR).</a>
except this table uses
<a href="http://www.unicode.org/glossary/#UTF_8">UTF-8</a> encoding for the plane 1 characters, and the other uses NCRs
<b>(</b><a href="http://www.w3.org/TR/html401/charset.html#h-5.3.1">Numeric Character References</a>
of the form &amp;#dddd; (decimal) or &amp;#xhhhh; (hexadecimal)<b>)</b>.
</p>
<p style="BACKGROUND-COLOR: yellow">The NCR page also
has a discussion of how to set up browsers to view these characters and which browsers work.
If you find browsers or configurations that work let me know.
Note that as of version 6, IE does not support Supplementary characters encoded in UTF-8.
Netscape and Opera do support them. Also Ximian Desktop 2 (XD2) displays this page correctly.</p>
<TABLE class="ctr">
<CAPTION>Example Plane 1 Unicode Data</CAPTION>
<TBODY>
<TR>
<TD class="english"><B>Script</B><br><span class="small">(links to Unicode code charts)</span></TD>
<TD class="english"><B>Origin</B> <BR><span class="small">(in English)</span> </TD>
<TD class="english"><B>Name</B> <BR><span class="small">(English transliteration)</span> </TD>
<TD class="native"><B>Origin</B> <BR><span class="small">(in native language)</span> </TD>
<TD class="native"><B>Name</B> <BR><span class="small">(in native language)</span>
</TD>
<TD class="submitter"><B>Submitters</B> </TD></TR>
<TR>
<TD class="english"><a href="http://www.unicode.org/charts/PDF/U10300.pdf" target="_blank">Etruscan</a></TD>
<TD class="english">Rasna (Etruria) </TD>
<TD class="english">Aulus Metellus <BR>(Aules'i Metelis' )</TD>
<TD class="rtlplane1"><BDO dir=rtl>𐌓𐌀𐌔𐌍𐌀</BDO></TD>
<TD class="rtlplane1"><BDO dir=rtl>𐌀𐌖𐌋𐌄𐌑𐌉·𐌌𐌄𐌕𐌄𐌋𐌉𐌑</BDO></TD>
<TD class="submitter">Marco Cimarosti, <BR><A href="mailto:jameskass&#x40;worldnet.att.net">James Kass</A>, <BR>Andrew "Bass"
Shcheglov, <br>Michka Kaplan<br>Font: <a href="#code2001">CODE2001</a></TD></TR>
<TR>
<TD class="english"><a href="http://www.unicode.org/charts/PDF/U10400.pdf" target="_blank">Deseret</a></TD>
<TD class="english">Utah</TD>
<TD class="english">Brigham Young</TD>
<TD class="plane1">𐐏𐐭𐐻𐐫 </TD>
<TD class="plane1">𐐒𐑉𐐮𐑀𐐲𐑋 𐐏𐐲𐑍 </TD>
<TD class="submitter">John Jenkins<br>Font: <a href="#code2001">CODE2001</a></TD></TR>
<TR>
<TD class="english"><a href="http://www.unicode.org/charts/PDF/U10330.pdf" target="_blank">Gothic</a></TD>
<TD class="english">Gothland <BR>(Kingdom of the Goths)<BR>(thizai
thiudangardjai thize Gutane) </TD>
<TD class="english">Wulfila<BR>(also Ulfilas) </TD>
<TD class=plane1>𐌸𐌹𐌶𐌰𐌹<BR>𐌸𐌹𐌿𐌳𐌰𐌽𐌲𐌰𐍂𐌳𐌾𐌰𐌹 <BR>𐌸𐌹𐌶𐌴
<BR>𐌲𐌿𐍄𐌰𐌽𐌴 </TD>
<TD class="plane1">𐍅𐌿𐌻𐍆𐌹𐌻𐌰 </TD>
<TD class="submitter">James Kass<br>Font: <a href="#code2001">CODE2001</a></TD></TR>
<tr>
<td class="english"><a href="http://www.unicode.org/charts/PDF/U10480.pdf" target="_blank">Osmanya</a></td>
<td class="english">Somalia</td>
<td class="english">Cismaan Yuusuf Keenadiid<br><span class="small">(inventor of Osmanya script)</span></td>
<td class="osmanya">𐒈𐒝𐒑𐒛𐒐𐒘𐒕𐒖
</td>
<td class="osmanya">
𐒋𐒘𐒈𐒑𐒛𐒒 𐒕𐒓
𐒈𐒚𐒍 𐒏𐒜𐒒𐒖𐒆
𐒕𐒆
</td>
<td class="english">Mark Williamson
<br>Font: <a href="#andagii">ANDAGII</a>
</td>
</tr>
<tr>
<td class="english"><a href="http://www.unicode.org/charts/PDF/U10000.pdf" target="_blank">Linear B Syllabary</a></td>
<td class="english">Tulisos</td>
<td class="english">Minos</td>
<td class="linearb">𐀶𐀪𐀰</td>
<td class="linearb"><span class="small">(Unknown).</span></td>
<td class="english">Mark Williamson
<br>Font: <a href="#penuturesu">PENUTURESU</a>
</td>
</tr>
<TR>
<TD class="english"><a href="http://www.unicode.org/charts/PDF/U10450.pdf" target="_blank">Shavian</a></TD>
<TD class="english">Great Britain or United Kingdom</TD>
<TD class="english">George Bernard Shaw</TD>
<TD class=plane1>·𐑜𐑮𐑱𐑑 ·𐑚𐑮𐑦𐑑𐑩𐑯
or<br>·𐑿𐑯𐑲𐑑𐑧𐑛 ·𐑒𐑦𐑙𐑛𐑳𐑥</TD>
<TD class="plane1">𐑡𐑹𐑡 ·𐑚𐑻𐑯𐑸𐑛 ·𐑖𐑷</TD>
<TD class="submitter">Doug Ewell based this entry on information from Simon Barne's (now defunct) web site.
<br>Font: <a href="#code2001">CODE2001</a></TD></TR>
</TBODY>
</TABLE>
<div class="nottheothers" style="font-family:helvetica, arial, sans-serif">
<h2 id="fonts" style="margin-left:1in">Fonts</h2>
<ul>
<li id="code2001"><a href="http://www.code2000.net/code2001.htm">CODE2001</a></li>
<li id="andagii"><a href="unicode/unicode-font.html" target="_blank">ANDAGII</a></li>
<li id="penuturesu"><a href="unicode/unicode-font.html" target="_blank">PENUTURESU</a></li>
</ul>
</div>
<div class="center">
<a href="http://www.unicode.org" target="_blank" style="float:right;margin:1em 0 1em 1em;"><img border="0"
src="images/UniEncGreyBord.gif" width="88" height="31" alt="Encoded in UTF-8!"></a>
<a href="#top">Top of page</a>
<br>This page last updated 2008-11-15
</div>
<!-- WiredMinds eMetrics tracking with Enterprise Edition V5.4 START -->
<script type='text/javascript' src='https://count.carrierzone.com/app/count_server/count.js'></script>
<script type='text/javascript'><!--
wm_custnum='5e53965097060c7f';
wm_page_name='unicode-plane1-utf8.html';
wm_group_name='/services/webpages/i/1/i18nguy.com/public';
wm_campaign_key='campaign_id';
wm_track_alt='';
wiredminds.count();
// -->
</script>
<!-- WiredMinds eMetrics tracking with Enterprise Edition V5.4 END -->
</BODY>
</HTML>