misp-modules/misp_modules/modules/import_mod/email_import.py

404 lines
16 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import base64
import io
import zipfile
import zlib
import codecs
import re
from email import message_from_bytes
from email.errors import HeaderParseError
from email.utils import getaddresses
from email.utils import parseaddr
from email.iterators import typed_subpart_iterator
from email.parser import Parser
from html.parser import HTMLParser
from email.header import decode_header
# Error envelope returned to MISP when the module fails.
misperrors = {'error': 'Error'}

# No per-user configuration; the e-mail is supplied as an uploaded file.
userConfig = {}
inputSource = ['file']

moduleinfo = {
    'version': '0.1',
    'author': 'Seamus Tuohy',
    'description': 'Email import module for MISP',
    'module-type': ['import'],
}

# Options understood by handler():
#   unzip_attachments              - unzip every zip attachment that is not password protected
#   guess_zip_attachment_passwords - try to open password protected zip attachments using
#                                    every string found in the e-mail body and subject
#   extract_urls                   - collect the URLs found in text/html parts of the e-mail
moduleconfig = [
    "unzip_attachments",
    "guess_zip_attachment_passwords",
    "extract_urls",
]
def _decode_header_safely(name, value):
    """Return the decoded text of a header, or None when it is left as-is.

    None means either that the header holds nothing to decode or that it
    could not be decoded; in both cases the caller keeps the raw value.
    """
    try:
        return get_decoded_header(name, value)
    except (HeaderParseError, LookupError, UnicodeError):
        return None


def _decode_display_name(display_name):
    """Decode an RFC 2047 encoded display name, keeping the raw text on failure."""
    if not display_name:
        return display_name
    decoded = _decode_header_safely('display-name', display_name)
    return display_name if decoded is None else decoded


def _format_address(display_name, address):
    """Render a (display name, address) pair for use in an attribute comment."""
    if display_name and address:
        return "{0} <{1}>".format(display_name, address)
    return display_name or address


def _is_enabled(config, option):
    """Tell whether ``option`` is switched on in the module configuration.

    Values are compared case-insensitively against y/yes/true/t. Non-string
    values (for instance a JSON boolean) are converted with str() first.
    """
    value = config.get(option)
    if value is None:
        return False
    return str(value).strip().lower() in ('y', 'yes', 'true', 't')


def handler(q=False):
    """Turn an e-mail into a list of MISP attribute dictionaries.

    Args:
        q (str): JSON request. ``data`` holds the base64 encoded e-mail and the
            optional ``config`` dict may enable ``unzip_attachments``,
            ``guess_zip_attachment_passwords`` and ``extract_urls``.

    Returns:
        False when no request is given, otherwise ``{'results': [...]}`` where
        every entry carries at least ``values`` and ``type``.

    Raises:
        Whatever ``json.loads`` / ``base64.b64decode`` raise on a malformed
        request, and ``KeyError`` when ``data`` is missing.
    """
    if q is False:
        return False
    results = []

    # Decode and parse email
    request = json.loads(q)
    # request data is always base 64 byte encoded
    data = base64.b64decode(request["data"])
    # Double decode to force headers to be re-parsed with proper encoding
    message = Parser().parsestr(message_from_bytes(data).as_string())

    # Address headers are captured while still RFC 2047 encoded: addresses have
    # to be split *before* the display names are decoded, otherwise decoded
    # commas or angle brackets corrupt the parsing.
    address_headers = {name: message.get_all(name) or []
                       for name in ('From', 'Return-Path', 'To', 'Cc', 'Bcc')}

    # Decode any encoded headers to get at proper string. The header list is
    # rebuilt in its original order because replace_header() only ever touches
    # the first header of a given name and so mishandles repeated headers.
    decoded_headers = []
    for key, val in message.items():
        replacement = _decode_header_safely(key, val)
        decoded_headers.append((key, val if replacement is None else replacement))
    for key in {name for name, _ in decoded_headers}:
        del message[key]
    for key, val in decoded_headers:
        message[key] = val

    # Extract all header information
    all_headers = "".join("{0}: {1}\n".format(k.strip(), v.strip()) for k, v in message.items())
    results.append({"values": all_headers, "type": 'email-header'})

    # E-Mail MIME Boundary
    boundary = message.get_boundary()
    if boundary:
        results.append({"values": boundary, "type": 'email-mime-boundary'})

    # E-Mail Reply To
    reply_to = message.get('In-Reply-To')
    if reply_to:
        results.append({"values": reply_to.strip(), "type": 'email-reply-to'})

    # Headers that map one-to-one onto an attribute type
    for header_name, attribute_type in (('X-Mailer', 'email-x-mailer'),
                                        ('Thread-Index', 'email-thread-index'),
                                        ('Message-ID', 'email-message-id'),
                                        ('Subject', 'email-subject')):
        header_value = message.get(header_name)
        if header_value:
            results.append({"values": header_value, "type": attribute_type})

    # Source: From and Return-Path
    for header_name, label in (('From', 'From'), ('Return-Path', 'Return Path')):
        raw_values = address_headers[header_name]
        if not raw_values or not raw_values[0]:
            continue
        display_name, address = parseaddr(raw_values[0])
        display_name = _decode_display_name(display_name)
        comment = "{0}: {1}".format(label, _format_address(display_name, address))
        if address:
            results.append({"values": address, "type": 'email-src', "comment": comment})
        if display_name:
            results.append({"values": display_name, "type": 'email-src-display-name', "comment": comment})

    # Destinations. getaddresses() understands quoted display names, so a
    # recipient such as "Doe, John" <jd@example.com> is not split on its comma.
    for hdr_val in ('To', 'Cc', 'Bcc'):
        for display_name, address in getaddresses(address_headers[hdr_val]):
            display_name = _decode_display_name(display_name)
            if not display_name and not address:
                continue
            comment = "{0}: {1}".format(hdr_val, _format_address(display_name, address))
            if address:
                results.append({"values": address, "type": "email-dst", "comment": comment})
            if display_name:
                results.append({"values": display_name, "type": "email-dst-display-name", "comment": comment})

    # Get E-Mail Targets
    # Get the addresses that received the email.
    # As pulled from the Received header
    email_targets = set()
    for rec in message.get_all('Received') or []:
        match = re.search(r"for\s(.*@.*);", rec)
        if match is None:
            continue
        target = parseaddr(match.group(1).strip(' <>'))[1]
        if target:
            email_targets.add(target)
    for tar in sorted(email_targets):  # sorted so the output order is stable
        results.append({"values": tar, "type": "target-email", "comment": "Extracted from email 'Received' header"})

    # Check if we were given a configuration ("config": null counts as none)
    config = request.get("config") or {}
    # Do we unzip attachments we find?
    unzip = _is_enabled(config, "unzip_attachments")
    # Do we try to find passwords for protected zip files?
    zip_pass_crack = _is_enabled(config, "guess_zip_attachment_passwords")
    password_list = None  # Only want to collect password list once
    # Do we extract URLs from the email?
    extract_urls = _is_enabled(config, "extract_urls")

    # Office documents are zip containers; they are kept whole, never unpacked.
    office_extensions = tuple("." + ext for ext in (
        "doc", "docx", "dot", "dotx", "xls", "xlsx", "xlm", "xla", "xlc", "xlt",
        "xltx", "xlw", "ppt", "pptx", "pps", "ppsx", "pot", "potx", "sldx",
        "odt", "ods", "odp", "odg", "odf", "fodt", "fods", "fodp", "fodg", "ott",
        "uot"))

    # Get Attachments
    for part in message.walk():
        filename = part.get_filename()

        if filename is None:  # Check email body part for urls
            if not (extract_urls and part.get_content_type() == 'text/html'):
                continue
            html_bytes = part.get_payload(decode=True)
            if html_bytes is None:
                continue
            # str() because the charset lookup may hand back a Charset object
            charset = str(get_charset(part, get_charset(message)))
            try:
                html_text = html_bytes.decode(charset, errors="replace")
            except LookupError:  # charset label unknown to Python
                html_text = html_bytes.decode("utf-8", errors="replace")
            url_parser = HTMLURLParser()
            url_parser.feed(html_text)
            for url in url_parser.urls:
                if url:  # anchors without an href carry no URL
                    results.append({"values": url, "type": "url"})
            continue

        results.append({"values": filename, "type": 'email-attachment'})
        attachment_data = part.get_payload(decode=True)
        if attachment_data is None:
            # Container parts have no payload of their own; their children are
            # visited by walk() in later iterations.
            continue
        # Base attachment data is default
        attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}]

        if unzip and not filename.lower().endswith(office_extensions):
            try:
                attachment_files += get_zipped_contents(filename, attachment_data)
            except RuntimeError:  # File is encrypted with a password
                if zip_pass_crack:
                    if password_list is None:
                        password_list = get_zip_passwords(message)
                    password = test_zip_passwords(attachment_data, password_list)
                    extracted = None
                    if password is not None:
                        try:
                            extracted = get_zipped_contents(filename, attachment_data, password=password)
                        except (RuntimeError, zipfile.BadZipFile, zlib.error):
                            # The password only passed the quick header check
                            extracted = None
                    if extracted is None:  # Inform the analyst that we could not crack password
                        attachment_files[0]['comment'] = "Encrypted Zip: Password could not be cracked from message"
                    else:
                        attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password)
                        attachment_files += extracted
            except (zipfile.BadZipFile, zlib.error):  # Attachment is not a readable zipfile
                pass

        for attch_item in attachment_files:
            attch_item["type"] = 'malware-sample'
            results.append(attch_item)

    return {'results': results}
def get_zipped_contents(filename, data, password=None):
    """Extract the contents of a zipfile.

    Directory entries are skipped: they carry no data and would otherwise be
    reported as empty files.

    Args:
        filename (str): A string containing the name of the zip file.
        data (bytes): Decoded attachment data holding the zip archive.
        password (str or bytes, optional): Password used to open the members.
            Ignored by zipfile for members that are not encrypted.

    Returns:
        A list containing one dict per archived file, e.g.
        {"values": "name_of_file.txt",
         "data": <base64 encoded file content as str>,
         "comment": "Extracted from <filename>"}

    Raises:
        RuntimeError: A member is encrypted and the password is missing or wrong.
        zipfile.BadZipFile: ``data`` is not a valid zip archive.
    """
    if isinstance(password, str):
        password = password.encode()  # Byte encoded password required
    unzipped_files = []
    with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
        for member in zf.infolist():  # Get all files in the zip file
            if member.is_dir():
                continue
            with zf.open(member, mode='r', pwd=password) as fp:
                file_data = fp.read()
            unzipped_files.append({"values": member.filename,
                                   "data": base64.b64encode(file_data).decode(),
                                   "comment": "Extracted from {0}".format(filename)})
    return unzipped_files
def test_zip_passwords(data, test_passwords):
"""Test passwords until one is found to be correct.
Args:
data (decoded attachment data): Data object decoded from an e-mail part.
test_passwords (array): List of strings to test as passwords
Returns:
Returns a byte string containing a found password and None if password is not found.
"""
with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
firstfile = zf.namelist()[0]
2016-12-26 22:55:54 +01:00
for pw_test in test_passwords:
byte_pwd = str.encode(pw_test)
try:
zf.open(firstfile, pwd=byte_pwd)
return pw_test
2016-12-26 22:55:54 +01:00
except RuntimeError: # Incorrect Password
continue
return None
2017-01-07 01:01:19 +01:00
2016-12-26 22:55:54 +01:00
def _decode_text_part(part, default_charset):
    """Return the text of a MIME part, replacing bytes that cannot be decoded."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    # str() because the charset lookup may hand back a Charset object
    charset = str(get_charset(part, default_charset))
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:  # charset label unknown to Python
        return payload.decode("utf-8", errors="replace")


def get_zip_passwords(message):
    """Parse message for possible zip password combinations.

    Candidates are, in this order: passwords commonly used for malware
    samples, commonly used passwords, strings marked off by quotes or
    brackets in the body and subject, and every word of the body and subject
    (with and without surrounding punctuation).

    Args:
        message (email.message): Email message object to parse.

    Returns:
        A list of unique, non-empty candidate passwords (str) in the order
        they should be tried.
    """
    possible_passwords = []
    # Passwords commonly used for malware
    possible_passwords += ["infected", "malware"]
    # Commonly used passwords
    possible_passwords += ["123456", "password", "12345678", "qwerty",
                           "abc123", "123456789", "111111", "1234567",
                           "iloveyou", "adobe123", "123123", "sunshine",
                           "1234567890", "letmein", "1234", "monkey",
                           "shadow", "sunshine", "12345", "password1",
                           "princess", "azerty", "trustno1", "000000"]

    # Not checking for multi-part message because by having an
    # encrypted zip file it must be multi-part.
    # Get full message character set once
    message_charset = get_charset(message)
    body = []
    for part in typed_subpart_iterator(message, 'text', 'plain'):
        body.append(_decode_text_part(part, message_charset))
    for part in typed_subpart_iterator(message, 'text', 'html'):
        html_parser = HTMLTextParser()
        html_parser.feed(_decode_text_part(part, message_charset))
        body.extend(html_parser.text_data)
    raw_text = "\n".join(body).strip()

    # Add subject to text corpus to parse (the header may be absent)
    raw_text += " " + (message.get('Subject') or "")

    # Grab any strings that are marked off by special chars
    marking_chars = [["\'", "\'"], ['"', '"'], ['[', ']'], ['(', ')']]
    for char_set in marking_chars:
        regex = re.compile(r"""\{0}([^\{1}]*)\{1}""".format(char_set[0], char_set[1]))
        possible_passwords += regex.findall(raw_text)

    # Every word is a candidate, as is the word with basic punctuation
    # stripped, just in case someone places a password in a proper sentence
    individual_words = re.split(r"\s", raw_text)
    possible_passwords += individual_words
    possible_passwords += [word.strip('.,;:?!') for word in individual_words]

    # Drop empty strings and repeats while keeping the order stable, so the
    # likeliest passwords are tried first and none is tried twice
    seen = set()
    unique_passwords = []
    for candidate in possible_passwords:
        if candidate and candidate not in seen:
            seen.add(candidate)
            unique_passwords.append(candidate)
    return unique_passwords
class HTMLTextParser(HTMLParser):
    """Collect every run of character data found in the HTML fed to it."""

    def __init__(self, text_data=None):
        """Start with ``text_data`` as the collection list, or a new empty list."""
        super().__init__()
        self.text_data = [] if text_data is None else text_data

    def handle_data(self, data):
        """Record one run of text reported by the parser."""
        self.text_data.append(data)
class HTMLURLParser(HTMLParser):
    """Parse all href targets of <a> tags from HTML strings."""

    def __init__(self, urls=None):
        """Start with ``urls`` as the collection list, or a new empty list."""
        HTMLParser.__init__(self)
        if urls is None:
            self.urls = []
        else:
            self.urls = urls

    def handle_starttag(self, tag, attrs):
        """Record the href of an anchor tag.

        Anchors without an href (or with an empty one) are ignored so that
        the collected list only ever holds usable URL strings.
        """
        if tag != 'a':
            return
        href = dict(attrs).get('href')
        if href:
            self.urls.append(href)
def get_charset(message, default="ascii"):
    """Get a message object's charset as a codec name.

    Args:
        message (email.message): Email message object to parse.
        default (string): String containing default charset to return.

    Returns:
        The charset parameter of the Content-Type header when present,
        otherwise the name of the Charset object attached to the message,
        otherwise ``default``. The result is always a plain string so it can
        be passed straight to ``bytes.decode``.
    """
    content_charset = message.get_content_charset()
    if content_charset:
        return content_charset
    charset = message.get_charset()
    if charset is not None:
        # get_charset() yields an email.charset.Charset instance, not a str
        return str(charset)
    return default
def _decode_header_chunk(chunk, encoding):
    """Decode one ``decode_header`` chunk to text; return None on failure."""
    if encoding is None:
        # Unencoded text sitting next to encoded-words. latin-1 maps every
        # byte to a character, so this can never fail.
        return chunk.decode('latin-1')
    if encoding == 'unknown-8bit':
        # Try various UTF decodings; 32 before 16 so that mismatches raise errors
        candidates = ('utf-8', 'utf-32', 'utf-16')
    else:
        candidates = (encoding,)
    for candidate in candidates:
        try:
            text = chunk.decode(candidate)
        except (LookupError, UnicodeError):
            continue
        # Remove Byte Order Marks (BOM) left over in the decoded text
        return text.replace('\ufeff', '')
    return None


def get_decoded_header(header, value):
    """Decode the RFC 2047 encoded-words of a header value.

    Every chunk of the header is decoded and the pieces are joined, so text
    surrounding an encoded-word (for example the address after an encoded
    display name) is preserved.

    Args:
        header (str): Name of the header (not used for decoding).
        value (str): Raw header value.

    Returns:
        The decoded header as a string with surrounding whitespace removed,
        or None when the value contains nothing to decode or cannot be
        decoded. In the None case the caller should keep the raw value: it is
        better to let the analyst decode it than to provide a mangled string.
    """
    try:
        chunks = decode_header(value)
    except HeaderParseError:  # malformed encoded-word, e.g. broken base64
        return None
    # Chunks come back as str only when the header holds no encoded-word
    if not any(isinstance(chunk, bytes) for chunk, _ in chunks):
        return None
    decoded = []
    for chunk, encoding in chunks:
        if isinstance(chunk, bytes):
            chunk = _decode_header_chunk(chunk, encoding)
            if chunk is None:
                return None
        decoded.append(chunk)
    # extra whitespace around the value would otherwise end up in attributes
    return "".join(decoded).strip()
def introspection():
    """Describe the module's inputs to MISP.

    Returns a dict holding ``userConfig`` and ``inputSource`` for whichever
    of those module-level names are defined.
    """
    setup = {}
    module_globals = globals()
    for setting in ('userConfig', 'inputSource'):
        if setting in module_globals:
            setup[setting] = module_globals[setting]
    return setup
def version():
    """Return the module information with the supported options under 'config'.

    The module-level ``moduleinfo`` dict is updated in place and returned.
    """
    moduleinfo.update(config=moduleconfig)
    return moduleinfo
if __name__ == '__main__':
    # Manual smoke run. handler() expects the JSON request MISP would send,
    # with the raw e-mail base64 encoded under "data", not the e-mail itself.
    with open('tests/test_no_attach.eml', 'rb') as email_file:
        raw_email = email_file.read()
    handler(q=json.dumps({"data": base64.b64encode(raw_email).decode()}))