mirror of https://github.com/MISP/misp-modules
Fix tests, cleanup
parent
2db845c45c
commit
9f84db3659
|
@ -44,60 +44,45 @@ def handler(q=False):
|
|||
all_headers = ""
|
||||
for k, v in message.items():
|
||||
all_headers += "{0}: {1}\n".format(k.strip(), v.strip())
|
||||
results.append({"values": all_headers,
|
||||
"types": ['email-header']})
|
||||
results.append({"values": all_headers, "type": 'email-header'})
|
||||
|
||||
# E-Mail MIME Boundry
|
||||
if message.get_boundary():
|
||||
results.append({"values": message.get_boundary(),
|
||||
"types": ['email-mime-boundary']})
|
||||
results.append({"values": message.get_boundary(), "type": 'email-mime-boundary'})
|
||||
|
||||
# E-Mail Reply To
|
||||
if message.get('In-Reply-To'):
|
||||
results.append({"values": message.get('In-Reply-To').strip(),
|
||||
"types": ['email-reply-to']})
|
||||
results.append({"values": message.get('In-Reply-To').strip(), "type": 'email-reply-to'})
|
||||
|
||||
# X-Mailer
|
||||
if message.get('X-Mailer'):
|
||||
results.append({"values": message.get('X-Mailer'),
|
||||
"types": ['email-x-mailer']})
|
||||
results.append({"values": message.get('X-Mailer'), "type": 'email-x-mailer'})
|
||||
|
||||
# Thread Index
|
||||
if message.get('Thread-Index'):
|
||||
results.append({"values": message.get('Thread-Index'),
|
||||
"types": ['email-thread-index']})
|
||||
results.append({"values": message.get('Thread-Index'), "type": 'email-thread-index'})
|
||||
|
||||
# Email Message ID
|
||||
if message.get('Message-ID'):
|
||||
results.append({"values": message.get('Message-ID'),
|
||||
"types": ['email-message-id']})
|
||||
results.append({"values": message.get('Message-ID'), "type": 'email-message-id'})
|
||||
|
||||
# Subject
|
||||
if message.get('Subject'):
|
||||
results.append({"values": message.get('Subject'),
|
||||
"types": ['email-subject']})
|
||||
results.append({"values": message.get('Subject'), "type": 'email-subject'})
|
||||
|
||||
# Source
|
||||
from_addr = message.get('From')
|
||||
if from_addr:
|
||||
results.append({"values": parseaddr(from_addr)[1],
|
||||
"types": ['email-src'],
|
||||
"comment": "From: {0}".format(from_addr)})
|
||||
results.append({"values": parseaddr(from_addr)[0],
|
||||
"types": ['email-src-display-name'],
|
||||
"comment": "From: {0}".format(from_addr)})
|
||||
results.append({"values": parseaddr(from_addr)[1], "type": 'email-src', "comment": "From: {0}".format(from_addr)})
|
||||
results.append({"values": parseaddr(from_addr)[0], "type": 'email-src-display-name', "comment": "From: {0}".format(from_addr)})
|
||||
|
||||
# Return Path
|
||||
return_path = message.get('Return-Path')
|
||||
if return_path:
|
||||
# E-Mail Source
|
||||
results.append({"values": parseaddr(return_path)[1],
|
||||
"types": ['email-src'],
|
||||
"comment": "Return Path: {0}".format(return_path)})
|
||||
results.append({"values": parseaddr(return_path)[1], "type": 'email-src', "comment": "Return Path: {0}".format(return_path)})
|
||||
# E-Mail Source Name
|
||||
results.append({"values": parseaddr(return_path)[0],
|
||||
"types": ['email-src-display-name'],
|
||||
"comment": "Return Path: {0}".format(return_path)})
|
||||
results.append({"values": parseaddr(return_path)[0], "type": 'email-src-display-name', "comment": "Return Path: {0}".format(return_path)})
|
||||
|
||||
# Destinations
|
||||
# Split and sort destination header values
|
||||
|
@ -109,14 +94,8 @@ def handler(q=False):
|
|||
for addr in addrs:
|
||||
# Parse and add destination header values
|
||||
parsed_addr = parseaddr(addr)
|
||||
results.append({"values": parsed_addr[1],
|
||||
"types": ["email-dst"],
|
||||
"comment": "{0}: {1}".format(hdr_val,
|
||||
addr)})
|
||||
results.append({"values": parsed_addr[0],
|
||||
"types": ["email-dst-display-name"],
|
||||
"comment": "{0}: {1}".format(hdr_val,
|
||||
addr)})
|
||||
results.append({"values": parsed_addr[1], "type": "email-dst", "comment": "{0}: {1}".format(hdr_val, addr)})
|
||||
results.append({"values": parsed_addr[0], "type": "email-dst-display-name", "comment": "{0}: {1}".format(hdr_val, addr)})
|
||||
|
||||
# Get E-Mail Targets
|
||||
# Get the addresses that received the email.
|
||||
|
@ -132,9 +111,7 @@ def handler(q=False):
|
|||
except (AttributeError):
|
||||
continue
|
||||
for tar in email_targets:
|
||||
results.append({"values": tar,
|
||||
"types": ["target-email"],
|
||||
"comment": "Extracted from email 'Received' header"})
|
||||
results.append({"values": tar, "type": "target-email", "comment": "Extracted from email 'Received' header"})
|
||||
|
||||
# Check if we were given a configuration
|
||||
config = request.get("config", {})
|
||||
|
@ -162,7 +139,7 @@ def handler(q=False):
|
|||
for part in message.walk():
|
||||
filename = part.get_filename()
|
||||
if filename is not None:
|
||||
results.append({"values": filename, "types": ['email-attachment']})
|
||||
results.append({"values": filename, "type": 'email-attachment'})
|
||||
attachment_data = part.get_payload(decode=True)
|
||||
# Base attachment data is default
|
||||
attachment_files = [{"values": filename, "data": base64.b64encode(attachment_data).decode()}]
|
||||
|
@ -182,7 +159,7 @@ def handler(q=False):
|
|||
except zipfile.BadZipFile: # Attachment is not a zipfile
|
||||
attachment_files += [{"values": filename, "data": base64.b64encode(attachment_data).decode()}]
|
||||
for attch_item in attachment_files:
|
||||
attch_item["types"] = ['malware-sample']
|
||||
attch_item["type"] = 'malware-sample'
|
||||
results.append(attch_item)
|
||||
else: # Check email body part for urls
|
||||
if (extract_urls is True and part.get_content_type() == 'text/html'):
|
||||
|
@ -191,8 +168,7 @@ def handler(q=False):
|
|||
url_parser.feed(part.get_payload(decode=True).decode(charset))
|
||||
urls = url_parser.urls
|
||||
for url in urls:
|
||||
results.append({"values": url,
|
||||
"types": "url"})
|
||||
results.append({"values": url, "type": "url"})
|
||||
r = {'results': results}
|
||||
return r
|
||||
|
||||
|
@ -269,12 +245,8 @@ def get_zip_passwords(message):
|
|||
|
||||
# Not checking for multi-part message because by having an
|
||||
# encrypted zip file it must be multi-part.
|
||||
text_parts = [part for part in typed_subpart_iterator(message,
|
||||
'text',
|
||||
'plain')]
|
||||
html_parts = [part for part in typed_subpart_iterator(message,
|
||||
'text',
|
||||
'html')]
|
||||
text_parts = [part for part in typed_subpart_iterator(message, 'text', 'plain')]
|
||||
html_parts = [part for part in typed_subpart_iterator(message, 'text', 'html')]
|
||||
body = []
|
||||
# Get full message character set once
|
||||
# Language example reference (using python2)
|
||||
|
@ -299,8 +271,7 @@ def get_zip_passwords(message):
|
|||
# Grab any strings that are marked off by special chars
|
||||
marking_chars = [["\'", "\'"], ['"', '"'], ['[', ']'], ['(', ')']]
|
||||
for char_set in marking_chars:
|
||||
regex = re.compile("""\{0}([^\{1}]*)\{1}""".format(char_set[0],
|
||||
char_set[1]))
|
||||
regex = re.compile("""\{0}([^\{1}]*)\{1}""".format(char_set[0], char_set[1]))
|
||||
marked_off = re.findall(regex, raw_text)
|
||||
possible_passwords += marked_off
|
||||
|
||||
|
|
|
@ -70,8 +70,8 @@ class TestModules(unittest.TestCase):
|
|||
values = [x["values"] for x in results]
|
||||
types = {}
|
||||
for i in results:
|
||||
types.setdefault(i["types"][0], 0)
|
||||
types[i["types"][0]] += 1
|
||||
types.setdefault(i["type"], 0)
|
||||
types[i["type"]] += 1
|
||||
# Check that there are the appropriate number of items
|
||||
# Check that all the items were correct
|
||||
self.assertEqual(types['target-email'], 1)
|
||||
|
@ -125,11 +125,11 @@ class TestModules(unittest.TestCase):
|
|||
values = [x["values"] for x in response.json()['results']]
|
||||
self.assertIn('EICAR.com', values)
|
||||
for i in response.json()['results']:
|
||||
if i["types"][0] == 'attachment':
|
||||
if i["type"] == 'email-attachment':
|
||||
self.assertEqual(i["values"], "EICAR.com")
|
||||
if i['type'] == 'malware-sample':
|
||||
attch_data = base64.b64decode(i["data"])
|
||||
self.assertEqual(attch_data,
|
||||
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||||
self.assertEqual(attch_data, b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||||
|
||||
|
||||
def test_email_attachment_unpack(self):
|
||||
|
@ -151,13 +151,13 @@ class TestModules(unittest.TestCase):
|
|||
self.assertIn('EICAR.com', values)
|
||||
self.assertIn('EICAR.com.zip', values)
|
||||
for i in response.json()['results']:
|
||||
if i["values"] == 'EICAR.com.zip':
|
||||
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
|
||||
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
|
||||
with zf.open("EICAR.com") as ec:
|
||||
attch_data = ec.read()
|
||||
self.assertEqual(attch_data,
|
||||
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||||
if i["values"] == 'EICAR.com':
|
||||
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
|
||||
attch_data = base64.b64decode(i["data"])
|
||||
self.assertEqual(attch_data,
|
||||
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||||
|
@ -182,11 +182,11 @@ class TestModules(unittest.TestCase):
|
|||
self.assertIn('EICAR.com', values)
|
||||
self.assertIn('EICAR.com.zip', values)
|
||||
for i in response.json()['results']:
|
||||
if i["values"] == 'EICAR.com.zip':
|
||||
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com.zip':
|
||||
with zipfile.ZipFile(io.BytesIO(base64.b64decode(i["data"])), 'r') as zf:
|
||||
# Make sure password was set and still in place
|
||||
self.assertRaises(RuntimeError, zf.open, "EICAR.com")
|
||||
if i["values"] == 'EICAR.com':
|
||||
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
|
||||
attch_data = base64.b64decode(i["data"])
|
||||
self.assertEqual(attch_data,
|
||||
b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||||
|
@ -238,7 +238,7 @@ class TestModules(unittest.TestCase):
|
|||
self.assertIn('EICAR.com', values)
|
||||
for i in response.json()['results']:
|
||||
# Check that it could be extracted.
|
||||
if i["values"] == 'EICAR.com':
|
||||
if i['type'] == 'malware-sample' and i["values"] == 'EICAR.com':
|
||||
attch_data = base64.b64decode(i["data"]).decode()
|
||||
self.assertEqual(attch_data,
|
||||
'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
|
||||
|
|
Loading…
Reference in New Issue