chg: [internal] Optimise email_import

pull/648/head
Jakub Onderka 2024-01-06 23:30:21 +01:00
parent 4596d76887
commit 658ae11941
1 changed files with 11 additions and 18 deletions

View File

@ -1,6 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json import json
import base64 import base64
import zipfile import zipfile
@ -33,12 +31,7 @@ moduleconfig = ["unzip_attachments",
"extract_urls"] "extract_urls"]
def handler(q=False): def dict_handler(request: dict):
if q is False:
return False
# Decode and parse email
request = json.loads(q)
# request data is always base 64 byte encoded # request data is always base 64 byte encoded
data = base64.b64decode(request["data"]) data = base64.b64decode(request["data"])
@ -51,18 +44,18 @@ def handler(q=False):
# Do we unzip attachments we find? # Do we unzip attachments we find?
unzip = config.get("unzip_attachments", None) unzip = config.get("unzip_attachments", None)
if (unzip is not None and unzip.lower() in acceptable_config_yes): if unzip is not None and unzip.lower() in acceptable_config_yes:
unzip = True unzip = True
# Do we try to find passwords for protected zip files? # Do we try to find passwords for protected zip files?
zip_pass_crack = config.get("guess_zip_attachment_passwords", None) zip_pass_crack = config.get("guess_zip_attachment_passwords", None)
if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes): if zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes:
zip_pass_crack = True zip_pass_crack = True
password_list = get_zip_passwords(email_object.email) password_list = get_zip_passwords(email_object.email)
# Do we extract URL's from the email. # Do we extract URL's from the email.
extract_urls = config.get("extract_urls", None) extract_urls = config.get("extract_urls", None)
if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes): if extract_urls is not None and extract_urls.lower() in acceptable_config_yes:
extract_urls = True extract_urls = True
file_objects = [] # All possible file objects file_objects = [] # All possible file objects
@ -81,12 +74,12 @@ def handler(q=False):
# Attempt to unzip the attachment and return its files # Attempt to unzip the attachment and return its files
if unzip and temp_filename.suffix[1:] not in zipped_files: if unzip and temp_filename.suffix[1:] not in zipped_files:
try: try:
unzip_attachement(attachment_name, attachment, email_object, file_objects) unzip_attachment(attachment_name, attachment, email_object, file_objects)
except RuntimeError: # File is encrypted with a password except RuntimeError: # File is encrypted with a password
if zip_pass_crack is True: if zip_pass_crack is True:
password = test_zip_passwords(attachment, password_list) password = test_zip_passwords(attachment, password_list)
if password: if password:
unzip_attachement(attachment_name, attachment, email_object, file_objects, password) unzip_attachment(attachment_name, attachment, email_object, file_objects, password)
else: # Inform the analyst that we could not crack password else: # Inform the analyst that we could not crack password
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
f_object.comment = "Encrypted Zip: Password could not be cracked from message" f_object.comment = "Encrypted Zip: Password could not be cracked from message"
@ -125,14 +118,14 @@ def handler(q=False):
file_objects.append(url_object) file_objects.append(url_object)
email_object.add_reference(url_object.uuid, 'includes', 'URL in email body') email_object.add_reference(url_object.uuid, 'includes', 'URL in email body')
objects = [email_object.to_json()] objects = [email_object.to_dict()]
if file_objects: if file_objects:
objects += [o.to_json() for o in file_objects if o] objects += [o.to_dict() for o in file_objects if o]
r = {'results': {'Object': [json.loads(o) for o in objects]}} r = {'results': {'Object': objects}}
return r return r
def unzip_attachement(filename, data, email_object, file_objects, password=None): def unzip_attachment(filename, data, email_object, file_objects, password=None):
"""Extract the contents of a zipfile. """Extract the contents of a zipfile.
Args: Args:
@ -289,4 +282,4 @@ def version():
if __name__ == '__main__': if __name__ == '__main__':
with open('tests/test_no_attach.eml', 'r') as email_file: with open('tests/test_no_attach.eml', 'r') as email_file:
handler(q=email_file.read()) dict_handler(json.loads(email_file.read()))