From c967c5e197063f82f055d117b8f689b157f2e447 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 14 May 2018 17:23:30 -0400 Subject: [PATCH] new: Make it a lib, add test cases --- .gitmodules | 3 + .travis.yml | 35 +++ __init__.py | 0 mail2misp/__init__.py | 3 + hashmarker.py => mail2misp/hashmarker.py | 0 mail2misp/mail2misp.py | 350 +++++++++++++++++++++++ urlmarker.py => mail2misp/urlmarker.py | 0 mail_to_misp.py | 345 +--------------------- mail_to_misp_config.py-example | 2 +- tests/__init__.py | 0 tests/config_forward.py | 77 +++++ tests/config_spamtrap.py | 77 +++++ tests/mails | 1 + tests/tests.py | 57 ++++ 14 files changed, 605 insertions(+), 345 deletions(-) create mode 100644 .gitmodules create mode 100644 .travis.yml create mode 100644 __init__.py create mode 100644 mail2misp/__init__.py rename hashmarker.py => mail2misp/hashmarker.py (100%) mode change 100755 => 100644 create mode 100644 mail2misp/mail2misp.py rename urlmarker.py => mail2misp/urlmarker.py (100%) mode change 100755 => 100644 create mode 100644 tests/__init__.py create mode 100644 tests/config_forward.py create mode 100644 tests/config_spamtrap.py create mode 160000 tests/mails create mode 100644 tests/tests.py diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..1044271 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "tests/mails"] + path = tests/mails + url = https://github.com/MISP/mail_to_misp_test.git diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..b3a5921 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,35 @@ +language: python + +cache: pip + +sudo: required + +addons: + apt: + sources: [ 'ubuntu-toolchain-r-test' ] + packages: + - libstdc++6 + - libfuzzy-dev + +python: + - "3.6" + - "3.6-dev" + +install: + - git clone git://github.com/stricaud/faup.git + - pushd faup/build + - cmake .. && make + - sudo make install + - popd + - sudo ldconfig + - pip install -U nose pip coverage + - pip install -U -r requirements.txt + - git submodule init + - git submodule update + +script: + - nosetests --with-coverage --cover-package=mail2misp tests/tests.py + +after_success: + - codecov + - coveralls diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mail2misp/__init__.py b/mail2misp/__init__.py new file mode 100644 index 0000000..cb5584d --- /dev/null +++ b/mail2misp/__init__.py @@ -0,0 +1,3 @@ +from . import urlmarker +from . import hashmarker +from .mail2misp import Mail2MISP diff --git a/hashmarker.py b/mail2misp/hashmarker.py old mode 100755 new mode 100644 similarity index 100% rename from hashmarker.py rename to mail2misp/hashmarker.py diff --git a/mail2misp/mail2misp.py b/mail2misp/mail2misp.py new file mode 100644 index 0000000..d2218d8 --- /dev/null +++ b/mail2misp/mail2misp.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import re +import syslog +import html +from io import BytesIO +from ipaddress import ip_address +from email import message_from_bytes, policy + +from . import urlmarker +from . import hashmarker +from pyfaup.faup import Faup +from pymisp import PyMISP, MISPEvent, MISPObject, MISPSighting +from pymisp.tools import EMailObject, make_binary_objects +from defang import refang +import dns.resolver + + +def is_ip(address): + try: + ip_address(address) + except ValueError: + return False + return True + + +class Mail2MISP(): + + def __init__(self, misp_url, misp_key, verifycert, config, offline=False): + self.offline = offline + if not self.offline: + self.misp = PyMISP(misp_url, misp_key, verifycert, debug=config.debug) + self.config = config + self.debug = self.config.debug + self.config_from_email_body = {} + # Init Faup + self.f = Faup() + + def load_email(self, pseudofile): + self.pseudofile = pseudofile + self.original_mail = message_from_bytes(self.pseudofile.getvalue(), policy=policy.default) + self.subject = self.original_mail.get('Subject') + # Remove words from subject + for removeword in self.config.removelist: + self.subject = re.sub(removeword, "", self.subject).strip() + + # Initialize the MISP event + self.misp_event = MISPEvent() + self.misp_event.info = f'{self.config.email_subject_prefix} - {self.subject}' + self.misp_event.distribution = self.config.default_distribution + self.misp_event.threat_level_id = self.config.default_threat_level + self.misp_event.analysis = self.config.default_analysis + + def sighting(self, value, source): + if self.offline: + raise Exception('The script is running in offline mode, ') + '''Add a sighting''' + s = MISPSighting() + s.from_dict(value=value, source=source) + self.misp.set_sightings(s) + + def _find_inline_forward(self): + '''Does the body contains a forwarded email?''' + for identifier in self.config.forward_identifiers: + if identifier in self.clean_email_body: + self.clean_email_body, fw_email = self.clean_email_body.split(identifier) + return self.forwarded_email(pseudofile=BytesIO(fw_email.encode())) + + def _find_attached_forward(self): + forwarded_emails = [] + for attachment in self.original_mail.iter_attachments(): + # Search for email forwarded as attachment + # I could have more than one, attaching everything. + if attachment.get_filename() and attachment.get_filename().endswith('.eml'): + forwarded_emails.append(self.forwarded_email(pseudofile=BytesIO(attachment.get_content().as_bytes()))) + else: + filename = attachment.get_filename() + if not filename: + filename = 'missing_filename' + if self.config_from_email_body.get('attachment') == self.config.m2m_benign_attachment_keyword: + # Attach sane file + self.misp_event.add_attribute('attachment', value=filename, data=BytesIO(attachment.get_content())) + else: + f_object, main_object, sections = make_binary_objects(pseudofile=BytesIO(attachment.get_content()), filename=filename, standalone=False) + self.misp_event.add_object(f_object) + if main_object: + self.misp_event.add_object(main_object) + [self.misp_event.add_object(section) for section in sections] + return forwarded_emails + + def email_from_spamtrap(self): + '''The email comes from a spamtrap and should be attached as-is.''' + raw_body = self.original_mail.get_body(preferencelist=('html', 'plain')) + if raw_body: + self.clean_email_body = html.unescape(raw_body.get_payload(decode=True).decode('utf8', 'surrogateescape')) + else: + self.clean_email_body = '' + return self.forwarded_email(self.pseudofile) + + def forwarded_email(self, pseudofile: BytesIO): + '''Extracts all possible indicators out of an email and create a MISP event out of it. + * Gets all relevant Headers + * Attach the body + * Create MISP file objects (uses lief if possible) + * Set all references + ''' + email_object = EMailObject(pseudofile=pseudofile, attach_original_mail=True, standalone=False) + if email_object.attachments: + # Create file objects for the attachments + for attachment_name, attachment in email_object.attachments: + if not attachment_name: + attachment_name = 'NameMissing.txt' + f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) + self.misp_event.add_object(f_object) + if main_object: + self.misp_event.add_object(main_object) + for section in sections: + self.misp_event.add_object(section) + email_object.add_reference(f_object.uuid, 'related-to', 'Email attachment') + self.process_body_iocs(email_object) + if self.config.spamtrap or self.config.attach_original_mail or self.config_from_email_body.get('attach_original_mail'): + self.misp_event.add_object(email_object) + return email_object + + def process_email_body(self): + mail_as_bytes = self.original_mail.get_body(preferencelist=('html', 'plain')).get_payload(decode=True) + if mail_as_bytes: + self.clean_email_body = html.unescape(mail_as_bytes.decode('utf8', 'surrogateescape')) + # Check if there are config lines in the body & convert them to a python dictionary: + # :: => {: } + self.config_from_email_body = {k.strip(): v.strip() for k, v in re.findall(f'{self.config.body_config_prefix}:(.*):(.*)', self.clean_email_body)} + if self.config_from_email_body: + # ... remove the config lines from the body + self.clean_email_body = re.sub(rf'^{self.config.body_config_prefix}.*\n?', '', + html.unescape(self.original_mail.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape')), flags=re.MULTILINE) + # Check if autopublish key is present and valid + if self.config_from_email_body.get('m2mkey') == self.config.m2m_key: + if self.config_from_email_body.get('distribution'): + self.misp_event.distribution = self.config_from_email_body.get('distribution') + if self.config_from_email_body.get('threat_level'): + self.misp_event.threat_level_id = self.config_from_email_body.get('threat_level') + if self.config_from_email_body.get('analysis'): + self.misp_event.analysis = self.config_from_email_body.get('analysis') + if self.config_from_email_body.get('publish'): + self.misp_event.publish() + + self._find_inline_forward() + else: + self.clean_email_body = '' + self._find_attached_forward() + + def process_body_iocs(self, email_object=None): + if email_object: + body = html.unescape(email_object.email.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape')) + else: + body = self.clean_email_body + + # Cleanup body content + # Depending on the source of the mail, there is some cleanup to do. Ignore lines in body of message + for ignoreline in self.config.ignorelist: + body = re.sub(rf'^{ignoreline}.*\n?', '', body, flags=re.MULTILINE) + + # Remove everything after the stopword from the body + body = body.split(self.config.stopword, 1)[0] + + # Add tags to the event if keywords are found in the mail + for tag in self.config.tlptags: + if any(alternativetag in body.lower() for alternativetag in self.config.tlptags[tag]): + self.misp_event.add_tag(tag) + + # Prepare extraction of IOCs + # Refang email data + body = refang(body) + + # Extract and add hashes + contains_hash = False + for h in set(re.findall(hashmarker.MD5_REGEX, body)): + contains_hash = True + attribute = self.misp_event.add_attribute('md5', h, enforceWarninglist=self.config.enforcewarninglist) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + if self.config.sighting: + self.sighting(h, self.config.sighting_source) + for h in set(re.findall(hashmarker.SHA1_REGEX, body)): + contains_hash = True + attribute = self.misp_event.add_attribute('sha1', h, enforceWarninglist=self.config.enforcewarninglist) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + if self.config.sighting: + self.sighting(h, self.config.sighting_source) + for h in set(re.findall(hashmarker.SHA256_REGEX, body)): + contains_hash = True + attribute = self.misp_event.add_attribute('sha256', h, enforceWarninglist=self.config.enforcewarninglist) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + if self.config.sighting: + self.sighting(h, self.config.sighting_source) + + if contains_hash: + [self.misp_event.add_tag(tag) for tag in self.config.hash_only_tags] + + # # Extract network IOCs + urllist = [] + urllist += re.findall(urlmarker.WEB_URL_REGEX, body) + urllist += re.findall(urlmarker.IP_REGEX, body) + if self.debug: + syslog.syslog(str(urllist)) + + hostname_processed = [] + + # Add IOCs and expanded information to MISP + for entry in set(urllist): + ids_flag = True + self.f.decode(entry) + + domainname = self.f.get_domain().decode() + if domainname in self.config.excludelist: + # Ignore the entry + continue + + hostname = self.f.get_host().decode() + + scheme = self.f.get_scheme() + if scheme: + scheme = scheme.decode() + + resource_path = self.f.get_resource_path() + if resource_path: + resource_path = resource_path.decode() + + if self.debug: + syslog.syslog(domainname) + + if domainname in self.config.internallist: # Add link to internal reference + attribute = self.misp_event.add_attribute('link', entry, category='Internal reference', + to_ids=False, enforceWarninglist=False) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + elif domainname in self.config.externallist: # External analysis + attribute = self.misp_event.add_attribute('link', entry, category='External analysis', + to_ids=False, enforceWarninglist=False) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + else: # The URL is probably an indicator. + comment = "" + if (domainname in self.config.noidsflaglist) or (hostname in self.config.noidsflaglist): + ids_flag = False + comment = "Known host (mostly for connectivity test or IP lookup)" + if self.debug: + syslog.syslog(str(entry)) + + if scheme: + if is_ip(hostname): + attribute = self.misp_event.add_attribute('url', entry, to_ids=False, + enforceWarninglist=self.config.enforcewarninglist) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + else: + if resource_path: # URL has path, ignore warning list + attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag, + enforceWarninglist=False, comment=comment) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + else: # URL has no path + attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag, + enforceWarninglist=self.config.enforcewarninglist, comment=comment) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + if self.config.sighting: + self.sighting(entry, self.config.sighting_source) + + if hostname in hostname_processed: + # Hostname already processed. + continue + + hostname_processed.append(hostname) + if self.config.sighting: + self.sighting(hostname, self.config.sighting_source) + + if self.debug: + syslog.syslog(hostname) + + comment = '' + port = self.f.get_port() + if port: + port = port.decode() + comment = f'on port: {port}' + + if is_ip(hostname): + attribute = self.misp_event.add_attribute('ip-dst', hostname, to_ids=ids_flag, + enforceWarninglist=self.config.enforcewarninglist, + comment=comment) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + else: + related_ips = [] + try: + syslog.syslog(hostname) + for rdata in dns.resolver.query(hostname, 'A'): + if self.debug: + syslog.syslog(str(rdata)) + related_ips.append(rdata.to_text()) + except Exception as e: + if self.debug: + syslog.syslog(str(e)) + + if related_ips: + hip = MISPObject(name='ip-port') + hip.add_attribute('hostname', value=hostname, to_ids=ids_flag, + enforceWarninglist=self.config.enforcewarninglist, comment=comment) + for ip in set(related_ips): + hip.add_attribute('ip', type='ip-dst', value=ip, to_ids=False, + enforceWarninglist=self.config.enforcewarninglist) + self.misp_event.add_object(hip) + if email_object: + email_object.add_reference(hip.uuid, 'contains') + else: + attribute = self.misp_event.add_attribute('hostname', value=hostname, + to_ids=ids_flag, enforceWarninglist=self.config.enforcewarninglist, + comment=comment) + if email_object: + email_object.add_reference(attribute.uuid, 'contains') + + def add_event(self): + '''Add event on the remote MISP instance.''' + + # Add additional tags depending on others + tags = [] + for tag in [t.name for t in self.misp_event.tags]: + if self.config.dependingtags.get(tag): + tags += self.config.dependingtags.get(tag) + + # Add additional tags according to configuration + for malware in self.config.malwaretags: + if malware.lower() in self.subject.lower(): + tags += self.config.malwaretags.get(malware) + if tags: + [self.misp_event.add_tag(tag) for tag in tags] + + has_tlp_tag = False + for tag in [t.name for t in self.misp_event.tags]: + if tag.lower().startswith('tlp'): + has_tlp_tag = True + if not has_tlp_tag: + self.misp_event.add_tag(self.config.tlptag_default) + + if self.offline: + return self.misp_event.to_json() + return self.misp.add_event(self.misp_event) diff --git a/urlmarker.py b/mail2misp/urlmarker.py old mode 100755 new mode 100644 similarity index 100% rename from urlmarker.py rename to mail2misp/urlmarker.py diff --git a/mail_to_misp.py b/mail_to_misp.py index 04cce6e..8b70161 100755 --- a/mail_to_misp.py +++ b/mail_to_misp.py @@ -4,355 +4,12 @@ import os import sys import argparse -import re import syslog from pathlib import Path -import html from io import BytesIO -from ipaddress import ip_address -from email import message_from_bytes, policy import importlib -try: - import urlmarker - import hashmarker - from pyfaup.faup import Faup - from pymisp import PyMISP, MISPEvent, MISPObject, MISPSighting - from pymisp.tools import EMailObject, make_binary_objects - from defang import refang - import dns.resolver -except ImportError as e: - print("(!) Problem loading module:") - print(e) - sys.exit(-1) - - -def is_ip(address): - try: - ip_address(address) - except ValueError: - return False - return True - - -class Mail2MISP(): - - def __init__(self, misp_url, misp_key, verifycert, config): - self.misp = PyMISP(misp_url, misp_key, verifycert, debug=config.debug) - self.config = config - self.debug = self.config.debug - self.config_from_email_body = {} - # Init Faup - self.f = Faup() - - def load_email(self, pseudofile): - self.pseudofile = pseudofile - self.original_mail = message_from_bytes(self.pseudofile.getvalue(), policy=policy.default) - self.subject = self.original_mail.get('Subject') - # Remove words from subject - for removeword in self.config.removelist: - self.subject = re.sub(removeword, "", self.subject).strip() - - # Initialize the MISP event - self.misp_event = MISPEvent() - self.misp_event.info = f'{config.email_subject_prefix} - {self.subject}' - self.misp_event.distribution = self.config.default_distribution - self.misp_event.threat_level_id = self.config.default_threat_level - self.misp_event.analysis = self.config.default_analysis - - def sighting(self, value, source): - '''Add a sighting''' - s = MISPSighting() - s.from_dict(value=value, source=source) - self.misp.set_sightings(s) - - def _find_inline_forward(self): - '''Does the body contains a forwarded email?''' - for identifier in self.config.forward_identifiers: - if identifier in self.clean_email_body: - self.clean_email_body, fw_email = self.clean_email_body.split(identifier) - return self.forwarded_email(pseudofile=BytesIO(fw_email.encode())) - - def _find_attached_forward(self): - forwarded_emails = [] - for attachment in self.original_mail.iter_attachments(): - # Search for email forwarded as attachment - # I could have more than one, attaching everything. - if attachment.get_filename() and attachment.get_filename().endswith('.eml'): - forwarded_emails.append(self.forwarded_email(pseudofile=BytesIO(attachment.get_content().as_bytes()))) - else: - filename = attachment.get_filename() - if not filename: - filename = 'missing_filename' - if self.config_from_email_body.get('attachment') == self.config.m2m_benign_attachment_keyword: - # Attach sane file - self.misp_event.add_attribute('attachment', value=filename, data=BytesIO(attachment.get_content())) - else: - f_object, main_object, sections = make_binary_objects(pseudofile=BytesIO(attachment.get_content()), filename=filename, standalone=False) - self.misp_event.add_object(f_object) - if main_object: - self.misp_event.add_object(main_object) - [self.misp_event.add_object(section) for section in sections] - return forwarded_emails - - def email_from_spamtrap(self): - '''The email comes from a spamtrap and should be attached as-is.''' - raw_body = self.original_mail.get_body(preferencelist=('html', 'plain')) - if raw_body: - self.clean_email_body = html.unescape(raw_body.get_payload(decode=True).decode('utf8', 'surrogateescape')) - else: - self.clean_email_body = '' - return self.forwarded_email(self.pseudofile) - - def forwarded_email(self, pseudofile: BytesIO): - '''Extracts all possible indicators out of an email and create a MISP event out of it. - * Gets all relevant Headers - * Attach the body - * Create MISP file objects (uses lief if possible) - * Set all references - ''' - email_object = EMailObject(pseudofile=pseudofile, attach_original_mail=True, standalone=False) - if email_object.attachments: - # Create file objects for the attachments - for attachment_name, attachment in email_object.attachments: - if not attachment_name: - attachment_name = 'NameMissing.txt' - f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) - self.misp_event.add_object(f_object) - if main_object: - self.misp_event.add_object(main_object) - for section in sections: - self.misp_event.add_object(section) - email_object.add_reference(f_object.uuid, 'related-to', 'Email attachment') - self.process_body_iocs(email_object) - if self.config.spamtrap or self.config.attach_original_mail or self.config_from_email_body.get('attach_original_mail'): - self.misp_event.add_object(email_object) - return email_object - - def process_email_body(self): - mail_as_bytes = self.original_mail.get_body(preferencelist=('html', 'plain')).get_payload(decode=True) - if mail_as_bytes: - self.clean_email_body = html.unescape(mail_as_bytes.decode('utf8', 'surrogateescape')) - # Check if there are config lines in the body & convert them to a python dictionary: - # :: => {: } - self.config_from_email_body = {k.strip(): v.strip() for k, v in re.findall(f'{config.body_config_prefix}:(.*):(.*)', self.clean_email_body)} - if self.config_from_email_body: - # ... remove the config lines from the body - self.clean_email_body = re.sub(rf'^{config.body_config_prefix}.*\n?', '', - html.unescape(self.original_mail.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape')), flags=re.MULTILINE) - - # Check if autopublish key is present and valid - if self.config_from_email_body.get('m2mkey') == self.config.m2m_key: - if self.config_from_email_body.get('distribution'): - self.misp_event.distribution = self.config_from_email_body.get('distribution') - if self.config_from_email_body.get('threat_level'): - self.misp_event.threat_level_id = self.config_from_email_body.get('threat_level') - if self.config_from_email_body.get('analysis'): - self.misp_event.analysis = self.config_from_email_body.get('analysis') - if self.config_from_email_body.get('publish'): - self.misp_event.publish() - - self._find_inline_forward() - else: - self.clean_email_body = '' - self._find_attached_forward() - - def process_body_iocs(self, email_object=None): - if email_object: - body = html.unescape(email_object.email.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape')) - else: - body = self.clean_email_body - - # Cleanup body content - # Depending on the source of the mail, there is some cleanup to do. Ignore lines in body of message - for ignoreline in self.config.ignorelist: - body = re.sub(rf'^{ignoreline}.*\n?', '', body, flags=re.MULTILINE) - - # Remove everything after the stopword from the body - body = body.split(self.config.stopword, 1)[0] - - # Add tags to the event if keywords are found in the mail - for tag in self.config.tlptags: - if any(alternativetag in body.lower() for alternativetag in self.config.tlptags[tag]): - self.misp_event.add_tag(tag) - - # Prepare extraction of IOCs - # Refang email data - body = refang(body) - - # Extract and add hashes - contains_hash = False - for h in set(re.findall(hashmarker.MD5_REGEX, body)): - contains_hash = True - attribute = self.misp_event.add_attribute('md5', h, enforceWarninglist=config.enforcewarninglist) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - if self.config.sighting: - self.sighting(h, self.config.sighting_source) - for h in set(re.findall(hashmarker.SHA1_REGEX, body)): - contains_hash = True - attribute = self.misp_event.add_attribute('sha1', h, enforceWarninglist=config.enforcewarninglist) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - if self.config.sighting: - self.sighting(h, self.config.sighting_source) - for h in set(re.findall(hashmarker.SHA256_REGEX, body)): - contains_hash = True - attribute = self.misp_event.add_attribute('sha256', h, enforceWarninglist=config.enforcewarninglist) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - if self.config.sighting: - self.sighting(h, self.config.sighting_source) - - if contains_hash: - [self.misp_event.add_tag(tag) for tag in self.config.hash_only_tags] - - # # Extract network IOCs - urllist = [] - urllist += re.findall(urlmarker.WEB_URL_REGEX, body) - urllist += re.findall(urlmarker.IP_REGEX, body) - if self.debug: - syslog.syslog(str(urllist)) - - hostname_processed = [] - - # Add IOCs and expanded information to MISP - for entry in set(urllist): - ids_flag = True - self.f.decode(entry) - - domainname = self.f.get_domain().decode() - if domainname in self.config.excludelist: - # Ignore the entry - continue - - hostname = self.f.get_host().decode() - - scheme = self.f.get_scheme() - if scheme: - scheme = scheme.decode() - - resource_path = self.f.get_resource_path() - if resource_path: - resource_path = resource_path.decode() - - if debug: - syslog.syslog(domainname) - - if domainname in self.config.internallist: # Add link to internal reference - attribute = self.misp_event.add_attribute('link', entry, category='Internal reference', - to_ids=False, enforceWarninglist=False) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - elif domainname in self.config.externallist: # External analysis - attribute = self.misp_event.add_attribute('link', entry, category='External analysis', - to_ids=False, enforceWarninglist=False) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - else: # The URL is probably an indicator. - comment = "" - if (domainname in self.config.noidsflaglist) or (hostname in self.config.noidsflaglist): - ids_flag = False - comment = "Known host (mostly for connectivity test or IP lookup)" - if debug: - syslog.syslog(str(entry)) - - if scheme: - if is_ip(hostname): - attribute = self.misp_event.add_attribute('url', entry, to_ids=False, - enforceWarninglist=config.enforcewarninglist) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - else: - if resource_path: # URL has path, ignore warning list - attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag, - enforceWarninglist=False, comment=comment) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - else: # URL has no path - attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag, - enforceWarninglist=config.enforcewarninglist, comment=comment) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - if self.config.sighting: - self.sighting(entry, self.config.sighting_source) - - if hostname in hostname_processed: - # Hostname already processed. - continue - - hostname_processed.append(hostname) - if self.config.sighting: - self.sighting(hostname, self.config.sighting_source) - - if debug: - syslog.syslog(hostname) - - comment = '' - port = self.f.get_port() - if port: - port = port.decode() - comment = f'on port: {port}' - - if is_ip(hostname): - attribute = self.misp_event.add_attribute('ip-dst', hostname, to_ids=ids_flag, - enforceWarninglist=config.enforcewarninglist, - comment=comment) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - else: - related_ips = [] - try: - syslog.syslog(hostname) - for rdata in dns.resolver.query(hostname, 'A'): - if debug: - syslog.syslog(str(rdata)) - related_ips.append(rdata.to_text()) - except Exception as e: - if debug: - syslog.syslog(str(e)) - - if related_ips: - hip = MISPObject(name='ip-port') - hip.add_attribute('hostname', value=hostname, to_ids=ids_flag, - enforceWarninglist=config.enforcewarninglist, comment=comment) - for ip in set(related_ips): - hip.add_attribute('ip', type='ip-dst', value=ip, to_ids=False, - enforceWarninglist=config.enforcewarninglist) - self.misp_event.add_object(hip) - if email_object: - email_object.add_reference(hip.uuid, 'contains') - else: - attribute = self.misp_event.add_attribute('hostname', value=hostname, - to_ids=ids_flag, enforceWarninglist=config.enforcewarninglist, - comment=comment) - if email_object: - email_object.add_reference(attribute.uuid, 'contains') - - def add_event(self): - '''Add event on the remote MISP instance.''' - - # Add additional tags depending on others - tags = [] - for tag in [t.name for t in self.misp_event.tags]: - if self.config.dependingtags.get(tag): - tags += self.config.dependingtags.get(tag) - - # Add additional tags according to configuration - for malware in self.config.malwaretags: - if malware.lower() in self.subject.lower(): - tags += self.config.malwaretags.get(malware) - if tags: - [self.misp_event.add_tag(tag) for tag in tags] - - has_tlp_tag = False - for tag in [t.name for t in self.misp_event.tags]: - if tag.lower().startswith('tlp'): - has_tlp_tag = True - if not has_tlp_tag: - self.misp_event.add_tag(config.tlptag_default) - - self.misp.add_event(self.misp_event) +from mail2misp import Mail2MISP if __name__ == '__main__': parser = argparse.ArgumentParser(description='Push a Mail into a MISP instance') diff --git a/mail_to_misp_config.py-example b/mail_to_misp_config.py-example index 34fb226..e170b0e 100644 --- a/mail_to_misp_config.py-example +++ b/mail_to_misp_config.py-example @@ -43,7 +43,7 @@ sighting_source = "YOUR_MAIL_TO_MISP_IDENTIFIER" # Remove "Re:", "Fwd:" and {Spam?} from subject # add: "[\(\[].*?[\)\]]" to remove everything between [] and (): i.e. [tag] -removelist = ("Re:", "Fwd:", "\{Spam\?\} ") +removelist = (r"Re:", r"Fwd:", r"\{Spam\?\} ") # TLP tag setup # Tuples contain different variations of spelling diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/config_forward.py b/tests/config_forward.py new file mode 100644 index 0000000..f988989 --- /dev/null +++ b/tests/config_forward.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +misp_url = 'YOUR_MISP_URL' +misp_key = 'YOUR_KEY_HERE' # The MISP auth key can be found on the MISP web interface under the automation section +misp_verifycert = True +spamtrap = False +default_distribution = 0 +default_threat_level = 3 +default_analysis = 1 + +body_config_prefix = 'm2m' # every line in the body starting with this value will be skipped from the IOCs +m2m_key = 'YOUSETYOURKEYHERE' +m2m_benign_attachment_keyword = 'benign' + +debug = True +nameservers = ['8.8.8.8'] +email_subject_prefix = 'M2M' +attach_original_mail = True + +excludelist = ('google.com', 'microsoft.com') +externallist = ('virustotal.com', 'malwr.com', 'hybrid-analysis.com', 'emergingthreats.net') +internallist = ('internal.system.local') +noidsflaglist = ('myexternalip.com', 'ipinfo.io', 'icanhazip.com', 'wtfismyip.com', 'ipecho.net', + 'api.ipify.org', 'checkip.amazonaws.com', 'whatismyipaddress.com', 'google.com', + 'dropbox.com' + ) + +# Stop parsing when this term is found +stopword = 'Whois & IP Information' + +# Ignore lines in body of message containing: +ignorelist = ("From:", "Sender:", "Received:", "Sender IP:", "Reply-To:", "Registrar WHOIS Server:", + "Registrar:", "Domain Status:", "Registrant Email:", "IP Location:", + "X-Get-Message-Sender-Via:", "X-Authenticated-Sender:") + +# Ignore (don't add) attributes that are on server side warning list +enforcewarninglist = True + +# Add a sighting for each value +sighting = False +sighting_source = "YOUR_MAIL_TO_MISP_IDENTIFIER" + +# Remove "Re:", "Fwd:" and {Spam?} from subject +# add: "[\(\[].*?[\)\]]" to remove everything between [] and (): i.e. [tag] +removelist = (r'Re:', r'Fwd:', r'\{Spam?\}') + +# TLP tag setup +# Tuples contain different variations of spelling +tlptags = {'tlp:amber': ['tlp:amber', 'tlp: amber', 'tlp amber'], + 'tlp:green': ['tlp:green', 'tlp: green', 'tlp green'], + 'tlp:white': ['tlp:white', 'tlp: white', 'tlp white'] + } +tlptag_default = sorted(tlptags.keys())[0] + +malwaretags = {'locky': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Locky"'], + 'jaff': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Jaff"'], + 'dridex': ['misp-galaxy:tool="dridex"'], + 'netwire': ['Netwire RAT'], + 'Pony': ['misp-galaxy:tool="Hancitor"'], + 'ursnif': ['misp-galaxy:tool="Snifula"'], + 'NanoCore': ['misp-galaxy:tool="NanoCoreRAT"'], + 'trickbot': ['misp-galaxy:tool="Trick Bot"'] + } + +# Tags to be set depending on the presence of other tags +dependingtags = {'tlp:white': ['circl:osint-feed'] + } + +# Known identifiers for forwarded messages +forward_identifiers = {'-------- Forwarded Message --------', 'Begin forwarded message:'} + +# Tags to add when hashes are found (e.g. to do automatic expansion) +hash_only_tags = {'TODO:VT-ENRICHMENT'} + +# If an attribute is on any MISP server side `warning list`, skip the creation of the attribute +skip_item_on_warninglist = True diff --git a/tests/config_spamtrap.py b/tests/config_spamtrap.py new file mode 100644 index 0000000..8f5dad4 --- /dev/null +++ b/tests/config_spamtrap.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +misp_url = 'YOUR_MISP_URL' +misp_key = 'YOUR_KEY_HERE' # The MISP auth key can be found on the MISP web interface under the automation section +misp_verifycert = True +spamtrap = True +default_distribution = 0 +default_threat_level = 3 +default_analysis = 1 + +body_config_prefix = 'm2m' # every line in the body starting with this value will be skipped from the IOCs +m2m_key = 'YOUSETYOURKEYHERE' +m2m_benign_attachment_keyword = 'benign' + +debug = True +nameservers = ['8.8.8.8'] +email_subject_prefix = 'M2M' +attach_original_mail = True + +excludelist = ('google.com', 'microsoft.com') +externallist = ('virustotal.com', 'malwr.com', 'hybrid-analysis.com', 'emergingthreats.net') +internallist = ('internal.system.local') +noidsflaglist = ('myexternalip.com', 'ipinfo.io', 'icanhazip.com', 'wtfismyip.com', 'ipecho.net', + 'api.ipify.org', 'checkip.amazonaws.com', 'whatismyipaddress.com', 'google.com', + 'dropbox.com' + ) + +# Stop parsing when this term is found +stopword = 'Whois & IP Information' + +# Ignore lines in body of message containing: +ignorelist = ("From:", "Sender:", "Received:", "Sender IP:", "Reply-To:", "Registrar WHOIS Server:", + "Registrar:", "Domain Status:", "Registrant Email:", "IP Location:", + "X-Get-Message-Sender-Via:", "X-Authenticated-Sender:") + +# Ignore (don't add) attributes that are on server side warning list +enforcewarninglist = True + +# Add a sighting for each value +sighting = False +sighting_source = "YOUR_MAIL_TO_MISP_IDENTIFIER" + +# Remove "Re:", "Fwd:" and {Spam?} from subject +# add: "[\(\[].*?[\)\]]" to remove everything between [] and (): i.e. [tag] +removelist = (r'Re:', r'Fwd:', r'\{Spam\?\}') + +# TLP tag setup +# Tuples contain different variations of spelling +tlptags = {'tlp:amber': ['tlp:amber', 'tlp: amber', 'tlp amber'], + 'tlp:green': ['tlp:green', 'tlp: green', 'tlp green'], + 'tlp:white': ['tlp:white', 'tlp: white', 'tlp white'] + } +tlptag_default = sorted(tlptags.keys())[0] + +malwaretags = {'locky': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Locky"'], + 'jaff': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Jaff"'], + 'dridex': ['misp-galaxy:tool="dridex"'], + 'netwire': ['Netwire RAT'], + 'Pony': ['misp-galaxy:tool="Hancitor"'], + 'ursnif': ['misp-galaxy:tool="Snifula"'], + 'NanoCore': ['misp-galaxy:tool="NanoCoreRAT"'], + 'trickbot': ['misp-galaxy:tool="Trick Bot"'] + } + +# Tags to be set depending on the presence of other tags +dependingtags = {'tlp:white': ['circl:osint-feed'] + } + +# Known identifiers for forwarded messages +forward_identifiers = {'-------- Forwarded Message --------', 'Begin forwarded message:'} + +# Tags to add when hashes are found (e.g. to do automatic expansion) +hash_only_tags = {'TODO:VT-ENRICHMENT'} + +# If an attribute is on any MISP server side `warning list`, skip the creation of the attribute +skip_item_on_warninglist = True diff --git a/tests/mails b/tests/mails new file mode 160000 index 0000000..1e673f9 --- /dev/null +++ b/tests/mails @@ -0,0 +1 @@ +Subproject commit 1e673f9d64e24136a9e4ef3f731c1a6052f9bb87 diff --git a/tests/tests.py b/tests/tests.py new file mode 100644 index 0000000..8bb8952 --- /dev/null +++ b/tests/tests.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import unittest +import importlib +import sys +from io import BytesIO +sys.path.insert(0, ".") + +from mail2misp import Mail2MISP + + +class TestMailToMISP(unittest.TestCase): + + def test_spamtrap(self): + config = importlib.import_module('tests.config_spamtrap') + self.mail2misp = Mail2MISP('', '', '', config=config, offline=True) + with open('tests/mails/simple_spamtrap.eml', 'rb') as f: + self.mail2misp.load_email(BytesIO(f.read())) + self.mail2misp.email_from_spamtrap() + self.mail2misp.process_body_iocs() + event = self.mail2misp.add_event() + print(event) + + def test_spamtrap_attachment(self): + config = importlib.import_module('tests.config_spamtrap') + self.mail2misp = Mail2MISP('', '', '', config=config, offline=True) + with open('tests/mails/attachment_spamtrap.eml', 'rb') as f: + self.mail2misp.load_email(BytesIO(f.read())) + self.mail2misp.email_from_spamtrap() + self.mail2misp.process_body_iocs() + event = self.mail2misp.add_event() + print(event) + + def test_forward(self): + config = importlib.import_module('tests.config_forward') + self.mail2misp = Mail2MISP('', '', '', config=config, offline=True) + with open('tests/mails/simple_forward.eml', 'rb') as f: + self.mail2misp.load_email(BytesIO(f.read())) + self.mail2misp.process_email_body() + self.mail2misp.process_body_iocs() + event = self.mail2misp.add_event() + print(event) + + def test_forward_attachment(self): + config = importlib.import_module('tests.config_forward') + self.mail2misp = Mail2MISP('', '', '', config=config, offline=True) + with open('tests/mails/attachment_forward.eml', 'rb') as f: + self.mail2misp.load_email(BytesIO(f.read())) + self.mail2misp.process_email_body() + self.mail2misp.process_body_iocs() + event = self.mail2misp.add_event() + print(event) + + +if __name__ == '__main__': + unittest.main()