From 2db845c45ca50056401f5190e27f68eebb4de3fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Sat, 7 Jan 2017 14:39:52 -0500 Subject: [PATCH 1/2] Improve support of email attachments Related to #90 --- misp_modules/modules/import_mod/email_import.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/misp_modules/modules/import_mod/email_import.py b/misp_modules/modules/import_mod/email_import.py index f1f7259..0ca4b3f 100644 --- a/misp_modules/modules/import_mod/email_import.py +++ b/misp_modules/modules/import_mod/email_import.py @@ -162,10 +162,10 @@ def handler(q=False): for part in message.walk(): filename = part.get_filename() if filename is not None: + results.append({"values": filename, "types": ['email-attachment']}) attachment_data = part.get_payload(decode=True) # Base attachment data is default - attachment_files = [{"values": filename, - "data": base64.b64encode(attachment_data).decode()}] + attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}] if unzip is True: # Attempt to unzip the attachment and return its files try: attachment_files += get_zipped_contents(filename, attachment_data) @@ -180,10 +180,9 @@ def handler(q=False): attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password) attachment_files += get_zipped_contents(filename, attachment_data, password=password) except zipfile.BadZipFile: # Attachment is not a zipfile - attachment_files += [{"values": filename, - "data": base64.b64encode(attachment_data).decode()}] + attachment_files += [{"values": filename, "data": base64.b64encode(attachment_data).decode()}] for attch_item in attachment_files: - attch_item["types"] = ['attachment'] + attch_item["types"] = ['malware-sample'] results.append(attch_item) else: # Check email body part for urls if (extract_urls is True and part.get_content_type() == 'text/html'): From 9f84db365951091614f7a5b4763ad387dec77b09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Sat, 7 Jan 2017 18:36:08 -0500 Subject: [PATCH 2/2] Fix tests, cleanup --- .../modules/import_mod/email_import.py | 69 ++++++------------- tests/test.py | 20 +++--- 2 files changed, 30 insertions(+), 59 deletions(-) diff --git a/misp_modules/modules/import_mod/email_import.py b/misp_modules/modules/import_mod/email_import.py index 0ca4b3f..bd0b231 100644 --- a/misp_modules/modules/import_mod/email_import.py +++ b/misp_modules/modules/import_mod/email_import.py @@ -44,60 +44,45 @@ def handler(q=False): all_headers = "" for k, v in message.items(): all_headers += "{0}: {1}\n".format(k.strip(), v.strip()) - results.append({"values": all_headers, - "types": ['email-header']}) + results.append({"values": all_headers, "type": 'email-header'}) # E-Mail MIME Boundry if message.get_boundary(): - results.append({"values": message.get_boundary(), - "types": ['email-mime-boundary']}) + results.append({"values": message.get_boundary(), "type": 'email-mime-boundary'}) # E-Mail Reply To if message.get('In-Reply-To'): - results.append({"values": message.get('In-Reply-To').strip(), - "types": ['email-reply-to']}) + results.append({"values": message.get('In-Reply-To').strip(), "type": 'email-reply-to'}) # X-Mailer if message.get('X-Mailer'): - results.append({"values": message.get('X-Mailer'), - "types": ['email-x-mailer']}) + results.append({"values": message.get('X-Mailer'), "type": 'email-x-mailer'}) # Thread Index if message.get('Thread-Index'): - results.append({"values": message.get('Thread-Index'), - "types": ['email-thread-index']}) + results.append({"values": message.get('Thread-Index'), "type": 'email-thread-index'}) # Email Message ID if message.get('Message-ID'): - results.append({"values": message.get('Message-ID'), - "types": ['email-message-id']}) + results.append({"values": message.get('Message-ID'), "type": 'email-message-id'}) # Subject if message.get('Subject'): - results.append({"values": message.get('Subject'), - "types": ['email-subject']}) + results.append({"values": message.get('Subject'), "type": 'email-subject'}) # Source from_addr = message.get('From') if from_addr: - results.append({"values": parseaddr(from_addr)[1], - "types": ['email-src'], - "comment": "From: {0}".format(from_addr)}) - results.append({"values": parseaddr(from_addr)[0], - "types": ['email-src-display-name'], - "comment": "From: {0}".format(from_addr)}) + results.append({"values": parseaddr(from_addr)[1], "type": 'email-src', "comment": "From: {0}".format(from_addr)}) + results.append({"values": parseaddr(from_addr)[0], "type": 'email-src-display-name', "comment": "From: {0}".format(from_addr)}) # Return Path return_path = message.get('Return-Path') if return_path: # E-Mail Source - results.append({"values": parseaddr(return_path)[1], - "types": ['email-src'], - "comment": "Return Path: {0}".format(return_path)}) + results.append({"values": parseaddr(return_path)[1], "type": 'email-src', "comment": "Return Path: {0}".format(return_path)}) # E-Mail Source Name - results.append({"values": parseaddr(return_path)[0], - "types": ['email-src-display-name'], - "comment": "Return Path: {0}".format(return_path)}) + results.append({"values": parseaddr(return_path)[0], "type": 'email-src-display-name', "comment": "Return Path: {0}".format(return_path)}) # Destinations # Split and sort destination header values @@ -109,14 +94,8 @@ def handler(q=False): for addr in addrs: # Parse and add destination header values parsed_addr = parseaddr(addr) - results.append({"values": parsed_addr[1], - "types": ["email-dst"], - "comment": "{0}: {1}".format(hdr_val, - addr)}) - results.append({"values": parsed_addr[0], - "types": ["email-dst-display-name"], - "comment": "{0}: {1}".format(hdr_val, - addr)}) + results.append({"values": parsed_addr[1], "type": "email-dst", "comment": "{0}: {1}".format(hdr_val, addr)}) + results.append({"values": parsed_addr[0], "type": "email-dst-display-name", "comment": "{0}: {1}".format(hdr_val, addr)}) # Get E-Mail Targets # Get the addresses that received the email. @@ -132,9 +111,7 @@ def handler(q=False): except (AttributeError): continue for tar in email_targets: - results.append({"values": tar, - "types": ["target-email"], - "comment": "Extracted from email 'Received' header"}) + results.append({"values": tar, "type": "target-email", "comment": "Extracted from email 'Received' header"}) # Check if we were given a configuration config = request.get("config", {}) @@ -162,7 +139,7 @@ def handler(q=False): for part in message.walk(): filename = part.get_filename() if filename is not None: - results.append({"values": filename, "types": ['email-attachment']}) + results.append({"values": filename, "type": 'email-attachment'}) attachment_data = part.get_payload(decode=True) # Base attachment data is default attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}] @@ -182,7 +159,7 @@ def handler(q=False): except zipfile.BadZipFile: # Attachment is not a zipfile attachment_files += [{"values": filename, "data": base64.b64encode(attachment_data).decode()}] for attch_item in attachment_files: - attch_item["types"] = ['malware-sample'] + attch_item["type"] = 'malware-sample' results.append(attch_item) else: # Check email body part for urls if (extract_urls is True and part.get_content_type() == 'text/html'): @@ -191,8 +168,7 @@ def handler(q=False): url_parser.feed(part.get_payload(decode=True).decode(charset)) urls = url_parser.urls for url in urls: - results.append({"values": url, - "types": "url"}) + results.append({"values": url, "type": "url"}) r = {'results': results} return r @@ -269,12 +245,8 @@ def get_zip_passwords(message): # Not checking for multi-part message because by having an # encrypted zip file it must be multi-part. - text_parts = [part for part in typed_subpart_iterator(message, - 'text', - 'plain')] - html_parts = [part for part in typed_subpart_iterator(message, - 'text', - 'html')] + text_parts = [part for part in typed_subpart_iterator(message, 'text', 'plain')] + html_parts = [part for part in typed_subpart_iterator(message, 'text', 'html')] body = [] # Get full message character set once # Language example reference (using python2) @@ -299,8 +271,7 @@ def get_zip_passwords(message): # Grab any strings that are marked off by special chars marking_chars = [["\'", "\'"], ['"', '"'], ['[', ']'], ['(', ')']] for char_set in marking_chars: - regex = re.compile("""\{0}([^\{1}]*)\{1}""".format(char_set[0], - char_set[1])) + regex = re.compile("""\{0}([^\{1}]*)\{1}""".format(char_set[0], char_set[1])) marked_off = re.findall(regex, raw_text) possible_passwords += marked_off diff --git a/tests/test.py b/tests/test.py index 81e7879..d15144d 100644 --- a/tests/test.py +++ b/tests/test.py @@ -70,8 +70,8 @@ class TestModules(unittest.TestCase): values = [x["values"] for x in results] types = {} for i in results: - types.setdefault(i["types"][0], 0) - types[i["types"][0]] += 1 + types.setdefault(i["type"], 0) + types[i["type"]] += 1 # Check that there are the appropriate number of items # Check that all the items were correct self.assertEqual(types['target-email'], 1) @@ -125,11 +125,11 @@ class TestModules(unittest.TestCase): values = [x["values"] for x in response.json()['results']] self.assertIn('EICAR.com', values) for i in response.json()['results']: - if i["types"][0] == 'attachment': + if i["type"] == 'email-attachment': self.assertEqual(i["values"], "EICAR.com") + if i['type'] == 'malware-sample': attch_data = base64.b64decode(i["data"]) - self.assertEqual(attch_data, - b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-') + self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-') def test_email_attachment_unpack(self): @@ -151,13 +151,13 @@ class TestModules(unittest.TestCase): self.assertIn('EICAR.com', values) self.assertIn('EICAR.com.zip', values) for i in response.json()['results']: - if i["values"] == 'EICAR.com.zip': + if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip': with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf: with zf.open("EICAR.com") as ec: attch_data = ec.read() self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-') - if i["values"] == 'EICAR.com': + if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com': attch_data = base64.b64decode(i["data"]) self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-') @@ -182,11 +182,11 @@ class TestModules(unittest.TestCase): self.assertIn('EICAR.com', values) self.assertIn('EICAR.com.zip', values) for i in response.json()['results']: - if i["values"] == 'EICAR.com.zip': + if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip': with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf: # Make sure password was set and still in place self.assertRaises(RuntimeError, zf.open, "EICAR.com") - if i["values"] == 'EICAR.com': + if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com': attch_data = base64.b64decode(i["data"]) self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-') @@ -238,7 +238,7 @@ class TestModules(unittest.TestCase): self.assertIn('EICAR.com', values) for i in response.json()['results']: # Check that it could be extracted. - if i["values"] == 'EICAR.com': + if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com': attch_data = base64.b64decode(i["data"]).decode() self.assertEqual(attch_data, 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')