mirror of https://github.com/MISP/misp-modules
Make PEP8 happy
parent
3f83357a2d
commit
93a49c3c1d
|
@ -12,7 +12,7 @@ from email.iterators import typed_subpart_iterator
|
||||||
from html.parser import HTMLParser
|
from html.parser import HTMLParser
|
||||||
|
|
||||||
misperrors = {'error': 'Error'}
|
misperrors = {'error': 'Error'}
|
||||||
userConfig = { }
|
userConfig = {}
|
||||||
|
|
||||||
inputSource = ['file']
|
inputSource = ['file']
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ def handler(q=False):
|
||||||
results.append({"values": message.get('Thread-Index'),
|
results.append({"values": message.get('Thread-Index'),
|
||||||
"types": ['email-thread-index']})
|
"types": ['email-thread-index']})
|
||||||
|
|
||||||
## Email Message ID
|
# Email Message ID
|
||||||
results.append({"values": message.get('Message-ID'),
|
results.append({"values": message.get('Message-ID'),
|
||||||
"types": ['email-message-id']})
|
"types": ['email-message-id']})
|
||||||
|
|
||||||
|
@ -92,22 +92,21 @@ def handler(q=False):
|
||||||
"comment": "Return Path: {0}".format(return_path)})
|
"comment": "Return Path: {0}".format(return_path)})
|
||||||
|
|
||||||
# Destinations
|
# Destinations
|
||||||
## Split and sort destination header values
|
# Split and sort destination header values
|
||||||
recipient_headers = ['To', 'Cc', 'Bcc']
|
recipient_headers = ['To', 'Cc', 'Bcc']
|
||||||
destinations = {}
|
|
||||||
|
|
||||||
for hdr_val in recipient_headers:
|
for hdr_val in recipient_headers:
|
||||||
try:
|
try:
|
||||||
addrs = message.get(hdr_val).split(',')
|
addrs = message.get(hdr_val).split(',')
|
||||||
for addr in addrs:
|
for addr in addrs:
|
||||||
## Parse and add destination header values
|
# Parse and add destination header values
|
||||||
parsed_addr = parseaddr(addr)
|
parsed_addr = parseaddr(addr)
|
||||||
results.append({"values": parsed_addr[1],
|
results.append({"values": parsed_addr[1],
|
||||||
"types": ["email-dst"],
|
"types": ["email-dst"],
|
||||||
"comment": "{0}: {1}".format(hdr_val,
|
"comment": "{0}: {1}".format(hdr_val,
|
||||||
addr)})
|
addr)})
|
||||||
results.append({"values": parsed_addr[0],
|
results.append({"values": parsed_addr[0],
|
||||||
"types": ["email-dst-display-name"],
|
"types": ["email-dst-display-name"],
|
||||||
"comment": "{0}: {1}".format(hdr_val,
|
"comment": "{0}: {1}".format(hdr_val,
|
||||||
addr)})
|
addr)})
|
||||||
|
|
||||||
|
@ -128,11 +127,11 @@ def handler(q=False):
|
||||||
except (AttributeError):
|
except (AttributeError):
|
||||||
continue
|
continue
|
||||||
for tar in email_targets:
|
for tar in email_targets:
|
||||||
results.append({"values": tar,
|
results.append({"values": tar,
|
||||||
"types": ["target-email"],
|
"types": ["target-email"],
|
||||||
"comment": "Extracted from email 'Received' header"})
|
"comment": "Extracted from email 'Received' header"})
|
||||||
except TypeError:
|
except TypeError:
|
||||||
pass # If received header is missing we can't iterate over NoneType
|
pass # If received header is missing we can't iterate over NoneType
|
||||||
|
|
||||||
# Check if we were given a configuration
|
# Check if we were given a configuration
|
||||||
config = request.get("config", {})
|
config = request.get("config", {})
|
||||||
|
@ -141,21 +140,18 @@ def handler(q=False):
|
||||||
|
|
||||||
# Do we unzip attachments we find?
|
# Do we unzip attachments we find?
|
||||||
unzip = config.get("unzip_attachments", None)
|
unzip = config.get("unzip_attachments", None)
|
||||||
if (unzip is not None and
|
if (unzip is not None and unzip.lower() in acceptable_config_yes):
|
||||||
unzip.lower() in acceptable_config_yes):
|
|
||||||
unzip = True
|
unzip = True
|
||||||
|
|
||||||
# Do we try to find passwords for protected zip files?
|
# Do we try to find passwords for protected zip files?
|
||||||
zip_pass_crack = config.get("guess_zip_attachment_passwords", None)
|
zip_pass_crack = config.get("guess_zip_attachment_passwords", None)
|
||||||
if (zip_pass_crack is not None and
|
if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes):
|
||||||
zip_pass_crack.lower() in acceptable_config_yes):
|
|
||||||
zip_pass_crack = True
|
zip_pass_crack = True
|
||||||
password_list = None # Only want to collect password list once
|
password_list = None # Only want to collect password list once
|
||||||
|
|
||||||
# Do we extract URL's from the email.
|
# Do we extract URL's from the email.
|
||||||
extract_urls = config.get("extract_urls", None)
|
extract_urls = config.get("extract_urls", None)
|
||||||
if (extract_urls is not None and
|
if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes):
|
||||||
extract_urls.lower() in acceptable_config_yes):
|
|
||||||
extract_urls = True
|
extract_urls = True
|
||||||
|
|
||||||
# Get Attachments
|
# Get Attachments
|
||||||
|
@ -166,30 +162,27 @@ def handler(q=False):
|
||||||
attachment_data = part.get_payload(decode=True)
|
attachment_data = part.get_payload(decode=True)
|
||||||
# Base attachment data is default
|
# Base attachment data is default
|
||||||
attachment_files = [{"values": filename,
|
attachment_files = [{"values": filename,
|
||||||
"data" : base64.b64encode(attachment_data).decode()}]
|
"data": base64.b64encode(attachment_data).decode()}]
|
||||||
if unzip is True: # Attempt to unzip the attachment and return its files
|
if unzip is True: # Attempt to unzip the attachment and return its files
|
||||||
try:
|
try:
|
||||||
attachment_files += get_zipped_contents(filename,
|
attachment_files += get_zipped_contents(filename, attachment_data)
|
||||||
attachment_data)
|
|
||||||
except RuntimeError: # File is encrypted with a password
|
except RuntimeError: # File is encrypted with a password
|
||||||
if zip_pass_crack is True:
|
if zip_pass_crack is True:
|
||||||
if password_list is None:
|
if password_list is None:
|
||||||
password_list = get_zip_passwords(message)
|
password_list = get_zip_passwords(message)
|
||||||
password = test_zip_passwords(attachment_data, password_list)
|
password = test_zip_passwords(attachment_data, password_list)
|
||||||
if password is None: # Inform the analyst that we could not crack password
|
if password is None: # Inform the analyst that we could not crack password
|
||||||
attachment_files[0]['comment'] = "Encrypted Zip: Password could not be cracked from message"
|
attachment_files[0]['comment'] = "Encrypted Zip: Password could not be cracked from message"
|
||||||
else:
|
else:
|
||||||
attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password)
|
attachment_files[0]['comment'] = """Original Zipped Attachment with Password {0}""".format(password)
|
||||||
attachment_files += get_zipped_contents(filename,
|
attachment_files += get_zipped_contents(filename, attachment_data, password=password)
|
||||||
attachment_data,
|
except zipfile.BadZipFile: # Attachment is not a zipfile
|
||||||
password=password)
|
|
||||||
except zipfile.BadZipFile: # Attachment is not a zipfile
|
|
||||||
attachment_files += [{"values": filename,
|
attachment_files += [{"values": filename,
|
||||||
"data" : base64.b64encode(attachment_data).decode()}]
|
"data": base64.b64encode(attachment_data).decode()}]
|
||||||
for attch_item in attachment_files:
|
for attch_item in attachment_files:
|
||||||
attch_item["types"] = ['attachment']
|
attch_item["types"] = ['attachment']
|
||||||
results.append(attch_item)
|
results.append(attch_item)
|
||||||
else: # Check email body part for urls
|
else: # Check email body part for urls
|
||||||
if (extract_urls is True and part.get_content_type() == 'text/html'):
|
if (extract_urls is True and part.get_content_type() == 'text/html'):
|
||||||
url_parser = HTMLURLParser()
|
url_parser = HTMLURLParser()
|
||||||
charset = get_charset(part, get_charset(message))
|
charset = get_charset(part, get_charset(message))
|
||||||
|
@ -201,6 +194,7 @@ def handler(q=False):
|
||||||
r = {'results': results}
|
r = {'results': results}
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
|
||||||
def get_zipped_contents(filename, data, password=None):
|
def get_zipped_contents(filename, data, password=None):
|
||||||
"""Extract the contents of a zipfile.
|
"""Extract the contents of a zipfile.
|
||||||
|
|
||||||
|
@ -223,7 +217,7 @@ def get_zipped_contents(filename, data, password=None):
|
||||||
with zf.open(zip_file_name, mode='rU', pwd=password) as fp:
|
with zf.open(zip_file_name, mode='rU', pwd=password) as fp:
|
||||||
file_data = fp.read()
|
file_data = fp.read()
|
||||||
unzipped_files.append({"values": zip_file_name,
|
unzipped_files.append({"values": zip_file_name,
|
||||||
"data" : base64.b64encode(file_data).decode(), # Any password works when not encrypted
|
"data": base64.b64encode(file_data).decode(), # Any password works when not encrypted
|
||||||
"comment": "Extracted from {0}".format(filename)})
|
"comment": "Extracted from {0}".format(filename)})
|
||||||
return unzipped_files
|
return unzipped_files
|
||||||
|
|
||||||
|
@ -250,6 +244,7 @@ def test_zip_passwords(data, test_passwords):
|
||||||
continue
|
continue
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_zip_passwords(message):
|
def get_zip_passwords(message):
|
||||||
""" Parse message for possible zip password combinations.
|
""" Parse message for possible zip password combinations.
|
||||||
|
|
||||||
|
@ -317,6 +312,7 @@ def get_zip_passwords(message):
|
||||||
|
|
||||||
return possible_passwords
|
return possible_passwords
|
||||||
|
|
||||||
|
|
||||||
class HTMLTextParser(HTMLParser):
|
class HTMLTextParser(HTMLParser):
|
||||||
""" Parse all text and data from HTML strings."""
|
""" Parse all text and data from HTML strings."""
|
||||||
def __init__(self, text_data=None):
|
def __init__(self, text_data=None):
|
||||||
|
@ -325,9 +321,11 @@ class HTMLTextParser(HTMLParser):
|
||||||
self.text_data = []
|
self.text_data = []
|
||||||
else:
|
else:
|
||||||
self.text_data = text_data
|
self.text_data = text_data
|
||||||
|
|
||||||
def handle_data(self, data):
|
def handle_data(self, data):
|
||||||
self.text_data.append(data)
|
self.text_data.append(data)
|
||||||
|
|
||||||
|
|
||||||
class HTMLURLParser(HTMLParser):
|
class HTMLURLParser(HTMLParser):
|
||||||
""" Parse all href targets from HTML strings."""
|
""" Parse all href targets from HTML strings."""
|
||||||
def __init__(self, urls=None):
|
def __init__(self, urls=None):
|
||||||
|
@ -336,10 +334,12 @@ class HTMLURLParser(HTMLParser):
|
||||||
self.urls = []
|
self.urls = []
|
||||||
else:
|
else:
|
||||||
self.urls = urls
|
self.urls = urls
|
||||||
|
|
||||||
def handle_starttag(self, tag, attrs):
|
def handle_starttag(self, tag, attrs):
|
||||||
if tag == 'a':
|
if tag == 'a':
|
||||||
self.urls.append(dict(attrs).get('href'))
|
self.urls.append(dict(attrs).get('href'))
|
||||||
|
|
||||||
|
|
||||||
def get_charset(message, default="ascii"):
|
def get_charset(message, default="ascii"):
|
||||||
"""Get a message objects charset
|
"""Get a message objects charset
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue