
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
import html
import os
import re
import syslog
from datetime import datetime
from email import message_from_bytes, policy, message
from email.parser import BytesParser
from io import BytesIO
from ipaddress import ip_address
from typing import Iterator, List, Optional, Union

from defang import refang  # type: ignore
from O365 import Account
from O365.message import Message
from O365.utils import AWSS3Backend, AWSSecretsBackend, EnvTokenBackend, FileSystemTokenBackend, FirestoreBackend
from pyfaup.faup import Faup  # type: ignore
from pymisp import ExpandedPyMISP, MISPEvent, MISPObject, MISPSighting, InvalidMISPObject
# NOTE(review): the package name on this line was truncated in the reviewed copy
# ("from import ..."); pymisp.tools is where these helpers are published.
from pymisp.tools import EMailObject, make_binary_objects, VTReportObject

from . import urlmarker, hashmarker

# dnspython is optional: without it, hostnames found in a mail are not resolved.
try:
    import dns.resolver
    HAS_DNS = True
except ImportError:
    HAS_DNS = False


2018-05-14 23:23:30 +02:00
def is_ip(address):
    '''Return True when *address* parses as an IPv4 or IPv6 address.

    :param address: value handed to ``ipaddress.ip_address``
    :return: False when ``ip_address`` rejects the value with ValueError, True otherwise
    '''
    try:
        ip_address(address)
    except ValueError:
        return False
    return True


class Mail2MISP():
    '''Build a MISP event out of an email (raw bytes or an O365 message).

    NOTE(review): the reviewed copy of this class had lost its indentation and
    a number of lines. The block structure was rebuilt from the surviving
    statements; every statement that had to be re-created is tagged
    ``NOTE(review)`` and must be confirmed against version control.
    '''

    def __init__(self, misp_url, misp_key, verifycert, config, offline=False, urlsonly=False):
        '''Store the configuration and, unless offline, create the MISP client.

        :param misp_url: URL handed to ExpandedPyMISP
        :param misp_key: API key handed to ExpandedPyMISP
        :param verifycert: certificate verification setting handed to ExpandedPyMISP
        :param config: configuration object, read through attribute access
        :param offline: when true, no MISP client is created
        :param urlsonly: False for normal processing; any other value is used as the
            first argument of ``self.misp.add_attribute`` in process_body_iocs
            (presumably an event id - confirm with the caller)
        '''
        self.offline = offline
        if not self.offline:
            self.misp = ExpandedPyMISP(misp_url, misp_key, verifycert, debug=config.debug)
        self.config = config
        self.urlsonly = urlsonly
        if not hasattr(self.config, 'enable_dns'):
            setattr(self.config, 'enable_dns', True)
        # NOTE(review): this switches DNS resolution off whenever urlsonly is False,
        # i.e. in normal mode, whatever the configuration says. Kept as found.
        if self.urlsonly is False:
            setattr(self.config, 'enable_dns', False)
        self.debug = self.config.debug
        self.config_from_email_body = {}
        if not hasattr(self.config, 'ignore_nullsize_attachments'):
            setattr(self.config, 'ignore_nullsize_attachments', False)
        self.ignore_nullsize_attachments = self.config.ignore_nullsize_attachments
        # Init Faup
        self.f = Faup()
        # (value, source) pairs pushed to MISP by add_event once the event exists
        self.sightings_to_add = []

    def _init_misp_event(self, info):
        '''Create ``self.misp_event`` with the configured default distribution, threat level and analysis.'''
        self.misp_event = MISPEvent()
        # NOTE(review): the assignment target was truncated in the reviewed copy; `info` is assumed.
        self.misp_event.info = info
        self.misp_event.distribution = self.config.default_distribution
        self.misp_event.threat_level_id = self.config.default_threat_level
        self.misp_event.analysis = self.config.default_analysis

    def _clean_subject(self, subject):
        '''Remove every pattern of ``config.removelist`` from *subject* and strip it.'''
        for removeword in self.config.removelist:
            subject = re.sub(removeword, "", subject).strip()
        return subject

    def load_email(self, pseudofile):
        '''Parse a raw email and initialise the MISP event from its headers.

        :param pseudofile: object exposing ``getvalue()`` that returns the mail as bytes
        '''
        self.pseudofile = pseudofile
        self.original_mail = message_from_bytes(self.pseudofile.getvalue(), policy=policy.default)

        try:
            self.sender = self.original_mail.get('From')
        except Exception:
            self.sender = "<unknown sender>"

        try:
            # A missing Subject header makes re.sub raise, which lands in the fallback below.
            self.subject = self._clean_subject(self.original_mail.get('Subject'))
        except Exception as ex:
            self.subject = "<subject could not be retrieved>"
            if self.debug:
                syslog.syslog(str(ex))  # NOTE(review): log call reconstructed

        # Initialize the MISP event
        self._init_misp_event(f'{self.config.email_subject_prefix} - {self.subject}')

    def load_o365_email(self, msg: Message):
        '''Initialise sender, reply-to, subject and the MISP event from an O365 message.

        :param msg: the O365 message to process
        '''
        self.msg = msg
        try:
            self.sender = self.msg.sender.address
        except Exception as ex:
            self.sender = "<unknown sender>"
            if self.debug:
                syslog.syslog(str(ex))  # NOTE(review): log call reconstructed

        try:
            self.reply_to = self.msg.reply_to[0].address
        except Exception as ex:
            self.reply_to = None
            if self.debug:
                syslog.syslog(str(ex))  # NOTE(review): log call reconstructed

        try:
            self.subject = self._clean_subject(self.msg.subject)
        except Exception as ex:
            self.subject = "<subject could not be retrieved>"
            if self.debug:
                syslog.syslog(str(ex))  # NOTE(review): log call reconstructed

        # initialize the MISP event
        self._init_misp_event(self.subject)

    def sighting(self, value, source):
        '''Add a sighting.

        :raises Exception: when running in offline mode
        '''
        if self.offline:
            raise Exception('The script is running in offline mode, ')
        s = MISPSighting()
        s.from_dict(value=value, source=source)
        self.misp.add_sighting(s)  # NOTE(review): submission call reconstructed

    def _find_inline_forward(self):
        '''Does the body contains a forwarded email?

        Splits ``self.clean_email_body`` at the first configured forward identifier
        found and processes the part after it as a forwarded mail.

        :return: the result of forwarded_email(), or None when no identifier is found
        '''
        for identifier in self.config.forward_identifiers:
            if identifier in self.clean_email_body:
                # maxsplit=1: an identifier occurring more than once must not break the unpacking
                self.clean_email_body, fw_email = self.clean_email_body.split(identifier, 1)
                return self.forwarded_email(pseudofile=BytesIO(fw_email.encode()))

    def _find_attached_forward(self):
        '''Process the attachments of ``self.original_mail``.

        Attached emails are processed as forwarded mails; any other attachment is
        added to the event, as a plain attachment or as binary objects.

        :return: list with the result of forwarded_email() for each attached email
        '''
        forwarded_emails = []
        for attachment in self.original_mail.iter_attachments():
            try:
                attachment_content = attachment.get_content()
            except KeyError:
                # Attachment type has no handler
                continue  # NOTE(review): reconstructed
            # Search for email forwarded as attachment
            # I could have more than one, attaching everything.
            if isinstance(attachment_content, message.EmailMessage):
                # NOTE(review): reconstructed
                forwarded_emails.append(self.forwarded_email(pseudofile=BytesIO(attachment_content.as_bytes())))
                continue
            if isinstance(attachment_content, str):
                attachment_content = attachment_content.encode()
            filename = attachment.get_filename()
            if not filename:
                filename = 'missing_filename'
            if self.config_from_email_body.get('attachment') == self.config.m2m_benign_attachment_keyword:
                # Attach sane file
                self.misp_event.add_attribute('attachment', value=filename, data=BytesIO(attachment_content))
            else:
                f_object, main_object, sections = make_binary_objects(pseudofile=BytesIO(attachment_content), filename=filename, standalone=False)
                self.misp_event.add_object(f_object)  # NOTE(review): reconstructed
                if main_object:
                    self.misp_event.add_object(main_object)  # NOTE(review): reconstructed
                    for section in sections:
                        self.misp_event.add_object(section)
        return forwarded_emails

    def _find_o365_attached_forward(self, msg: Message):
        '''Process the ``.eml`` attachments of an O365 message as forwarded mails.

        :param msg: the O365 message whose attachments are downloaded and inspected
        :return: list with the result of forwarded_email() for each attached email
        '''
        forwarded_emails = []
        if msg.has_attachments and msg.attachments.download_attachments():
            for attachment in msg.attachments:
                # NOTE(review): the right-hand side of this test was truncated; `attachment.name` is assumed.
                if '.eml' in attachment.name:
                    decoded_attachment = base64.b64decode(attachment.content)
                    pseudofile = BytesIO(decoded_attachment)
                    eml = BytesParser(policy=policy.default).parse(pseudofile)
                    if isinstance(eml, message.EmailMessage):
                        # NOTE(review): reconstructed; a fresh buffer is used because parse() consumed `pseudofile`.
                        forwarded_emails.append(self.forwarded_email(pseudofile=BytesIO(decoded_attachment)))
        return forwarded_emails

    def email_from_spamtrap(self):
        '''The email comes from a spamtrap and should be attached as-is.'''
        raw_body = self.original_mail.get_body(preferencelist=('html', 'plain'))
        if raw_body:
            self.clean_email_body = html.unescape(raw_body.get_payload(decode=True).decode('utf8', 'surrogateescape'))
        else:
            self.clean_email_body = ''
        return self.forwarded_email(self.pseudofile)

    def forwarded_email(self, pseudofile: BytesIO):
        '''Extracts all possible indicators out of an email and create a MISP event out of it.
        * Gets all relevant Headers
        * Attach the body
        * Create MISP file objects (uses lief if possible)
        * Set all references

        :param pseudofile: the raw email
        :return: the EMailObject built from *pseudofile*
        '''
        email_object = EMailObject(pseudofile=pseudofile, attach_original_mail=True, standalone=False)
        if email_object.attachments:
            # Create file objects for the attachments
            for attachment_name, attachment in email_object.attachments:
                if self.ignore_nullsize_attachments and attachment.getbuffer().nbytes == 0:
                    continue
                if not attachment_name:
                    attachment_name = 'NameMissing.txt'
                if self.config_from_email_body.get('attachment') == self.config.m2m_benign_attachment_keyword:
                    a = self.misp_event.add_attribute('attachment', value=attachment_name, data=attachment)
                    email_object.add_reference(a.uuid, 'related-to', 'Email attachment')
                    continue
                f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
                if self.config.vt_key:
                    try:
                        vt_object = VTReportObject(self.config.vt_key, f_object.get_attributes_by_relation('sha256')[0].value, standalone=False)
                        self.misp_event.add_object(vt_object)  # NOTE(review): reconstructed
                        f_object.add_reference(vt_object.uuid, 'analysed-with')
                    except InvalidMISPObject as e:
                        syslog.syslog(str(e))  # NOTE(review): handler body reconstructed
                self.misp_event.add_object(f_object)  # NOTE(review): reconstructed
                if main_object:
                    self.misp_event.add_object(main_object)  # NOTE(review): reconstructed
                    for section in sections:
                        self.misp_event.add_object(section)  # NOTE(review): reconstructed
                email_object.add_reference(f_object.uuid, 'related-to', 'Email attachment')
        self.process_body_iocs(email_object)  # NOTE(review): reconstructed
        if self.config.spamtrap or self.config.attach_original_mail or self.config_from_email_body.get('attach_original_mail'):
            self.misp_event.add_object(email_object)  # NOTE(review): reconstructed
        return email_object

    def _apply_body_config(self):
        '''Read ``<prefix>:<key>:<value>`` lines from the clean body, strip them and apply them.

        Event settings are only overridden when the body carries the configured ``m2mkey``.
        '''
        # Check if there are config lines in the body & convert them to a python dictionary:
        #   <config.body_config_prefix>:<key>:<value> => {<key>: <value>}
        self.config_from_email_body = {k.strip(): v.strip() for k, v in re.findall(f'{self.config.body_config_prefix}:(.*):(.*)', self.clean_email_body)}
        if self.config_from_email_body:
            # ... remove the config lines from the body
            self.clean_email_body = re.sub(rf'^{self.config.body_config_prefix}.*\n?', '',
                                           self.clean_email_body, flags=re.MULTILINE)
        # Check if autopublish key is present and valid
        if self.config_from_email_body.get('m2mkey') == self.config.m2m_key:
            if self.config_from_email_body.get('distribution') is not None:
                self.misp_event.distribution = self.config_from_email_body.get('distribution')
            if self.config_from_email_body.get('threat_level') is not None:
                self.misp_event.threat_level_id = self.config_from_email_body.get('threat_level')
            if self.config_from_email_body.get('analysis') is not None:
                self.misp_event.analysis = self.config_from_email_body.get('analysis')
            if self.config_from_email_body.get('publish'):
                self.misp_event.publish()  # NOTE(review): reconstructed

    def process_email_body(self):
        '''Decode the mail body into ``self.clean_email_body`` and apply in-body configuration.'''
        body_part = self.original_mail.get_body(preferencelist=('html', 'plain'))
        # get_body() returns None for a mail without a usable body part
        mail_as_bytes = body_part.get_payload(decode=True) if body_part is not None else None
        if mail_as_bytes:
            self.clean_email_body = html.unescape(mail_as_bytes.decode('utf8', 'surrogateescape'))
            self._apply_body_config()
            self._find_inline_forward()  # NOTE(review): reconstructed
        else:
            self.clean_email_body = ''
        self._find_attached_forward()  # NOTE(review): reconstructed

    def process_o365_email_body(self):
        '''Build ``self.clean_email_body`` from the O365 message and apply in-body configuration.'''
        if self.msg:
            # Drop the "You don't often get email from ..." banner; re.sub is a no-op when it is absent.
            self.clean_email_body = re.sub(r"<div>You don't often get email from .*?</div>", "", html.unescape(self.msg.body))
            # Config lines are stripped from the cleaned body so the banner removal is not undone.
            self._apply_body_config()
            # NOTE(review): process_email_body triggers forward detection at this point; whether the
            # O365 variant did the same could not be determined from the reviewed copy.
        else:
            self.clean_email_body = ''

    def process_body_iocs(self, email_object=None):
        '''Extract hashes, URLs, IPs and hostnames from a mail body and add them to the event.

        :param email_object: optional EMailObject; when given, its own body is scanned
            instead of ``self.clean_email_body`` and every attribute/object created here
            is linked to it with a 'contains' reference
        '''
        def reference(target):
            if email_object:
                email_object.add_reference(target.uuid, 'contains')

        if email_object:
            # NOTE(review): the receiver of get_body() was truncated in the reviewed copy; `email_object.email` is assumed.
            body = html.unescape(email_object.email.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape'))
        else:
            body = self.clean_email_body

        # Cleanup body content
        # Depending on the source of the mail, there is some cleanup to do. Ignore lines in body of message
        for ignoreline in self.config.ignorelist:
            body = re.sub(rf'^{ignoreline}.*\n?', '', body, flags=re.MULTILINE)

        # Remove everything after the stopword from the body
        body = body.split(self.config.stopword, 1)[0]

        # Add tags to the event if keywords are found in the mail
        lowered_body = body.lower()
        for tag in self.config.tlptags:
            for alternativetag in self.config.tlptags[tag]:
                if alternativetag in lowered_body:
                    self.misp_event.add_tag(tag)  # NOTE(review): reconstructed

        # Prepare extraction of IOCs
        # Refang email data
        body = refang(body)

        # Extract and add hashes
        contains_hash = False
        for hash_type, hash_regex in (('md5', hashmarker.MD5_REGEX),
                                      ('sha1', hashmarker.SHA1_REGEX),
                                      ('sha256', hashmarker.SHA256_REGEX)):
            for h in set(re.findall(hash_regex, body)):
                contains_hash = True
                attribute = self.misp_event.add_attribute(hash_type, h, enforceWarninglist=self.config.enforcewarninglist)
                reference(attribute)
                if self.config.sighting:
                    self.sightings_to_add.append((h, self.config.sighting_source))

        if contains_hash:
            for tag in self.config.hash_only_tags:
                self.misp_event.add_tag(tag)

        # Extract network IOCs
        urllist = re.findall(urlmarker.WEB_URL_REGEX, body)
        urllist += re.findall(urlmarker.IP_REGEX, body)
        if self.debug:
            syslog.syslog(str(urllist))  # NOTE(review): log call reconstructed

        hostname_processed = []

        # Add IOCs and expanded information to MISP
        for entry in set(urllist):
            ids_flag = True
            self.f.decode(entry)  # NOTE(review): reconstructed; the getters below read the last decoded URL

            domainname = self.f.get_domain()
            if domainname in self.config.excludelist:
                # Ignore the entry
                continue  # NOTE(review): reconstructed

            hostname = self.f.get_host()
            scheme = self.f.get_scheme()
            resource_path = self.f.get_resource_path()

            if self.debug:
                syslog.syslog(str(domainname))  # NOTE(review): log call reconstructed

            if domainname in self.config.internallist and self.urlsonly is False:  # Add link to internal reference unless in urlsonly mode
                attribute = self.misp_event.add_attribute('link', entry, category='Internal reference',
                                                          to_ids=False, enforceWarninglist=False)
                reference(attribute)
            elif domainname in self.config.externallist and self.urlsonly is False:  # External analysis
                attribute = self.misp_event.add_attribute('link', entry, category='External analysis',
                                                          to_ids=False, enforceWarninglist=False)
                reference(attribute)
            elif domainname in self.config.externallist or self.urlsonly:  # External analysis
                if self.urlsonly:
                    comment = self.subject + f" (from: {self.sender})"
                else:
                    comment = ""
                attribute = self.misp.add_attribute(self.urlsonly, {"type": 'link', "value": entry, "category": 'External analysis',
                                                                    "to_ids": False, "comment": comment})
                lowered_subject = self.subject.lower()
                for tag in self.config.tlptags:
                    for alternativetag in self.config.tlptags[tag]:
                        if alternativetag in lowered_subject:
                            self.misp.tag(attribute["uuid"], tag)
                            new_subject = comment.replace(alternativetag, '')
                            self.misp.change_comment(attribute["uuid"], new_subject)
            else:  # The URL is probably an indicator.
                comment = ""
                if (domainname in self.config.noidsflaglist) or (hostname in self.config.noidsflaglist):
                    ids_flag = False
                    comment = "Known host (mostly for connectivity test or IP lookup)"
                if self.debug:
                    syslog.syslog(str(entry))  # NOTE(review): log call reconstructed

                if scheme:
                    if is_ip(hostname):
                        # NOTE(review): the trailing keyword arguments of this call were truncated.
                        attribute = self.misp_event.add_attribute('url', entry, to_ids=False,
                                                                  enforceWarninglist=self.config.enforcewarninglist)
                    elif resource_path:  # URL has path, ignore warning list
                        attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag,
                                                                  enforceWarninglist=False, comment=comment)
                    else:  # URL has no path
                        attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag,
                                                                  enforceWarninglist=self.config.enforcewarninglist, comment=comment)
                    reference(attribute)
                    if self.config.sighting:
                        self.sightings_to_add.append((entry, self.config.sighting_source))

                if hostname in hostname_processed:
                    # Hostname already processed.
                    continue  # NOTE(review): reconstructed
                hostname_processed.append(hostname)  # NOTE(review): reconstructed

                if self.config.sighting:
                    self.sightings_to_add.append((hostname, self.config.sighting_source))

                if self.debug:
                    syslog.syslog(str(hostname))  # NOTE(review): log call reconstructed

                comment = ''
                port = self.f.get_port()
                if port:
                    comment = f'on port: {port}'

                if is_ip(hostname):
                    # NOTE(review): the trailing keyword arguments of this call were truncated.
                    attribute = self.misp_event.add_attribute('ip-dst', hostname, to_ids=ids_flag,
                                                              enforceWarninglist=self.config.enforcewarninglist,
                                                              comment=comment)
                    reference(attribute)
                    continue

                related_ips = []
                if HAS_DNS and self.config.enable_dns:
                    try:
                        for rdata in dns.resolver.query(hostname, 'A'):
                            if self.debug:
                                syslog.syslog(str(rdata))  # NOTE(review): log call reconstructed
                            related_ips.append(rdata.to_text())  # NOTE(review): reconstructed
                    except Exception as e:
                        if self.debug:
                            syslog.syslog(str(e))  # NOTE(review): log call reconstructed

                if related_ips:
                    hip = MISPObject(name='ip-port')
                    hip.add_attribute('hostname', value=hostname, to_ids=ids_flag,
                                      enforceWarninglist=self.config.enforcewarninglist, comment=comment)
                    for ip in set(related_ips):
                        # NOTE(review): the trailing keyword arguments of this call were truncated.
                        hip.add_attribute('ip', type='ip-dst', value=ip, to_ids=False,
                                          enforceWarninglist=self.config.enforcewarninglist)
                    self.misp_event.add_object(hip)  # NOTE(review): reconstructed
                    reference(hip)
                elif self.urlsonly is False:
                    # NOTE(review): the trailing keyword arguments of this call were truncated.
                    attribute = self.misp_event.add_attribute('hostname', value=hostname,
                                                              to_ids=ids_flag, enforceWarninglist=self.config.enforcewarninglist,
                                                              comment=comment)
                    # Only reference the attribute when one was actually created in this branch.
                    reference(attribute)

    def add_event(self):
        '''Add event on the remote MISP instance.

        :return: the event as JSON in offline mode, otherwise the event returned by MISP
        '''
        # Add additional tags depending on others
        tags = []
        # NOTE(review): the expression inside both tag-name comprehensions was truncated; `t.name` is assumed.
        for tag in [t.name for t in self.misp_event.tags]:
            if self.config.dependingtags.get(tag):
                tags += self.config.dependingtags.get(tag)

        # Add additional tags according to configuration
        lowered_subject = self.subject.lower()
        for malware in self.config.malwaretags:
            if malware.lower() in lowered_subject:
                tags += self.config.malwaretags.get(malware)
        for tag in tags:
            self.misp_event.add_tag(tag)

        has_tlp_tag = any(t.name.lower().startswith('tlp') for t in self.misp_event.tags)
        if not has_tlp_tag:
            # NOTE(review): reconstructed; the configuration key holding the default TLP tag is assumed.
            self.misp_event.add_tag(self.config.tlptag_default)

        if self.offline:
            return self.misp_event.to_json()
        event = self.misp.add_event(self.misp_event, pythonify=True)
        if self.config.sighting:
            for value, source in self.sightings_to_add:
                self.sighting(value, source)
        if self.config.freetext:
            if self.config.o365_freetext:
                self.misp.freetext(event, string=self.clean_email_body, adhereToWarninglists=self.config.enforcewarninglist)
            else:
                self.misp.freetext(event, string=self.original_mail.get_body(preferencelist=('html', 'plain')), adhereToWarninglists=self.config.enforcewarninglist)
        return event

    def get_attached_emails(self, pseudofile):
        '''Return the ``.eml`` attachments of a raw email.

        :param pseudofile: object exposing ``getvalue()`` that returns the mail as bytes
        :return: list of BytesIO, one per attachment whose filename ends in ``.eml``
        '''
        if self.debug:
            syslog.syslog("get_attached_emails Job started.")

        forwarded_emails = []
        self.pseudofile = pseudofile
        self.original_mail = message_from_bytes(self.pseudofile.getvalue(), policy=policy.default)
        for attachment in self.original_mail.iter_attachments():
            attachment_content = attachment.get_content()
            filename = attachment.get_filename()
            if self.debug:
                syslog.syslog(f'get_attached_emails: filename = {filename}')
            # Search for email forwarded as attachment
            # I could have more than one, attaching everything.
            # all attachments are identified as message.EmailMessage so filtering on extension for now.
            # An attachment without a filename is skipped instead of crashing os.path.splitext.
            if isinstance(attachment, message.EmailMessage) and filename and os.path.splitext(filename)[1] == '.eml':
                # Normalise to bytes: get_content() may hand back a parsed message or text.
                if isinstance(attachment_content, message.EmailMessage):
                    attachment_content = attachment_content.as_bytes()
                elif isinstance(attachment_content, str):
                    attachment_content = attachment_content.encode()
                forwarded_emails.append(BytesIO(attachment_content))  # NOTE(review): reconstructed
        return forwarded_emails


class O365MISPClient:
    '''
    A client (MUA) to allow mail_to_misp to interact with Microsoft Graph and Office 365 API to get email messages.

    NOTE(review): several lines of this class were missing from the reviewed copy;
    every re-created statement is tagged ``NOTE(review)`` and must be confirmed
    against version control.
    '''

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        tenant_id: str,
        resource: str,
        scopes: List[str],
        token_backend: Optional[
            Union[AWSS3Backend, AWSSecretsBackend, EnvTokenBackend, FileSystemTokenBackend, FirestoreBackend]
        ] = None,
    ):
        '''
        Init O365MISPClient

        :param client_id: OAuth Client ID
        :param client_secret: OAuth Client Secret
        :param tenant_id: Your Tenant ID
        :param resource: The email address you want to access
        :param scopes: The permission scopes for the resource
        :param token_backend: The backend used for storing OAuth token
        '''
        self.scopes = scopes
        self.resource = resource
        # NOTE(review): only `credentials=` survived in the reviewed copy. tenant_id and
        # token_backend are forwarded because nothing else in the class uses them; the
        # auth flow type is an assumption (client-credentials, matching a client secret
        # plus tenant id) - confirm.
        self.o365_acct = Account(
            credentials=(client_id, client_secret),
            auth_flow_type='credentials',
            tenant_id=tenant_id,
            token_backend=token_backend,
        )
        if not self.o365_acct.is_authenticated:
            # NOTE(review): body of this branch reconstructed.
            self.o365_acct.authenticate(scopes=self.scopes)
        self.mailbox = self.o365_acct.mailbox(resource=self.resource)
        self.inbox = self.mailbox.inbox_folder()
        # NOTE(review): the property names of this list were lost in the reviewed copy.
        # Left empty rather than guessed; presumably select() without arguments leaves the
        # query unrestricted so the API returns its default property set - restore the
        # original list from version control.
        self.query_properties = []

    def get_email_messages(self, from_time: datetime, to_time: datetime, folder: Optional[str] = None) -> Iterator[Message]:
        '''
        Get messages for a certain timeframe. Defaults to looking for messages in the Inbox folder, however by
        supplying a folder name as a parameter you can change where to get the messages from.

        :param from_time: start time to search for
        :param to_time: end time to search for
        :param folder: specific folder to get messages from (don't supply if getting from the inbox folder)
        :return: an iterator of O365.messages.Message from the resource
        '''
        query = self.mailbox.new_query().select(*self.query_properties)
        query = query.chain('and').on_attribute('received_date_time').greater(from_time)
        query = query.chain('and').on_attribute('received_date_time').less(to_time)
        if folder:
            source_folder = self.mailbox.get_folder(folder_name=folder)
        else:
            source_folder = self.inbox
        return source_folder.get_messages(query=query)