mirror of https://github.com/MISP/misp-modules
chg: Update email import module, support objects
parent
2b8a2d03cd
commit
6f95445143
|
@ -3,24 +3,25 @@
|
|||
|
||||
import json
|
||||
import base64
|
||||
import io
|
||||
import zipfile
|
||||
import codecs
|
||||
import re
|
||||
from email import message_from_bytes
|
||||
from email.utils import parseaddr
|
||||
from email.iterators import typed_subpart_iterator
|
||||
from email.parser import Parser
|
||||
from html.parser import HTMLParser
|
||||
from email.header import decode_header
|
||||
from pymisp.tools import EMailObject, make_binary_objects
|
||||
try:
|
||||
from pymisp.tools import URLObject
|
||||
except ImportError:
|
||||
raise ImportError('Unable to import URLObject, pyfaup missing')
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
userConfig = {}
|
||||
|
||||
inputSource = ['file']
|
||||
mispattributes = {'inputSource': ['file'], 'output': ['MISP objects'],
|
||||
'format': 'misp_standard'}
|
||||
|
||||
moduleinfo = {'version': '0.1',
|
||||
'author': 'Seamus Tuohy',
|
||||
moduleinfo = {'version': '0.2',
|
||||
'author': 'Seamus Tuohy, Raphaël Vinot',
|
||||
'description': 'Email import module for MISP',
|
||||
'module-type': ['import']}
|
||||
|
||||
|
@ -35,93 +36,13 @@ moduleconfig = ["unzip_attachments",
|
|||
def handler(q=False):
|
||||
if q is False:
|
||||
return False
|
||||
results = []
|
||||
|
||||
# Decode and parse email
|
||||
request = json.loads(q)
|
||||
# request data is always base 64 byte encoded
|
||||
data = base64.b64decode(request["data"])
|
||||
|
||||
# Double decode to force headers to be re-parsed with proper encoding
|
||||
message = Parser().parsestr(message_from_bytes(data).as_string())
|
||||
# Decode any encoded headers to get at proper string
|
||||
for key, val in message.items():
|
||||
replacement = get_decoded_header(key, val)
|
||||
if replacement is not None:
|
||||
message.replace_header(key, replacement)
|
||||
|
||||
# Extract all header information
|
||||
all_headers = ""
|
||||
for k, v in message.items():
|
||||
all_headers += "{0}: {1}\n".format(k.strip(), v.strip())
|
||||
results.append({"values": all_headers, "type": 'email-header'})
|
||||
|
||||
# E-Mail MIME Boundary
|
||||
if message.get_boundary():
|
||||
results.append({"values": message.get_boundary(), "type": 'email-mime-boundary'})
|
||||
|
||||
# E-Mail Reply To
|
||||
if message.get('In-Reply-To'):
|
||||
results.append({"values": message.get('In-Reply-To').strip(), "type": 'email-reply-to'})
|
||||
|
||||
# X-Mailer
|
||||
if message.get('X-Mailer'):
|
||||
results.append({"values": message.get('X-Mailer'), "type": 'email-x-mailer'})
|
||||
|
||||
# Thread Index
|
||||
if message.get('Thread-Index'):
|
||||
results.append({"values": message.get('Thread-Index'), "type": 'email-thread-index'})
|
||||
|
||||
# Email Message ID
|
||||
if message.get('Message-ID'):
|
||||
results.append({"values": message.get('Message-ID'), "type": 'email-message-id'})
|
||||
|
||||
# Subject
|
||||
if message.get('Subject'):
|
||||
results.append({"values": message.get('Subject'), "type": 'email-subject'})
|
||||
|
||||
# Source
|
||||
from_addr = message.get('From')
|
||||
if from_addr:
|
||||
results.append({"values": parseaddr(from_addr)[1], "type": 'email-src', "comment": "From: {0}".format(from_addr)})
|
||||
results.append({"values": parseaddr(from_addr)[0], "type": 'email-src-display-name', "comment": "From: {0}".format(from_addr)})
|
||||
|
||||
# Return Path
|
||||
return_path = message.get('Return-Path')
|
||||
if return_path:
|
||||
# E-Mail Source
|
||||
results.append({"values": parseaddr(return_path)[1], "type": 'email-src', "comment": "Return Path: {0}".format(return_path)})
|
||||
# E-Mail Source Name
|
||||
results.append({"values": parseaddr(return_path)[0], "type": 'email-src-display-name', "comment": "Return Path: {0}".format(return_path)})
|
||||
|
||||
# Destinations
|
||||
# Split and sort destination header values
|
||||
recipient_headers = ['To', 'Cc', 'Bcc']
|
||||
|
||||
for hdr_val in recipient_headers:
|
||||
if message.get(hdr_val):
|
||||
addrs = message.get(hdr_val).split(',')
|
||||
for addr in addrs:
|
||||
# Parse and add destination header values
|
||||
parsed_addr = parseaddr(addr)
|
||||
results.append({"values": parsed_addr[1], "type": "email-dst", "comment": "{0}: {1}".format(hdr_val, addr)})
|
||||
results.append({"values": parsed_addr[0], "type": "email-dst-display-name", "comment": "{0}: {1}".format(hdr_val, addr)})
|
||||
|
||||
# Get E-Mail Targets
|
||||
# Get the addresses that received the email.
|
||||
# As pulled from the Received header
|
||||
received = message.get_all('Received')
|
||||
if received:
|
||||
email_targets = set()
|
||||
for rec in received:
|
||||
try:
|
||||
email_check = re.search(r"for\s(.*@.*);", rec).group(1)
|
||||
email_check = email_check.strip(' <>')
|
||||
email_targets.add(parseaddr(email_check)[1])
|
||||
except (AttributeError):
|
||||
continue
|
||||
for tar in email_targets:
|
||||
results.append({"values": tar, "type": "target-email", "comment": "Extracted from email 'Received' header"})
|
||||
email_object = EMailObject(pseudofile=BytesIO(data), attach_original_mail=True, standalone=False)
|
||||
|
||||
# Check if we were given a configuration
|
||||
config = request.get("config", {})
|
||||
|
@ -137,66 +58,81 @@ def handler(q=False):
|
|||
zip_pass_crack = config.get("guess_zip_attachment_passwords", None)
|
||||
if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes):
|
||||
zip_pass_crack = True
|
||||
password_list = None # Only want to collect password list once
|
||||
password_list = get_zip_passwords(email_object.email)
|
||||
|
||||
# Do we extract URLs from the email?
|
||||
extract_urls = config.get("extract_urls", None)
|
||||
if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes):
|
||||
extract_urls = True
|
||||
|
||||
file_objects = [] # All possible file objects
|
||||
# Get Attachments
|
||||
# Get file names of attachments
|
||||
for part in message.walk():
|
||||
filename = part.get_filename()
|
||||
if filename is not None:
|
||||
results.append({"values": filename, "type": 'email-attachment'})
|
||||
attachment_data = part.get_payload(decode=True)
|
||||
# Base attachment data is default
|
||||
attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}]
|
||||
if unzip is True: # Attempt to unzip the attachment and return its files
|
||||
zipped_files = ["doc", "docx", "dot", "dotx", "xls",
|
||||
"xlsx", "xlm", "xla", "xlc", "xlt",
|
||||
"xltx", "xlw", "ppt", "pptx", "pps",
|
||||
"ppsx", "pot", "potx", "potx", "sldx",
|
||||
"odt", "ods", "odp", "odg", "odf",
|
||||
"fodt", "fods", "fodp", "fodg", "ott",
|
||||
"uot"]
|
||||
for attachment_name, attachment in email_object.attachments:
|
||||
# Create file objects for the attachments
|
||||
if not attachment_name:
|
||||
attachment_name = 'NameMissing.txt'
|
||||
|
||||
zipped_filetype = False
|
||||
for ext in zipped_files:
|
||||
if filename.endswith(ext) is True:
|
||||
zipped_filetype = True
|
||||
if not zipped_filetype:
|
||||
try:
|
||||
attachment_files += get_zipped_contents(filename, attachment_data)
|
||||
except RuntimeError: # File is encrypted with a password
|
||||
if zip_pass_crack is True:
|
||||
if password_list is None:
|
||||
password_list = get_zip_passwords(message)
|
||||
password = test_zip_passwords(attachment_data, password_list)
|
||||
if password is None: # Inform the analyst that we could not crack password
|
||||
attachment_files[0]['comment'] = "Encrypted Zip: Password could not be cracked from message"
|
||||
else:
|
||||
attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password)
|
||||
attachment_files += get_zipped_contents(filename, attachment_data, password=password)
|
||||
except zipfile.BadZipFile: # Attachment is not a zipfile
|
||||
pass
|
||||
for attch_item in attachment_files:
|
||||
attch_item["type"] = 'malware-sample'
|
||||
results.append(attch_item)
|
||||
else: # Check email body part for urls
|
||||
if (extract_urls is True and part.get_content_type() == 'text/html'):
|
||||
url_parser = HTMLURLParser()
|
||||
charset = get_charset(part, get_charset(message))
|
||||
url_parser.feed(part.get_payload(decode=True).decode(charset))
|
||||
urls = url_parser.urls
|
||||
for url in urls:
|
||||
results.append({"values": url, "type": "url"})
|
||||
r = {'results': results}
|
||||
temp_filename = Path(attachment_name)
|
||||
zipped_files = ["doc", "docx", "dot", "dotx", "xls", "xlsx", "xlm", "xla",
|
||||
"xlc", "xlt", "xltx", "xlw", "ppt", "pptx", "pps", "ppsx",
|
||||
"pot", "potx", "potx", "sldx", "odt", "ods", "odp", "odg",
|
||||
"odf", "fodt", "fods", "fodp", "fodg", "ott", "uot"]
|
||||
# Attempt to unzip the attachment and return its files
|
||||
if unzip and temp_filename.suffix[1:] not in zipped_files:
|
||||
try:
|
||||
unzip_attachement(attachment_name, attachment, email_object, file_objects)
|
||||
except RuntimeError: # File is encrypted with a password
|
||||
if zip_pass_crack is True:
|
||||
password = test_zip_passwords(attachment, password_list)
|
||||
if password:
|
||||
unzip_attachement(attachment_name, attachment, email_object, file_objects, password)
|
||||
else: # Inform the analyst that we could not crack password
|
||||
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
|
||||
f_object.comment = "Encrypted Zip: Password could not be cracked from message"
|
||||
file_objects.append(f_object)
|
||||
file_objects.append(main_object)
|
||||
file_objects += sections
|
||||
email_object.add_reference(f_object.uuid, 'includes', 'Email attachment')
|
||||
except zipfile.BadZipFile: # Attachment is not a zipfile
|
||||
# Just straight add the file
|
||||
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
|
||||
file_objects.append(f_object)
|
||||
file_objects.append(main_object)
|
||||
file_objects += sections
|
||||
email_object.add_reference(f_object.uuid, 'includes', 'Email attachment')
|
||||
else:
|
||||
# Just straight add the file
|
||||
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
|
||||
file_objects.append(f_object)
|
||||
file_objects.append(main_object)
|
||||
file_objects += sections
|
||||
email_object.add_reference(f_object.uuid, 'includes', 'Email attachment')
|
||||
|
||||
mail_body = email_object.email.get_body(preferencelist=('html', 'plain'))
|
||||
if extract_urls:
|
||||
charset = mail_body.get_content_charset()
|
||||
if mail_body.get_content_type() == 'text/html':
|
||||
url_parser = HTMLURLParser()
|
||||
url_parser.feed(mail_body.get_payload(decode=True).decode(charset, errors='ignore'))
|
||||
urls = url_parser.urls
|
||||
else:
|
||||
urls = re.findall(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+', mail_body.get_payload(decode=True).decode(charset, errors='ignore'))
|
||||
for url in urls:
|
||||
if not url:
|
||||
continue
|
||||
url_object = URLObject(url, standalone=False)
|
||||
file_objects.append(url_object)
|
||||
email_object.add_reference(url_object.uuid, 'includes', 'URL in email body')
|
||||
|
||||
objects = [email_object.to_json()]
|
||||
if file_objects:
|
||||
objects += [o.to_json() for o in file_objects if o]
|
||||
r = {'results': {'Object': [json.loads(o) for o in objects]}}
|
||||
return r
|
||||
|
||||
|
||||
def get_zipped_contents(filename, data, password=None):
|
||||
def unzip_attachement(filename, data, email_object, file_objects, password=None):
|
||||
"""Extract the contents of a zipfile.
|
||||
|
||||
Args:
|
||||
|
@ -210,17 +146,23 @@ def get_zipped_contents(filename, data, password=None):
|
|||
"comment":"string here"}
|
||||
|
||||
"""
|
||||
with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
|
||||
unzipped_files = []
|
||||
with zipfile.ZipFile(data, "r") as zf:
|
||||
if password is not None:
|
||||
comment = f'Extracted from {filename} with password "{password}"'
|
||||
password = str.encode(password) # Byte encoded password required
|
||||
else:
|
||||
comment = f'Extracted from {filename}'
|
||||
for zip_file_name in zf.namelist(): # Get all files in the zip file
|
||||
with zf.open(zip_file_name, mode='r', pwd=password) as fp:
|
||||
file_data = fp.read()
|
||||
unzipped_files.append({"values": zip_file_name,
|
||||
"data": base64.b64encode(file_data).decode(), # Any password works when not encrypted
|
||||
"comment": "Extracted from {0}".format(filename)})
|
||||
return unzipped_files
|
||||
file_data = BytesIO(fp.read())
|
||||
f_object, main_object, sections = make_binary_objects(pseudofile=file_data,
|
||||
filename=zip_file_name,
|
||||
standalone=False)
|
||||
f_object.comment = comment
|
||||
file_objects.append(f_object)
|
||||
file_objects.append(main_object)
|
||||
file_objects += sections
|
||||
email_object.add_reference(f_object.uuid, 'includes', 'Email attachment')
|
||||
|
||||
|
||||
def test_zip_passwords(data, test_passwords):
|
||||
|
@ -234,7 +176,7 @@ def test_zip_passwords(data, test_passwords):
|
|||
Returns a byte string containing a found password and None if password is not found.
|
||||
|
||||
"""
|
||||
with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
|
||||
with zipfile.ZipFile(data, "r") as zf:
|
||||
firstfile = zf.namelist()[0]
|
||||
for pw_test in test_passwords:
|
||||
byte_pwd = str.encode(pw_test)
|
||||
|
@ -268,23 +210,16 @@ def get_zip_passwords(message):
|
|||
|
||||
# Not checking for multi-part message because by having an
|
||||
# encrypted zip file it must be multi-part.
|
||||
text_parts = [part for part in typed_subpart_iterator(message, 'text', 'plain')]
|
||||
html_parts = [part for part in typed_subpart_iterator(message, 'text', 'html')]
|
||||
body = []
|
||||
# Get full message character set once
|
||||
# Language example reference (using python2)
|
||||
# http://ginstrom.com/scribbles/2007/11/19/parsing-multilingual-email-with-python/
|
||||
message_charset = get_charset(message)
|
||||
for part in text_parts:
|
||||
charset = get_charset(part, message_charset)
|
||||
body.append(part.get_payload(decode=True).decode(charset))
|
||||
for part in html_parts:
|
||||
charset = get_charset(part, message_charset)
|
||||
html_part = part.get_payload(decode=True).decode(charset)
|
||||
html_parser = HTMLTextParser()
|
||||
html_parser.feed(html_part)
|
||||
for text in html_parser.text_data:
|
||||
body.append(text)
|
||||
for part in message.walk():
|
||||
charset = part.get_content_charset()
|
||||
if part.get_content_type() == 'text/plain':
|
||||
body.append(part.get_payload(decode=True).decode(charset, errors='ignore'))
|
||||
elif part.get_content_type() == 'text/html':
|
||||
html_parser = HTMLTextParser()
|
||||
html_parser.feed(part.get_payload(decode=True).decode(charset, errors='ignore'))
|
||||
for text in html_parser.text_data:
|
||||
body.append(text)
|
||||
raw_text = "\n".join(body).strip()
|
||||
|
||||
# Add subject to text corpus to parse
|
||||
|
@ -334,63 +269,12 @@ class HTMLURLParser(HTMLParser):
|
|||
def handle_starttag(self, tag, attrs):
|
||||
if tag == 'a':
|
||||
self.urls.append(dict(attrs).get('href'))
|
||||
|
||||
|
||||
def get_charset(message, default="ascii"):
    """Return the name of the character set declared by a message.

    The charset parameter of the Content-Type header is preferred; if it is
    absent, the charset attached to the message object itself is used, and
    finally the supplied default.

    Args:
        message (email.message): Email message object to parse.
        default (string): String containing default charset to return.

    Returns:
        string: Charset name usable as a codec name for ``bytes.decode``.
    """
    content_charset = message.get_content_charset()
    if content_charset:
        return content_charset

    attached_charset = message.get_charset()
    if attached_charset:
        # Message.get_charset() yields an email.charset.Charset instance,
        # which bytes.decode() rejects; str() gives its charset name.
        return str(attached_charset)

    return default
|
||||
|
||||
|
||||
def _decode_header_chunk(raw, encoding):
    """Decode one byte chunk from decode_header; return None if it cannot be decoded."""
    if encoding is None:
        # Unencoded text that sits next to an encoded word; expected to be ASCII.
        candidates = ("ascii",)
    elif encoding == "unknown-8bit":
        # Try various UTF decodings; 32 before 16 so it raises errors
        # instead of utf-16 silently accepting the bytes.
        candidates = ("utf-8", "utf-32", "utf-16")
    else:
        candidates = (encoding,)

    for codec in candidates:
        try:
            text = raw.decode(codec)
        except (UnicodeDecodeError, LookupError):
            # LookupError: the charset named in the header is not a known codec.
            continue
        # Drop Byte Order Marks on the decoded text, not on the raw bytes,
        # so multi-byte encodings are never cut at an unaligned offset.
        return text.replace("\ufeff", "")
    return None


def get_decoded_header(header, value):
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        header (string): Name of the header (unused, kept for callers).
        value (string): Raw header value, possibly containing encoded words.

    Returns:
        The decoded text with surrounding whitespace removed, ``None`` when
        the value contains no encoded words (nothing to replace), or the
        original ``value`` unchanged when any part cannot be decoded. It is
        better to have the analyst decode it themselves than to provide a
        mangled string.
    """
    chunks = decode_header(value)
    # decode_header hands back str chunks only when nothing was encoded.
    if not any(isinstance(text, bytes) for text, _ in chunks):
        return None

    pieces = []
    for text, encoding in chunks:
        if isinstance(text, bytes):
            text = _decode_header_chunk(text, encoding)
            if text is None:
                # Keep the RFC 2047 form rather than emit a bytes repr.
                return value
        pieces.append(text)

    # Whitespace inside the chunks separates encoded and plain words, so
    # only the outer edges are trimmed.
    return "".join(pieces).strip()
|
||||
if tag == 'img':
|
||||
self.urls.append(dict(attrs).get('src'))
|
||||
|
||||
|
||||
def introspection():
|
||||
modulesetup = {}
|
||||
try:
|
||||
modulesetup['userConfig'] = userConfig
|
||||
except NameError:
|
||||
pass
|
||||
try:
|
||||
modulesetup['inputSource'] = inputSource
|
||||
except NameError:
|
||||
pass
|
||||
return modulesetup
|
||||
return mispattributes
|
||||
|
||||
|
||||
def version():
|
||||
|
|
Loading…
Reference in New Issue