PyMISP/pymisp/tools/emailobject.py

122 lines
5.4 KiB
Python
Raw Normal View History

2018-03-18 23:21:29 +01:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
from io import BytesIO
import logging
from email import message_from_bytes, policy
2020-09-25 10:49:48 +02:00
import email.utils
2020-01-23 10:27:40 +01:00
from pathlib import Path
from typing import Union
2018-03-18 23:21:29 +01:00
logger = logging.getLogger('pymisp')
class EMailObject(AbstractMISPObjectGenerator):
2020-07-28 20:05:42 +02:00
def __init__(self, filepath: Union[Path, str] = None, pseudofile: BytesIO = None, attach_original_email: bool = True, **kwargs):
    """Build an 'email' MISP object from an on-disk file or an in-memory buffer.

    :param filepath: path of the email file to read; takes precedence over
        ``pseudofile`` when both are given.
    :param pseudofile: ``BytesIO`` holding the raw email, used when no
        ``filepath`` is given.
    :param attach_original_email: when true, the raw message is attached as
        an ``eml`` attribute named ``Full email.eml``.
    :param kwargs: forwarded unchanged to the parent constructor.
    :raises InvalidMISPObject: if neither a path nor a ``BytesIO`` is supplied.
    """
    super().__init__('email', **kwargs)

    if filepath:
        with open(filepath, 'rb') as handle:
            raw_message = handle.read()
        self.__pseudofile = BytesIO(raw_message)
    elif pseudofile and isinstance(pseudofile, BytesIO):
        self.__pseudofile = pseudofile
    else:
        raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')

    self.__email = message_from_bytes(self.__pseudofile.getvalue(), policy=policy.default)

    # Improperly encoded emails (utf-8-sig) fail silently: the parser returns
    # a message without any header. Treat that as a hint to retry decoding.
    if not len(self.__email):
        self.attempt_decoding()

    # Attached after the decoding attempt, which may have replaced the buffer.
    if attach_original_email:
        self.add_attribute('eml', value='Full email.eml', data=self.__pseudofile)

    self.generate_attributes()
@property
def email(self):
    """Return the parsed email message held by this object."""
    return self.__email
@property
def attachments(self):
    """Return the attachments as a list of ``(filename, BytesIO)`` tuples.

    Attachment content that comes back as text is encoded to bytes before
    being wrapped in the buffer.
    """
    collected = []
    for part in self.__email.iter_attachments():
        payload = part.get_content()
        raw = payload.encode() if isinstance(payload, str) else payload
        collected.append((part.get_filename(), BytesIO(raw)))
    return collected
def attempt_decoding(self):
    """Attempt to decode non-ascii encoded emails.

    Re-parses the raw buffer when it starts with a UTF-8 byte-order mark.
    On success both the stored buffer and the parsed message are replaced by
    the BOM-less version; otherwise the object is left untouched and the
    outcome is only logged.
    """
    _msg_bytes = self.__pseudofile.getvalue()
    try:
        _msg_bytes.decode("ASCII")
    except UnicodeDecodeError:
        logger.debug("EmailObject was passed a non-ASCII encoded binary blob.")
    else:
        # Pure ASCII input: the encoding is not what broke the parser,
        # so there is nothing to repair here.
        logger.info("EmailObject failed to decode ASCII encoded email.")
        return

    try:
        if _msg_bytes.startswith(b'\xef\xbb\xbf'):  # utf-8-sig byte-order mark (BOM)
            # Re-encode as UTF-8 rather than ASCII: the text is known to
            # contain non-ASCII characters at this point (at least the BOM,
            # possibly more), and encoding to ASCII would raise an uncaught
            # UnicodeEncodeError for any of them left in the message.
            _decoded = _msg_bytes.decode('utf_8_sig').encode("utf-8")
            # Set Pseudofile to correctly encoded email in case it is used at some later point.
            self.__pseudofile = BytesIO(_decoded)
            self.__email = message_from_bytes(self.__pseudofile.getvalue(), policy=policy.default)
    except UnicodeDecodeError:
        logger.debug("EmailObject does not know how to decode binary blob passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.")
2018-03-18 23:21:29 +01:00
def generate_attributes(self):
    """Populate the object's attributes from the parsed email.

    Adds, in this order and only when present: the body (HTML preferred over
    plain text), Reply-To, Message-ID, To, Cc, Subject, From, Return-Path,
    User-Agent, the MIME boundary, X-Mailer, Thread-Index and the send date.
    """
    message = self.__email

    def copy_header(header, relation):
        # Mirror a header verbatim into the attribute `relation`, if present.
        if header in message:
            self.add_attribute(relation, value=message[header])

    body = message.get_body(preferencelist=('html', 'plain'))
    if body:
        body_text = body.get_payload(decode=True).decode('utf8', 'surrogateescape')
        self.add_attribute('email-body', value=body_text)

    copy_header('Reply-To', 'reply-to')
    copy_header('Message-ID', 'message-id')

    if 'To' in message:
        self._add_emails('to', message['To'])

    if 'Cc' in message:
        # TODO: split name and email address
        recipients = [entry.strip() for entry in message['Cc'].split(',')]
        self.add_attributes('cc', *recipients)

    copy_header('Subject', 'subject')

    if 'From' in message:
        self._add_emails('from', message['From'])

    # TODO: split name and email address
    copy_header('Return-Path', 'return-path')
    copy_header('User-Agent', 'user-agent')

    boundary = message.get_boundary()
    if boundary:
        self.add_attribute('mime-boundary', value=boundary)

    copy_header('X-Mailer', 'x-mailer')
    copy_header('Thread-Index', 'thread-index')

    if 'Date' in message:
        self.add_attribute('send-date', value=self._sanitize_timestamp(message['Date']))

    # TODO: email-header: all headers in one bloc
    # TODO: BCC?
    # TODO: received headers sometimes have TO email addresses
2020-09-25 10:49:48 +02:00
def _add_emails(self, type: str, data: str):
    """Add the addresses and display names found in an address header.

    Each address becomes an attribute of relation ``type``; each display name
    becomes an attribute of relation ``<type>-display-name``. When a display
    name is present, ``"name <address>"`` is stored as the comment of both
    attributes so that the two stay correlated.

    :param type: object relation to use for the addresses (e.g. ``to``).
    :param data: raw header value, possibly holding several addresses.
    """
    addresses = []
    display_names = []

    # getaddresses() honours quoting, so a comma inside a quoted display name
    # ('"Doe, John" <john@example.com>') does not split the entry in two the
    # way a plain str.split(',') would.
    for realname, address in email.utils.getaddresses([data]):
        if not address:
            # Empty or unparsable entry; see the fallback below.
            continue
        if realname:
            original = "{} <{}>".format(realname, address)
            addresses.append({"value": address, "comment": original})
            display_names.append({"value": realname, "comment": original})
        else:
            addresses.append({"value": address})

    raw_value = data.strip()
    if not addresses and raw_value:
        # parsing failed, insert original value
        addresses.append({"value": raw_value})

    if addresses:
        self.add_attributes(type, *addresses)
    if display_names:
        self.add_attributes("{}-display-name".format(type), *display_names)