Merge pull request #91 from Rafiot/master

Improve email import module
pull/92/head
Raphaël Vinot 2017-01-07 18:53:08 -05:00 committed by GitHub
commit d07e34e76c
2 changed files with 32 additions and 62 deletions

View File

@ -44,60 +44,45 @@ def handler(q=False):
all_headers = ""
for k, v in message.items():
all_headers += "{0}: {1}\n".format(k.strip(), v.strip())
results.append({"values": all_headers,
"types": ['email-header']})
results.append({"values": all_headers, "type": 'email-header'})
# E-Mail MIME Boundry
if message.get_boundary():
results.append({"values": message.get_boundary(),
"types": ['email-mime-boundary']})
results.append({"values": message.get_boundary(), "type": 'email-mime-boundary'})
# E-Mail Reply To
if message.get('In-Reply-To'):
results.append({"values": message.get('In-Reply-To').strip(),
"types": ['email-reply-to']})
results.append({"values": message.get('In-Reply-To').strip(), "type": 'email-reply-to'})
# X-Mailer
if message.get('X-Mailer'):
results.append({"values": message.get('X-Mailer'),
"types": ['email-x-mailer']})
results.append({"values": message.get('X-Mailer'), "type": 'email-x-mailer'})
# Thread Index
if message.get('Thread-Index'):
results.append({"values": message.get('Thread-Index'),
"types": ['email-thread-index']})
results.append({"values": message.get('Thread-Index'), "type": 'email-thread-index'})
# Email Message ID
if message.get('Message-ID'):
results.append({"values": message.get('Message-ID'),
"types": ['email-message-id']})
results.append({"values": message.get('Message-ID'), "type": 'email-message-id'})
# Subject
if message.get('Subject'):
results.append({"values": message.get('Subject'),
"types": ['email-subject']})
results.append({"values": message.get('Subject'), "type": 'email-subject'})
# Source
from_addr = message.get('From')
if from_addr:
results.append({"values": parseaddr(from_addr)[1],
"types": ['email-src'],
"comment": "From: {0}".format(from_addr)})
results.append({"values": parseaddr(from_addr)[0],
"types": ['email-src-display-name'],
"comment": "From: {0}".format(from_addr)})
results.append({"values": parseaddr(from_addr)[1], "type": 'email-src', "comment": "From: {0}".format(from_addr)})
results.append({"values": parseaddr(from_addr)[0], "type": 'email-src-display-name', "comment": "From: {0}".format(from_addr)})
# Return Path
return_path = message.get('Return-Path')
if return_path:
# E-Mail Source
results.append({"values": parseaddr(return_path)[1],
"types": ['email-src'],
"comment": "Return Path: {0}".format(return_path)})
results.append({"values": parseaddr(return_path)[1], "type": 'email-src', "comment": "Return Path: {0}".format(return_path)})
# E-Mail Source Name
results.append({"values": parseaddr(return_path)[0],
"types": ['email-src-display-name'],
"comment": "Return Path: {0}".format(return_path)})
results.append({"values": parseaddr(return_path)[0], "type": 'email-src-display-name', "comment": "Return Path: {0}".format(return_path)})
# Destinations
# Split and sort destination header values
@ -109,14 +94,8 @@ def handler(q=False):
for addr in addrs:
# Parse and add destination header values
parsed_addr = parseaddr(addr)
results.append({"values": parsed_addr[1],
"types": ["email-dst"],
"comment": "{0}: {1}".format(hdr_val,
addr)})
results.append({"values": parsed_addr[0],
"types": ["email-dst-display-name"],
"comment": "{0}: {1}".format(hdr_val,
addr)})
results.append({"values": parsed_addr[1], "type": "email-dst", "comment": "{0}: {1}".format(hdr_val, addr)})
results.append({"values": parsed_addr[0], "type": "email-dst-display-name", "comment": "{0}: {1}".format(hdr_val, addr)})
# Get E-Mail Targets
# Get the addresses that received the email.
@ -132,9 +111,7 @@ def handler(q=False):
except (AttributeError):
continue
for tar in email_targets:
results.append({"values": tar,
"types": ["target-email"],
"comment": "Extracted from email 'Received' header"})
results.append({"values": tar, "type": "target-email", "comment": "Extracted from email 'Received' header"})
# Check if we were given a configuration
config = request.get("config", {})
@ -162,10 +139,10 @@ def handler(q=False):
for part in message.walk():
filename = part.get_filename()
if filename is not None:
results.append({"values": filename, "type": 'email-attachment'})
attachment_data = part.get_payload(decode=True)
# Base attachment data is default
attachment_files = [{"values": filename,
"data": base64.b64encode(attachment_data).decode()}]
attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}]
if unzip is True: # Attempt to unzip the attachment and return its files
try:
attachment_files += get_zipped_contents(filename, attachment_data)
@ -180,10 +157,9 @@ def handler(q=False):
attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password)
attachment_files += get_zipped_contents(filename, attachment_data, password=password)
except zipfile.BadZipFile: # Attachment is not a zipfile
attachment_files += [{"values": filename,
"data": base64.b64encode(attachment_data).decode()}]
attachment_files += [{"values": filename, "data": base64.b64encode(attachment_data).decode()}]
for attch_item in attachment_files:
attch_item["types"] = ['attachment']
attch_item["type"] = 'malware-sample'
results.append(attch_item)
else: # Check email body part for urls
if (extract_urls is True and part.get_content_type() == 'text/html'):
@ -192,8 +168,7 @@ def handler(q=False):
url_parser.feed(part.get_payload(decode=True).decode(charset))
urls = url_parser.urls
for url in urls:
results.append({"values": url,
"types": "url"})
results.append({"values": url, "type": "url"})
r = {'results': results}
return r
@ -270,12 +245,8 @@ def get_zip_passwords(message):
# Not checking for multi-part message because by having an
# encrypted zip file it must be multi-part.
text_parts = [part for part in typed_subpart_iterator(message,
'text',
'plain')]
html_parts = [part for part in typed_subpart_iterator(message,
'text',
'html')]
text_parts = [part for part in typed_subpart_iterator(message, 'text', 'plain')]
html_parts = [part for part in typed_subpart_iterator(message, 'text', 'html')]
body = []
# Get full message character set once
# Language example reference (using python2)
@ -300,8 +271,7 @@ def get_zip_passwords(message):
# Grab any strings that are marked off by special chars
marking_chars = [["\'", "\'"], ['"', '"'], ['[', ']'], ['(', ')']]
for char_set in marking_chars:
regex = re.compile("""\{0}([^\{1}]*)\{1}""".format(char_set[0],
char_set[1]))
regex = re.compile("""\{0}([^\{1}]*)\{1}""".format(char_set[0], char_set[1]))
marked_off = re.findall(regex, raw_text)
possible_passwords += marked_off

View File

@ -70,8 +70,8 @@ class TestModules(unittest.TestCase):
values = [x["values"] for x in results]
types = {}
for i in results:
types.setdefault(i["types"][0], 0)
types[i["types"][0]] += 1
types.setdefault(i["type"], 0)
types[i["type"]] += 1
# Check that there are the appropriate number of items
# Check that all the items were correct
self.assertEqual(types['target-email'], 1)
@ -125,11 +125,11 @@ class TestModules(unittest.TestCase):
values = [x["values"] for x in response.json()['results']]
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
if i["types"][0] == 'attachment':
if i["type"] == 'email-attachment':
self.assertEqual(i["values"], "EICAR.com")
if i['type'] == 'malware-sample':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
def test_email_attachment_unpack(self):
@ -151,13 +151,13 @@ class TestModules(unittest.TestCase):
self.assertIn('EICAR.com', values)
self.assertIn('EICAR.com.zip', values)
for i in response.json()['results']:
if i["values"] == 'EICAR.com.zip':
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
with zf.open("EICAR.com") as ec:
attch_data = ec.read()
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
if i["values"] == 'EICAR.com':
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@ -182,11 +182,11 @@ class TestModules(unittest.TestCase):
self.assertIn('EICAR.com', values)
self.assertIn('EICAR.com.zip', values)
for i in response.json()['results']:
if i["values"] == 'EICAR.com.zip':
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
# Make sure password was set and still in place
self.assertRaises(RuntimeError, zf.open, "EICAR.com")
if i["values"] == 'EICAR.com':
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"])
self.assertEqual(attch_data,
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
@ -238,7 +238,7 @@ class TestModules(unittest.TestCase):
self.assertIn('EICAR.com', values)
for i in response.json()['results']:
# Check that it could be extracted.
if i["values"] == 'EICAR.com':
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
attch_data = base64.b64decode(i["data"]).decode()
self.assertEqual(attch_data,
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')