diff --git a/misp_modules/modules/import_mod/email_import.py b/misp_modules/modules/import_mod/email_import.py index 956f520..0ffb59b 100644 --- a/misp_modules/modules/import_mod/email_import.py +++ b/misp_modules/modules/import_mod/email_import.py @@ -3,24 +3,25 @@ import json import base64 -import io import zipfile -import codecs import re -from email import message_from_bytes -from email.utils import parseaddr -from email.iterators import typed_subpart_iterator -from email.parser import Parser from html.parser import HTMLParser -from email.header import decode_header +from pymisp.tools import EMailObject, make_binary_objects +try: + from pymisp.tools import URLObject +except ImportError: + raise ImportError('Unable to import URLObject, pyfaup missing') +from io import BytesIO +from pathlib import Path + misperrors = {'error': 'Error'} -userConfig = {} -inputSource = ['file'] +mispattributes = {'inputSource': ['file'], 'output': ['MISP objects'], + 'format': 'misp_standard'} -moduleinfo = {'version': '0.1', - 'author': 'Seamus Tuohy', +moduleinfo = {'version': '0.2', + 'author': 'Seamus Tuohy, Raphaƫl Vinot', 'description': 'Email import module for MISP', 'module-type': ['import']} @@ -35,93 +36,13 @@ moduleconfig = ["unzip_attachments", def handler(q=False): if q is False: return False - results = [] # Decode and parse email request = json.loads(q) # request data is always base 64 byte encoded data = base64.b64decode(request["data"]) - # Double decode to force headers to be re-parsed with proper encoding - message = Parser().parsestr(message_from_bytes(data).as_string()) - # Decode any encoded headers to get at proper string - for key, val in message.items(): - replacement = get_decoded_header(key, val) - if replacement is not None: - message.replace_header(key, replacement) - - # Extract all header information - all_headers = "" - for k, v in message.items(): - all_headers += "{0}: {1}\n".format(k.strip(), v.strip()) - results.append({"values": all_headers, "type": 'email-header'}) - - # E-Mail MIME Boundry - if message.get_boundary(): - results.append({"values": message.get_boundary(), "type": 'email-mime-boundary'}) - - # E-Mail Reply To - if message.get('In-Reply-To'): - results.append({"values": message.get('In-Reply-To').strip(), "type": 'email-reply-to'}) - - # X-Mailer - if message.get('X-Mailer'): - results.append({"values": message.get('X-Mailer'), "type": 'email-x-mailer'}) - - # Thread Index - if message.get('Thread-Index'): - results.append({"values": message.get('Thread-Index'), "type": 'email-thread-index'}) - - # Email Message ID - if message.get('Message-ID'): - results.append({"values": message.get('Message-ID'), "type": 'email-message-id'}) - - # Subject - if message.get('Subject'): - results.append({"values": message.get('Subject'), "type": 'email-subject'}) - - # Source - from_addr = message.get('From') - if from_addr: - results.append({"values": parseaddr(from_addr)[1], "type": 'email-src', "comment": "From: {0}".format(from_addr)}) - results.append({"values": parseaddr(from_addr)[0], "type": 'email-src-display-name', "comment": "From: {0}".format(from_addr)}) - - # Return Path - return_path = message.get('Return-Path') - if return_path: - # E-Mail Source - results.append({"values": parseaddr(return_path)[1], "type": 'email-src', "comment": "Return Path: {0}".format(return_path)}) - # E-Mail Source Name - results.append({"values": parseaddr(return_path)[0], "type": 'email-src-display-name', "comment": "Return Path: {0}".format(return_path)}) - - # Destinations - # Split and sort destination header values - recipient_headers = ['To', 'Cc', 'Bcc'] - - for hdr_val in recipient_headers: - if message.get(hdr_val): - addrs = message.get(hdr_val).split(',') - for addr in addrs: - # Parse and add destination header values - parsed_addr = parseaddr(addr) - results.append({"values": parsed_addr[1], "type": "email-dst", "comment": "{0}: {1}".format(hdr_val, addr)}) - results.append({"values": parsed_addr[0], "type": "email-dst-display-name", "comment": "{0}: {1}".format(hdr_val, addr)}) - - # Get E-Mail Targets - # Get the addresses that received the email. - # As pulled from the Received header - received = message.get_all('Received') - if received: - email_targets = set() - for rec in received: - try: - email_check = re.search(r"for\s(.*@.*);", rec).group(1) - email_check = email_check.strip(' <>') - email_targets.add(parseaddr(email_check)[1]) - except (AttributeError): - continue - for tar in email_targets: - results.append({"values": tar, "type": "target-email", "comment": "Extracted from email 'Received' header"}) + email_object = EMailObject(pseudofile=BytesIO(data), attach_original_mail=True, standalone=False) # Check if we were given a configuration config = request.get("config", {}) @@ -137,66 +58,81 @@ def handler(q=False): zip_pass_crack = config.get("guess_zip_attachment_passwords", None) if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes): zip_pass_crack = True - password_list = None # Only want to collect password list once + password_list = get_zip_passwords(email_object.email) # Do we extract URL's from the email. extract_urls = config.get("extract_urls", None) if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes): extract_urls = True + file_objects = [] # All possible file objects # Get Attachments # Get file names of attachments - for part in message.walk(): - filename = part.get_filename() - if filename is not None: - results.append({"values": filename, "type": 'email-attachment'}) - attachment_data = part.get_payload(decode=True) - # Base attachment data is default - attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}] - if unzip is True: # Attempt to unzip the attachment and return its files - zipped_files = ["doc", "docx", "dot", "dotx", "xls", - "xlsx", "xlm", "xla", "xlc", "xlt", - "xltx", "xlw", "ppt", "pptx", "pps", - "ppsx", "pot", "potx", "potx", "sldx", - "odt", "ods", "odp", "odg", "odf", - "fodt", "fods", "fodp", "fodg", "ott", - "uot"] + for attachment_name, attachment in email_object.attachments: + # Create file objects for the attachments + if not attachment_name: + attachment_name = 'NameMissing.txt' - zipped_filetype = False - for ext in zipped_files: - if filename.endswith(ext) is True: - zipped_filetype = True - if not zipped_filetype: - try: - attachment_files += get_zipped_contents(filename, attachment_data) - except RuntimeError: # File is encrypted with a password - if zip_pass_crack is True: - if password_list is None: - password_list = get_zip_passwords(message) - password = test_zip_passwords(attachment_data, password_list) - if password is None: # Inform the analyst that we could not crack password - attachment_files[0]['comment'] = "Encrypted Zip: Password could not be cracked from message" - else: - attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password) - attachment_files += get_zipped_contents(filename, attachment_data, password=password) - except zipfile.BadZipFile: # Attachment is not a zipfile - pass - for attch_item in attachment_files: - attch_item["type"] = 'malware-sample' - results.append(attch_item) - else: # Check email body part for urls - if (extract_urls is True and part.get_content_type() == 'text/html'): - url_parser = HTMLURLParser() - charset = get_charset(part, get_charset(message)) - url_parser.feed(part.get_payload(decode=True).decode(charset)) - urls = url_parser.urls - for url in urls: - results.append({"values": url, "type": "url"}) - r = {'results': results} + temp_filename = Path(attachment_name) + zipped_files = ["doc", "docx", "dot", "dotx", "xls", "xlsx", "xlm", "xla", + "xlc", "xlt", "xltx", "xlw", "ppt", "pptx", "pps", "ppsx", + "pot", "potx", "potx", "sldx", "odt", "ods", "odp", "odg", + "odf", "fodt", "fods", "fodp", "fodg", "ott", "uot"] + # Attempt to unzip the attachment and return its files + if unzip and temp_filename.suffix[1:] not in zipped_files: + try: + unzip_attachement(attachment_name, attachment, email_object, file_objects) + except RuntimeError: # File is encrypted with a password + if zip_pass_crack is True: + password = test_zip_passwords(attachment, password_list) + if password: + unzip_attachement(attachment_name, attachment, email_object, file_objects, password) + else: # Inform the analyst that we could not crack password + f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) + f_object.comment = "Encrypted Zip: Password could not be cracked from message" + file_objects.append(f_object) + file_objects.append(main_object) + file_objects += sections + email_object.add_reference(f_object.uuid, 'includes', 'Email attachment') + except zipfile.BadZipFile: # Attachment is not a zipfile + # Just straight add the file + f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) + file_objects.append(f_object) + file_objects.append(main_object) + file_objects += sections + email_object.add_reference(f_object.uuid, 'includes', 'Email attachment') + else: + # Just straight add the file + f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) + file_objects.append(f_object) + file_objects.append(main_object) + file_objects += sections + email_object.add_reference(f_object.uuid, 'includes', 'Email attachment') + + mail_body = email_object.email.get_body(preferencelist=('html', 'plain')) + if extract_urls: + charset = mail_body.get_content_charset() + if mail_body.get_content_type() == 'text/html': + url_parser = HTMLURLParser() + url_parser.feed(mail_body.get_payload(decode=True).decode(charset, errors='ignore')) + urls = url_parser.urls + else: + urls = re.findall(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+', mail_body.get_payload(decode=True).decode(charset, errors='ignore')) + for url in urls: + if not url: + continue + url_object = URLObject(url, standalone=False) + file_objects.append(url_object) + email_object.add_reference(url_object.uuid, 'includes', 'URL in email body') + + objects = [email_object.to_json()] + if file_objects: + objects += [o.to_json() for o in file_objects if o] + r = {'results': {'Object': [json.loads(o) for o in objects]}} return r -def get_zipped_contents(filename, data, password=None): +def unzip_attachement(filename, data, email_object, file_objects, password=None): """Extract the contents of a zipfile. Args: @@ -210,17 +146,23 @@ def get_zipped_contents(filename, data, password=None): "comment":"string here"} """ - with zipfile.ZipFile(io.BytesIO(data), "r") as zf: - unzipped_files = [] + with zipfile.ZipFile(data, "r") as zf: if password is not None: + comment = f'Extracted from {filename} with password "{password}"' password = str.encode(password) # Byte encoded password required + else: + comment = f'Extracted from {filename}' for zip_file_name in zf.namelist(): # Get all files in the zip file with zf.open(zip_file_name, mode='r', pwd=password) as fp: - file_data = fp.read() - unzipped_files.append({"values": zip_file_name, - "data": base64.b64encode(file_data).decode(), # Any password works when not encrypted - "comment": "Extracted from {0}".format(filename)}) - return unzipped_files + file_data = BytesIO(fp.read()) + f_object, main_object, sections = make_binary_objects(pseudofile=file_data, + filename=zip_file_name, + standalone=False) + f_object.comment = comment + file_objects.append(f_object) + file_objects.append(main_object) + file_objects += sections + email_object.add_reference(f_object.uuid, 'includes', 'Email attachment') def test_zip_passwords(data, test_passwords): @@ -234,7 +176,7 @@ def test_zip_passwords(data, test_passwords): Returns a byte string containing a found password and None if password is not found. """ - with zipfile.ZipFile(io.BytesIO(data), "r") as zf: + with zipfile.ZipFile(data, "r") as zf: firstfile = zf.namelist()[0] for pw_test in test_passwords: byte_pwd = str.encode(pw_test) @@ -268,23 +210,16 @@ def get_zip_passwords(message): # Not checking for multi-part message because by having an # encrypted zip file it must be multi-part. - text_parts = [part for part in typed_subpart_iterator(message, 'text', 'plain')] - html_parts = [part for part in typed_subpart_iterator(message, 'text', 'html')] body = [] - # Get full message character set once - # Language example reference (using python2) - # http://ginstrom.com/scribbles/2007/11/19/parsing-multilingual-email-with-python/ - message_charset = get_charset(message) - for part in text_parts: - charset = get_charset(part, message_charset) - body.append(part.get_payload(decode=True).decode(charset)) - for part in html_parts: - charset = get_charset(part, message_charset) - html_part = part.get_payload(decode=True).decode(charset) - html_parser = HTMLTextParser() - html_parser.feed(html_part) - for text in html_parser.text_data: - body.append(text) + for part in message.walk(): + charset = part.get_content_charset() + if part.get_content_type() == 'text/plain': + body.append(part.get_payload(decode=True).decode(charset, errors='ignore')) + elif part.get_content_type() == 'text/html': + html_parser = HTMLTextParser() + html_parser.feed(part.get_payload(decode=True).decode(charset, errors='ignore')) + for text in html_parser.text_data: + body.append(text) raw_text = "\n".join(body).strip() # Add subject to text corpus to parse @@ -334,63 +269,12 @@ class HTMLURLParser(HTMLParser): def handle_starttag(self, tag, attrs): if tag == 'a': self.urls.append(dict(attrs).get('href')) - - -def get_charset(message, default="ascii"): - """Get a message objects charset - - Args: - message (email.message): Email message object to parse. - default (string): String containing default charset to return. - """ - if message.get_content_charset(): - return message.get_content_charset() - if message.get_charset(): - return message.get_charset() - return default - - -def get_decoded_header(header, value): - subject, encoding = decode_header(value)[0] - subject = subject.strip() # extra whitespace will mess up encoding - if isinstance(subject, bytes): - # Remove Byte Order Mark (BOM) from UTF strings - if encoding == 'utf-8': - return re.sub(codecs.BOM_UTF8, b"", subject).decode(encoding) - if encoding == 'utf-16': - return re.sub(codecs.BOM_UTF16, b"", subject).decode(encoding) - elif encoding == 'utf-32': - return re.sub(codecs.BOM_UTF32, b"", subject).decode(encoding) - # Try various UTF decodings for any unknown 8bit encodings - elif encoding == 'unknown-8bit': - for enc in [('utf-8', codecs.BOM_UTF8), - ('utf-32', codecs.BOM_UTF32), # 32 before 16 so it raises errors - ('utf-16', codecs.BOM_UTF16)]: - try: - return re.sub(enc[1], b"", subject).decode(enc[0]) - except UnicodeDecodeError: - continue - # If none of those encoding work return it in RFC2047 format - return str(subject) - # Provide RFC2047 format string if encoding is a unknown encoding - # Better to have the analyst decode themselves than to provide a mangled string - elif encoding is None: - return str(subject) - else: - return subject.decode(encoding) + if tag == 'img': + self.urls.append(dict(attrs).get('src')) def introspection(): - modulesetup = {} - try: - modulesetup['userConfig'] = userConfig - except NameError: - pass - try: - modulesetup['inputSource'] = inputSource - except NameError: - pass - return modulesetup + return mispattributes def version():