mirror of https://github.com/MISP/mail_to_misp
Merge branch 'master' of https://github.com/rommelfs/mail_to_misp
Conflicts: mail_to_misp.pypull/38/head
commit
f19006e56d
|
@ -0,0 +1,3 @@
|
|||
[submodule "tests/mails"]
|
||||
path = tests/mails
|
||||
url = https://github.com/MISP/mail_to_misp_test.git
|
|
@ -0,0 +1,35 @@
|
|||
language: python
|
||||
|
||||
cache: pip
|
||||
|
||||
sudo: required
|
||||
|
||||
addons:
|
||||
apt:
|
||||
sources: [ 'ubuntu-toolchain-r-test' ]
|
||||
packages:
|
||||
- libstdc++6
|
||||
- libfuzzy-dev
|
||||
|
||||
python:
|
||||
- "3.6"
|
||||
- "3.6-dev"
|
||||
|
||||
install:
|
||||
- git clone git://github.com/stricaud/faup.git
|
||||
- pushd faup/build
|
||||
- cmake .. && make
|
||||
- sudo make install
|
||||
- popd
|
||||
- sudo ldconfig
|
||||
- pip install pipenv
|
||||
- pipenv install -d
|
||||
- git submodule init
|
||||
- git submodule update
|
||||
|
||||
script:
|
||||
- pipenv run nosetests --with-coverage --cover-package=mail2misp tests/tests.py
|
||||
|
||||
after_success:
|
||||
- pipenv run codecov
|
||||
- pipenv run coveralls
|
|
@ -0,0 +1,22 @@
|
|||
[[source]]
|
||||
name = "pypi"
|
||||
url = "https://pypi.org/simple"
|
||||
verify_ssl = true
|
||||
|
||||
[dev-packages]
|
||||
nose = "*"
|
||||
coverage = "*"
|
||||
codecov = "*"
|
||||
coveralls = "*"
|
||||
|
||||
[packages]
|
||||
dnspython = "*"
|
||||
lief = "*"
|
||||
python-magic = "*"
|
||||
pydeep = {git = "https://github.com/kbandla/pydeep.git"}
|
||||
pyfaup = {git = "https://github.com/stricaud/faup.git",subdirectory = "src/lib/bindings/python"}
|
||||
defang = {git = "https://github.com/Rafiot/defang.git"}
|
||||
pymisp = {editable = true,git = "https://github.com/MISP/PyMISP.git"}
|
||||
|
||||
[requires]
|
||||
python_version = "3.6"
|
|
@ -0,0 +1,235 @@
|
|||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "20759a97e7bb6bc062e147a56426b3039344319c4140e6312f3a3715b6265ad7"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
"python_version": "3.6"
|
||||
},
|
||||
"sources": [
|
||||
{
|
||||
"name": "pypi",
|
||||
"url": "https://pypi.org/simple",
|
||||
"verify_ssl": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"default": {
|
||||
"attrs": {
|
||||
"hashes": [
|
||||
"sha256:69c0dbf2ed392de1cb5ec704444b08a5ef81680a61cb899dc08127123af36a79",
|
||||
"sha256:f0b870f674851ecbfbbbd364d6b5cbdff9dcedbc7f3f5e18a6891057f21fe399"
|
||||
],
|
||||
"version": "==19.1.0"
|
||||
},
|
||||
"certifi": {
|
||||
"hashes": [
|
||||
"sha256:59b7658e26ca9c7339e00f8f4636cdfe59d34fa37b9b04f6f9e9926b3cece1a5",
|
||||
"sha256:b26104d6835d1f5e49452a26eb2ff87fe7090b89dfcaee5ea2212697e1e1d7ae"
|
||||
],
|
||||
"version": "==2019.3.9"
|
||||
},
|
||||
"chardet": {
|
||||
"hashes": [
|
||||
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae",
|
||||
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"
|
||||
],
|
||||
"version": "==3.0.4"
|
||||
},
|
||||
"defang": {
|
||||
"git": "https://github.com/Rafiot/defang.git",
|
||||
"ref": "52972a25313e2899f98f1777b940cb2122566a26"
|
||||
},
|
||||
"dnspython": {
|
||||
"hashes": [
|
||||
"sha256:36c5e8e38d4369a08b6780b7f27d790a292b2b08eea01607865bf0936c558e01",
|
||||
"sha256:f69c21288a962f4da86e56c4905b49d11aba7938d3d740e80d9e366ee4f1632d"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==1.16.0"
|
||||
},
|
||||
"idna": {
|
||||
"hashes": [
|
||||
"sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407",
|
||||
"sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c"
|
||||
],
|
||||
"version": "==2.8"
|
||||
},
|
||||
"jsonschema": {
|
||||
"hashes": [
|
||||
"sha256:0c0a81564f181de3212efa2d17de1910f8732fa1b71c42266d983cd74304e20d",
|
||||
"sha256:a5f6559964a3851f59040d3b961de5e68e70971afb88ba519d27e6a039efff1a"
|
||||
],
|
||||
"version": "==3.0.1"
|
||||
},
|
||||
"lief": {
|
||||
"hashes": [
|
||||
"sha256:c95974006a6b8a767eea8b35e6c63e2b20939730063ac472894b53ab9855a0b5"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==0.9.0"
|
||||
},
|
||||
"pydeep": {
|
||||
"git": "https://github.com/kbandla/pydeep.git",
|
||||
"ref": "bc0d33bff4b45718b4c5f2c79d4715d92a427eda"
|
||||
},
|
||||
"pyfaup": {
|
||||
"git": "https://github.com/stricaud/faup.git",
|
||||
"ref": "88dbbe2378552c9753b4f1e938663484909a4940",
|
||||
"subdirectory": "src/lib/bindings/python"
|
||||
},
|
||||
"pymisp": {
|
||||
"editable": true,
|
||||
"git": "https://github.com/MISP/PyMISP.git",
|
||||
"ref": "c888af177f88af653ad395924a3b840ca22f0af4"
|
||||
},
|
||||
"pyrsistent": {
|
||||
"hashes": [
|
||||
"sha256:3ca82748918eb65e2d89f222b702277099aca77e34843c5eb9d52451173970e2"
|
||||
],
|
||||
"version": "==0.14.11"
|
||||
},
|
||||
"python-dateutil": {
|
||||
"hashes": [
|
||||
"sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb",
|
||||
"sha256:c89805f6f4d64db21ed966fda138f8a5ed7a4fdbc1a8ee329ce1b74e3c74da9e"
|
||||
],
|
||||
"version": "==2.8.0"
|
||||
},
|
||||
"python-magic": {
|
||||
"hashes": [
|
||||
"sha256:f2674dcfad52ae6c49d4803fa027809540b130db1dec928cfbb9240316831375",
|
||||
"sha256:f3765c0f582d2dfc72c15f3b5a82aecfae9498bd29ca840d72f37d7bd38bfcd5"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==0.4.15"
|
||||
},
|
||||
"requests": {
|
||||
"hashes": [
|
||||
"sha256:502a824f31acdacb3a35b6690b5fbf0bc41d63a24a45c4004352b0242707598e",
|
||||
"sha256:7bf2a778576d825600030a110f3c0e3e8edc51dfaafe1c146e39a2027784957b"
|
||||
],
|
||||
"version": "==2.21.0"
|
||||
},
|
||||
"six": {
|
||||
"hashes": [
|
||||
"sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c",
|
||||
"sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73"
|
||||
],
|
||||
"version": "==1.12.0"
|
||||
},
|
||||
"urllib3": {
|
||||
"hashes": [
|
||||
"sha256:a53063d8b9210a7bdec15e7b272776b9d42b2fd6816401a0d43006ad2f9902db",
|
||||
"sha256:d363e3607d8de0c220d31950a8f38b18d5ba7c0830facd71a1c6b1036b7ce06c"
|
||||
],
|
||||
"version": "==1.25.2"
|
||||
}
|
||||
},
|
||||
"develop": {
|
||||
"certifi": {
|
||||
"hashes": [
|
||||
"sha256:59b7658e26ca9c7339e00f8f4636cdfe59d34fa37b9b04f6f9e9926b3cece1a5",
|
||||
"sha256:b26104d6835d1f5e49452a26eb2ff87fe7090b89dfcaee5ea2212697e1e1d7ae"
|
||||
],
|
||||
"version": "==2019.3.9"
|
||||
},
|
||||
"chardet": {
|
||||
"hashes": [
|
||||
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae",
|
||||
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"
|
||||
],
|
||||
"version": "==3.0.4"
|
||||
},
|
||||
"codecov": {
|
||||
"hashes": [
|
||||
"sha256:8ed8b7c6791010d359baed66f84f061bba5bd41174bf324c31311e8737602788",
|
||||
"sha256:ae00d68e18d8a20e9c3288ba3875ae03db3a8e892115bf9b83ef20507732bed4"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==2.0.15"
|
||||
},
|
||||
"coverage": {
|
||||
"hashes": [
|
||||
"sha256:3684fabf6b87a369017756b551cef29e505cb155ddb892a7a29277b978da88b9",
|
||||
"sha256:39e088da9b284f1bd17c750ac672103779f7954ce6125fd4382134ac8d152d74",
|
||||
"sha256:3c205bc11cc4fcc57b761c2da73b9b72a59f8d5ca89979afb0c1c6f9e53c7390",
|
||||
"sha256:465ce53a8c0f3a7950dfb836438442f833cf6663d407f37d8c52fe7b6e56d7e8",
|
||||
"sha256:48020e343fc40f72a442c8a1334284620f81295256a6b6ca6d8aa1350c763bbe",
|
||||
"sha256:5296fc86ab612ec12394565c500b412a43b328b3907c0d14358950d06fd83baf",
|
||||
"sha256:5f61bed2f7d9b6a9ab935150a6b23d7f84b8055524e7be7715b6513f3328138e",
|
||||
"sha256:68a43a9f9f83693ce0414d17e019daee7ab3f7113a70c79a3dd4c2f704e4d741",
|
||||
"sha256:6b8033d47fe22506856fe450470ccb1d8ba1ffb8463494a15cfc96392a288c09",
|
||||
"sha256:7ad7536066b28863e5835e8cfeaa794b7fe352d99a8cded9f43d1161be8e9fbd",
|
||||
"sha256:7bacb89ccf4bedb30b277e96e4cc68cd1369ca6841bde7b005191b54d3dd1034",
|
||||
"sha256:839dc7c36501254e14331bcb98b27002aa415e4af7ea039d9009409b9d2d5420",
|
||||
"sha256:8f9a95b66969cdea53ec992ecea5406c5bd99c9221f539bca1e8406b200ae98c",
|
||||
"sha256:932c03d2d565f75961ba1d3cec41ddde00e162c5b46d03f7423edcb807734eab",
|
||||
"sha256:988529edadc49039d205e0aa6ce049c5ccda4acb2d6c3c5c550c17e8c02c05ba",
|
||||
"sha256:998d7e73548fe395eeb294495a04d38942edb66d1fa61eb70418871bc621227e",
|
||||
"sha256:9de60893fb447d1e797f6bf08fdf0dbcda0c1e34c1b06c92bd3a363c0ea8c609",
|
||||
"sha256:9e80d45d0c7fcee54e22771db7f1b0b126fb4a6c0a2e5afa72f66827207ff2f2",
|
||||
"sha256:a545a3dfe5082dc8e8c3eb7f8a2cf4f2870902ff1860bd99b6198cfd1f9d1f49",
|
||||
"sha256:a5d8f29e5ec661143621a8f4de51adfb300d7a476224156a39a392254f70687b",
|
||||
"sha256:aca06bfba4759bbdb09bf52ebb15ae20268ee1f6747417837926fae990ebc41d",
|
||||
"sha256:bb23b7a6fd666e551a3094ab896a57809e010059540ad20acbeec03a154224ce",
|
||||
"sha256:bfd1d0ae7e292105f29d7deaa9d8f2916ed8553ab9d5f39ec65bcf5deadff3f9",
|
||||
"sha256:c62ca0a38958f541a73cf86acdab020c2091631c137bd359c4f5bddde7b75fd4",
|
||||
"sha256:c709d8bda72cf4cd348ccec2a4881f2c5848fd72903c185f363d361b2737f773",
|
||||
"sha256:c968a6aa7e0b56ecbd28531ddf439c2ec103610d3e2bf3b75b813304f8cb7723",
|
||||
"sha256:df785d8cb80539d0b55fd47183264b7002077859028dfe3070cf6359bf8b2d9c",
|
||||
"sha256:f406628ca51e0ae90ae76ea8398677a921b36f0bd71aab2099dfed08abd0322f",
|
||||
"sha256:f46087bbd95ebae244a0eda01a618aff11ec7a069b15a3ef8f6b520db523dcf1",
|
||||
"sha256:f8019c5279eb32360ca03e9fac40a12667715546eed5c5eb59eb381f2f501260",
|
||||
"sha256:fc5f4d209733750afd2714e9109816a29500718b32dd9a5db01c0cb3a019b96a"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==4.5.3"
|
||||
},
|
||||
"coveralls": {
|
||||
"hashes": [
|
||||
"sha256:baa26648430d5c2225ab12d7e2067f75597a4b967034bba7e3d5ab7501d207a1",
|
||||
"sha256:ff9b7823b15070f26f654837bb02a201d006baaf2083e0514ffd3b34a3ffed81"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==1.7.0"
|
||||
},
|
||||
"docopt": {
|
||||
"hashes": [
|
||||
"sha256:49b3a825280bd66b3aa83585ef59c4a8c82f2c8a522dbe754a8bc8d08c85c491"
|
||||
],
|
||||
"version": "==0.6.2"
|
||||
},
|
||||
"idna": {
|
||||
"hashes": [
|
||||
"sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407",
|
||||
"sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c"
|
||||
],
|
||||
"version": "==2.8"
|
||||
},
|
||||
"nose": {
|
||||
"hashes": [
|
||||
"sha256:9ff7c6cc443f8c51994b34a667bbcf45afd6d945be7477b52e97516fd17c53ac",
|
||||
"sha256:dadcddc0aefbf99eea214e0f1232b94f2fa9bd98fa8353711dacb112bfcbbb2a",
|
||||
"sha256:f1bffef9cbc82628f6e7d7b40d7e255aefaa1adb6a1b1d26c69a8b79e6208a98"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==1.3.7"
|
||||
},
|
||||
"requests": {
|
||||
"hashes": [
|
||||
"sha256:502a824f31acdacb3a35b6690b5fbf0bc41d63a24a45c4004352b0242707598e",
|
||||
"sha256:7bf2a778576d825600030a110f3c0e3e8edc51dfaafe1c146e39a2027784957b"
|
||||
],
|
||||
"version": "==2.21.0"
|
||||
},
|
||||
"urllib3": {
|
||||
"hashes": [
|
||||
"sha256:a53063d8b9210a7bdec15e7b272776b9d42b2fd6816401a0d43006ad2f9902db",
|
||||
"sha256:d363e3607d8de0c220d31950a8f38b18d5ba7c0830facd71a1c6b1036b7ce06c"
|
||||
],
|
||||
"version": "==1.25.2"
|
||||
}
|
||||
}
|
||||
}
|
44
README.md
44
README.md
|
@ -1,3 +1,6 @@
|
|||
[![Build Status](https://travis-ci.org/MISP/mail_to_misp.svg?branch=master)](https://travis-ci.org/MISP/mail_to_misp)
|
||||
[![codecov](https://codecov.io/gh/MISP/mail_to_misp/branch/master/graph/badge.svg)](https://codecov.io/gh/MISP/mail_to_misp)
|
||||
|
||||
# mail_to_misp
|
||||
|
||||
Connect your mail infrastructure to [MISP](https://github.com/MISP/MISP) in order to create events based on the information contained within mails.
|
||||
|
@ -37,6 +40,22 @@ If you send a mail to mail_to_misp containing: `key:ABCDEFGHIJKLMN0PQRSTUVWXYZ`
|
|||
If you don't want to use this feature, just don't put it in the message body.
|
||||
The distribution is defined in the configuration as well: `m2m_auto_distribution = '3' # 3 = All communities`
|
||||
|
||||
# Pass parameters in the email body
|
||||
|
||||
```
|
||||
m2m:<parameter>:<Value>
|
||||
|
||||
# Examples
|
||||
m2m:attachment:benign # Email attachment considered benign (attachment in MISP, malware-sample by default)
|
||||
m2m:attach_original_mail:1 # Attach the full original email to the MISP Event (may contain private information)
|
||||
|
||||
m2m:m2mkey:YOUSETYOURKEYHERE # Key required for some actions
|
||||
# The following key are ignored if M2M:m2mkey is invalid
|
||||
m2m:distribution:<0-3,5> # Note: impossible to pass a sharing group yet.
|
||||
m2m:threat_level:<0-2>
|
||||
m2m:analysis:<0-3>
|
||||
m2m:publish:1 # Autopublish
|
||||
```
|
||||
|
||||
## Implementation
|
||||
|
||||
|
@ -61,7 +80,7 @@ The implemented workflow is mainly for mail servers like Postfix. Client side im
|
|||
|
||||
1. Setup a new email address in the aliases file (e.g. /etc/aliases) and configure the correct path:
|
||||
|
||||
`misp_handler: "|/path/to/mail_to_misp.py"`
|
||||
`misp_handler: "|/path/to/mail_to_misp.py -"`
|
||||
|
||||
2. Rebuild the DB:
|
||||
|
||||
|
@ -133,6 +152,29 @@ Obviously, you would like to filter mails based on subject or from address and p
|
|||
|
||||
## Requirements
|
||||
|
||||
### The easy way
|
||||
|
||||
```bash
|
||||
# Install faup
|
||||
git clone git://github.com/stricaud/faup.git
|
||||
cd faup
|
||||
mkdir build
|
||||
cd build
|
||||
cmake .. && make
|
||||
sudo make install
|
||||
|
||||
# Update Shared libs
|
||||
sudo ldconfig
|
||||
|
||||
(sudo) pip install (--user) pipenv
|
||||
|
||||
# Install other python requirements
|
||||
pipenv install
|
||||
|
||||
# Test if the script is working
|
||||
./mail_to_misp.py -h
|
||||
```
|
||||
|
||||
### General
|
||||
|
||||
- mail_to_misp requires access to a MISP instance (via API).
|
||||
|
|
65
fake_smtp.py
65
fake_smtp.py
|
@ -1,9 +1,35 @@
|
|||
#!/usr/bin/env python
|
||||
import sys
|
||||
#!/usr/bin/env python3
|
||||
import ssl
|
||||
from pathlib import Path
|
||||
import importlib
|
||||
from subprocess import run, PIPE
|
||||
import aiosmtpd.controller
|
||||
from aiosmtpd.controller import Controller
|
||||
from aiosmtpd.smtp import SMTP
|
||||
import subprocess
|
||||
import argparse
|
||||
|
||||
|
||||
def get_context():
|
||||
key_path = Path('certs', 'key.pem')
|
||||
cert_path = Path('certs', 'cert.pem')
|
||||
|
||||
if not cert_path.exists() and not key_path.exists():
|
||||
subprocess.call(f'openssl req -x509 -newkey rsa:4096 -keyout {key_path.as_posix()} -out {cert_path.as_posix()} -days 365 -nodes -subj "/CN=localhost"', shell=True)
|
||||
|
||||
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
context.load_cert_chain(cert_path.as_posix(), key_path.as_posix())
|
||||
|
||||
|
||||
# Pass SSL context to aiosmtpd
|
||||
class ControllerSSL(Controller):
|
||||
def factory(self):
|
||||
return SMTP(self.handler, ssl_context=get_context())
|
||||
|
||||
|
||||
# Pass SSL context to aiosmtpd
|
||||
class ControllerSTARTTLS(Controller):
|
||||
def factory(self):
|
||||
return SMTP(self.handler, require_starttls=False, tls_context=get_context())
|
||||
|
||||
|
||||
class CustomSMTPHandler:
|
||||
|
@ -12,27 +38,48 @@ class CustomSMTPHandler:
|
|||
print(f'Message addressed from: {envelope.mail_from}')
|
||||
print(f'Message addressed to : {envelope.rcpt_tos}')
|
||||
print(f'Message length : {len(envelope.content)}')
|
||||
if email_forward in envelope.rcpt_tos:
|
||||
p = run([binpath_forward, "-"], stdout=PIPE, input=envelope.content)
|
||||
else:
|
||||
p = run([binpath, "-"], stdout=PIPE, input=envelope.content)
|
||||
print(p)
|
||||
return '250 OK'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Launch a fake SMTP server to push SPAMs to a MISP instance')
|
||||
parser.add_argument("--path", default='./mail_to_misp.py', help="Path to the mail_to_misp.py script.")
|
||||
parser.add_argument("--path_forward", default='./mail_to_misp.py', help="Path to the mail_to_misp.py script.")
|
||||
parser.add_argument("--email_forward", default='mail2misp@example.com', help="Path to the mail_to_misp.py script.")
|
||||
parser.add_argument("--host", default='127.0.0.1', help="IP to attach the SMTP server to.")
|
||||
parser.add_argument("--port", default='2525', help="Port of the SMTP server")
|
||||
parser.add_argument("--ssl", action='store_true', help="Pure SMTPs.")
|
||||
args = parser.parse_args()
|
||||
|
||||
configmodule = Path(__file__).as_posix().replace('.py', '_config')
|
||||
if Path(f'{configmodule}.py').exists():
|
||||
config = importlib.import_module(configmodule)
|
||||
else:
|
||||
print("Couldn't locate config file {0}".format(f'{configmodule}.py'))
|
||||
sys.exit(-1)
|
||||
|
||||
binpath = config.binpath
|
||||
binpath_forward = config.binpath_forward
|
||||
email_forward = config.email_forward
|
||||
smtp_addr = config.smtp_addr
|
||||
smtp_port = config.smtp_port
|
||||
binpath = config.binpath
|
||||
smtps = config.ssl
|
||||
else:
|
||||
binpath = args.path
|
||||
binpath_forward = args.path_forward
|
||||
email_forward = args.email_forward
|
||||
smtp_addr = args.host
|
||||
smtp_port = args.port
|
||||
smtps = args.ssl
|
||||
|
||||
print("Starting Fake-SMTP-to-MISP server")
|
||||
|
||||
handler = CustomSMTPHandler()
|
||||
server = aiosmtpd.controller.Controller(handler, hostname=smtp_addr, port=smtp_port)
|
||||
if smtps:
|
||||
server = ControllerSSL(handler, hostname=smtp_addr, port=smtp_port)
|
||||
else:
|
||||
server = ControllerSTARTTLS(handler, hostname=smtp_addr, port=smtp_port)
|
||||
server.start()
|
||||
input("Server started. Press Return to quit.")
|
||||
server.stop()
|
||||
|
|
|
@ -2,7 +2,11 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from pathlib import Path
|
||||
|
||||
binpath = Path(__file__).cwd() / 'mail_to_misp.py'
|
||||
binpath = Path(__file__).parent / 'mail_to_misp.py'
|
||||
binpath_forward = Path(__file__).parent / 'mail_to_misp_forward.py'
|
||||
|
||||
email_forward = 'mail2misp@example.com'
|
||||
|
||||
smtp_addr = '127.0.0.1'
|
||||
smtp_port = 2525
|
||||
ssl = False
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
from . import urlmarker
|
||||
from . import hashmarker
|
||||
from .mail2misp import Mail2MISP
|
|
@ -0,0 +1,381 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import re
|
||||
import syslog
|
||||
import html
|
||||
from io import BytesIO
|
||||
from ipaddress import ip_address
|
||||
from email import message_from_bytes, policy, message
|
||||
|
||||
from . import urlmarker
|
||||
from . import hashmarker
|
||||
from pyfaup.faup import Faup
|
||||
from pymisp import PyMISP, MISPEvent, MISPObject, MISPSighting
|
||||
from pymisp.tools import EMailObject, make_binary_objects
|
||||
from defang import refang
|
||||
try:
|
||||
import dns.resolver
|
||||
HAS_DNS = True
|
||||
except ImportError:
|
||||
HAS_DNS = False
|
||||
|
||||
|
||||
def is_ip(address):
|
||||
try:
|
||||
ip_address(address)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class Mail2MISP():
|
||||
|
||||
def __init__(self, misp_url, misp_key, verifycert, config, offline=False):
|
||||
self.offline = offline
|
||||
if not self.offline:
|
||||
self.misp = PyMISP(misp_url, misp_key, verifycert, debug=config.debug)
|
||||
self.config = config
|
||||
if not hasattr(self.config, 'enable_dns'):
|
||||
setattr(self.config, 'enable_dns', True)
|
||||
self.debug = self.config.debug
|
||||
self.config_from_email_body = {}
|
||||
# Init Faup
|
||||
self.f = Faup()
|
||||
self.sightings_to_add = []
|
||||
|
||||
def load_email(self, pseudofile):
|
||||
self.pseudofile = pseudofile
|
||||
self.original_mail = message_from_bytes(self.pseudofile.getvalue(), policy=policy.default)
|
||||
self.subject = self.original_mail.get('Subject')
|
||||
# Remove words from subject
|
||||
for removeword in self.config.removelist:
|
||||
self.subject = re.sub(removeword, "", self.subject).strip()
|
||||
|
||||
# Initialize the MISP event
|
||||
self.misp_event = MISPEvent()
|
||||
self.misp_event.info = f'{self.config.email_subject_prefix} - {self.subject}'
|
||||
self.misp_event.distribution = self.config.default_distribution
|
||||
self.misp_event.threat_level_id = self.config.default_threat_level
|
||||
self.misp_event.analysis = self.config.default_analysis
|
||||
|
||||
def sighting(self, value, source):
|
||||
if self.offline:
|
||||
raise Exception('The script is running in offline mode, ')
|
||||
'''Add a sighting'''
|
||||
s = MISPSighting()
|
||||
s.from_dict(value=value, source=source)
|
||||
self.misp.set_sightings(s)
|
||||
|
||||
def _find_inline_forward(self):
|
||||
'''Does the body contains a forwarded email?'''
|
||||
for identifier in self.config.forward_identifiers:
|
||||
if identifier in self.clean_email_body:
|
||||
self.clean_email_body, fw_email = self.clean_email_body.split(identifier)
|
||||
return self.forwarded_email(pseudofile=BytesIO(fw_email.encode()))
|
||||
|
||||
def _find_attached_forward(self):
|
||||
forwarded_emails = []
|
||||
for attachment in self.original_mail.iter_attachments():
|
||||
attachment_content = attachment.get_content()
|
||||
# Search for email forwarded as attachment
|
||||
# I could have more than one, attaching everything.
|
||||
if isinstance(attachment_content, message.EmailMessage):
|
||||
forwarded_emails.append(self.forwarded_email(pseudofile=BytesIO(attachment_content.as_bytes())))
|
||||
else:
|
||||
if isinstance(attachment_content, str):
|
||||
attachment_content = attachment_content.encode()
|
||||
filename = attachment.get_filename()
|
||||
if not filename:
|
||||
filename = 'missing_filename'
|
||||
if self.config_from_email_body.get('attachment') == self.config.m2m_benign_attachment_keyword:
|
||||
# Attach sane file
|
||||
self.misp_event.add_attribute('attachment', value=filename, data=BytesIO(attachment_content))
|
||||
else:
|
||||
f_object, main_object, sections = make_binary_objects(pseudofile=BytesIO(attachment_content), filename=filename, standalone=False)
|
||||
self.misp_event.add_object(f_object)
|
||||
if main_object:
|
||||
self.misp_event.add_object(main_object)
|
||||
[self.misp_event.add_object(section) for section in sections]
|
||||
return forwarded_emails
|
||||
|
||||
def email_from_spamtrap(self):
|
||||
'''The email comes from a spamtrap and should be attached as-is.'''
|
||||
raw_body = self.original_mail.get_body(preferencelist=('html', 'plain'))
|
||||
if raw_body:
|
||||
self.clean_email_body = html.unescape(raw_body.get_payload(decode=True).decode('utf8', 'surrogateescape'))
|
||||
else:
|
||||
self.clean_email_body = ''
|
||||
return self.forwarded_email(self.pseudofile)
|
||||
|
||||
def forwarded_email(self, pseudofile: BytesIO):
|
||||
'''Extracts all possible indicators out of an email and create a MISP event out of it.
|
||||
* Gets all relevant Headers
|
||||
* Attach the body
|
||||
* Create MISP file objects (uses lief if possible)
|
||||
* Set all references
|
||||
'''
|
||||
email_object = EMailObject(pseudofile=pseudofile, attach_original_mail=True, standalone=False)
|
||||
if email_object.attachments:
|
||||
# Create file objects for the attachments
|
||||
for attachment_name, attachment in email_object.attachments:
|
||||
if not attachment_name:
|
||||
attachment_name = 'NameMissing.txt'
|
||||
if self.config_from_email_body.get('attachment') == self.config.m2m_benign_attachment_keyword:
|
||||
a = self.misp_event.add_attribute('attachment', value=attachment_name, data=attachment)
|
||||
email_object.add_reference(a.uuid, 'related-to', 'Email attachment')
|
||||
else:
|
||||
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
|
||||
self.misp_event.add_object(f_object)
|
||||
if main_object:
|
||||
self.misp_event.add_object(main_object)
|
||||
for section in sections:
|
||||
self.misp_event.add_object(section)
|
||||
email_object.add_reference(f_object.uuid, 'related-to', 'Email attachment')
|
||||
self.process_body_iocs(email_object)
|
||||
if self.config.spamtrap or self.config.attach_original_mail or self.config_from_email_body.get('attach_original_mail'):
|
||||
self.misp_event.add_object(email_object)
|
||||
return email_object
|
||||
|
||||
def process_email_body(self):
|
||||
mail_as_bytes = self.original_mail.get_body(preferencelist=('html', 'plain')).get_payload(decode=True)
|
||||
if mail_as_bytes:
|
||||
self.clean_email_body = html.unescape(mail_as_bytes.decode('utf8', 'surrogateescape'))
|
||||
# Check if there are config lines in the body & convert them to a python dictionary:
|
||||
# <config.body_config_prefix>:<key>:<value> => {<key>: <value>}
|
||||
self.config_from_email_body = {k.strip(): v.strip() for k, v in re.findall(f'{self.config.body_config_prefix}:(.*):(.*)', self.clean_email_body)}
|
||||
if self.config_from_email_body:
|
||||
# ... remove the config lines from the body
|
||||
self.clean_email_body = re.sub(rf'^{self.config.body_config_prefix}.*\n?', '',
|
||||
html.unescape(self.original_mail.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape')), flags=re.MULTILINE)
|
||||
# Check if autopublish key is present and valid
|
||||
if self.config_from_email_body.get('m2mkey') == self.config.m2m_key:
|
||||
if self.config_from_email_body.get('distribution') is not None:
|
||||
self.misp_event.distribution = self.config_from_email_body.get('distribution')
|
||||
if self.config_from_email_body.get('threat_level') is not None:
|
||||
self.misp_event.threat_level_id = self.config_from_email_body.get('threat_level')
|
||||
if self.config_from_email_body.get('analysis') is not None:
|
||||
self.misp_event.analysis = self.config_from_email_body.get('analysis')
|
||||
if self.config_from_email_body.get('publish'):
|
||||
self.misp_event.publish()
|
||||
|
||||
self._find_inline_forward()
|
||||
else:
|
||||
self.clean_email_body = ''
|
||||
self._find_attached_forward()
|
||||
|
||||
def process_body_iocs(self, email_object=None):
|
||||
if email_object:
|
||||
body = html.unescape(email_object.email.get_body(preferencelist=('html', 'plain')).get_payload(decode=True).decode('utf8', 'surrogateescape'))
|
||||
else:
|
||||
body = self.clean_email_body
|
||||
|
||||
# Cleanup body content
|
||||
# Depending on the source of the mail, there is some cleanup to do. Ignore lines in body of message
|
||||
for ignoreline in self.config.ignorelist:
|
||||
body = re.sub(rf'^{ignoreline}.*\n?', '', body, flags=re.MULTILINE)
|
||||
|
||||
# Remove everything after the stopword from the body
|
||||
body = body.split(self.config.stopword, 1)[0]
|
||||
|
||||
# Add tags to the event if keywords are found in the mail
|
||||
for tag in self.config.tlptags:
|
||||
if any(alternativetag in body.lower() for alternativetag in self.config.tlptags[tag]):
|
||||
self.misp_event.add_tag(tag)
|
||||
|
||||
# Prepare extraction of IOCs
|
||||
# Refang email data
|
||||
body = refang(body)
|
||||
|
||||
# Extract and add hashes
|
||||
contains_hash = False
|
||||
for h in set(re.findall(hashmarker.MD5_REGEX, body)):
|
||||
contains_hash = True
|
||||
attribute = self.misp_event.add_attribute('md5', h, enforceWarninglist=self.config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if self.config.sighting:
|
||||
self.sightings_to_add.append((h, self.config.sighting_source))
|
||||
for h in set(re.findall(hashmarker.SHA1_REGEX, body)):
|
||||
contains_hash = True
|
||||
attribute = self.misp_event.add_attribute('sha1', h, enforceWarninglist=self.config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if self.config.sighting:
|
||||
self.sightings_to_add.append((h, self.config.sighting_source))
|
||||
for h in set(re.findall(hashmarker.SHA256_REGEX, body)):
|
||||
contains_hash = True
|
||||
attribute = self.misp_event.add_attribute('sha256', h, enforceWarninglist=self.config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if self.config.sighting:
|
||||
self.sightings_to_add.append((h, self.config.sighting_source))
|
||||
|
||||
if contains_hash:
|
||||
[self.misp_event.add_tag(tag) for tag in self.config.hash_only_tags]
|
||||
|
||||
# # Extract network IOCs
|
||||
urllist = []
|
||||
urllist += re.findall(urlmarker.WEB_URL_REGEX, body)
|
||||
urllist += re.findall(urlmarker.IP_REGEX, body)
|
||||
if self.debug:
|
||||
syslog.syslog(str(urllist))
|
||||
|
||||
hostname_processed = []
|
||||
|
||||
# Add IOCs and expanded information to MISP
|
||||
for entry in set(urllist):
|
||||
ids_flag = True
|
||||
self.f.decode(entry)
|
||||
|
||||
domainname = self.f.get_domain()
|
||||
if domainname in self.config.excludelist:
|
||||
# Ignore the entry
|
||||
continue
|
||||
|
||||
hostname = self.f.get_host()
|
||||
|
||||
scheme = self.f.get_scheme()
|
||||
if scheme:
|
||||
scheme = scheme
|
||||
|
||||
resource_path = self.f.get_resource_path()
|
||||
if resource_path:
|
||||
resource_path = resource_path
|
||||
|
||||
if self.debug:
|
||||
syslog.syslog(domainname)
|
||||
|
||||
if domainname in self.config.internallist: # Add link to internal reference
|
||||
attribute = self.misp_event.add_attribute('link', entry, category='Internal reference',
|
||||
to_ids=False, enforceWarninglist=False)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
elif domainname in self.config.externallist: # External analysis
|
||||
attribute = self.misp_event.add_attribute('link', entry, category='External analysis',
|
||||
to_ids=False, enforceWarninglist=False)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else: # The URL is probably an indicator.
|
||||
comment = ""
|
||||
if (domainname in self.config.noidsflaglist) or (hostname in self.config.noidsflaglist):
|
||||
ids_flag = False
|
||||
comment = "Known host (mostly for connectivity test or IP lookup)"
|
||||
if self.debug:
|
||||
syslog.syslog(str(entry))
|
||||
|
||||
if scheme:
|
||||
if is_ip(hostname):
|
||||
attribute = self.misp_event.add_attribute('url', entry, to_ids=False,
|
||||
enforceWarninglist=self.config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else:
|
||||
if resource_path: # URL has path, ignore warning list
|
||||
attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag,
|
||||
enforceWarninglist=False, comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else: # URL has no path
|
||||
attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag,
|
||||
enforceWarninglist=self.config.enforcewarninglist, comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if self.config.sighting:
|
||||
self.sightings_to_add.append((entry, self.config.sighting_source))
|
||||
|
||||
if hostname in hostname_processed:
|
||||
# Hostname already processed.
|
||||
continue
|
||||
|
||||
hostname_processed.append(hostname)
|
||||
if self.config.sighting:
|
||||
self.sightings_to_add.append((hostname, self.config.sighting_source))
|
||||
|
||||
if self.debug:
|
||||
syslog.syslog(hostname)
|
||||
|
||||
comment = ''
|
||||
port = self.f.get_port()
|
||||
if port:
|
||||
port = port
|
||||
comment = f'on port: {port}'
|
||||
|
||||
if is_ip(hostname):
|
||||
attribute = self.misp_event.add_attribute('ip-dst', hostname, to_ids=ids_flag,
|
||||
enforceWarninglist=self.config.enforcewarninglist,
|
||||
comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else:
|
||||
related_ips = []
|
||||
if HAS_DNS and self.config.enable_dns:
|
||||
try:
|
||||
syslog.syslog(hostname)
|
||||
for rdata in dns.resolver.query(hostname, 'A'):
|
||||
if self.debug:
|
||||
syslog.syslog(str(rdata))
|
||||
related_ips.append(rdata.to_text())
|
||||
except Exception as e:
|
||||
if self.debug:
|
||||
syslog.syslog(str(e))
|
||||
|
||||
if related_ips:
|
||||
hip = MISPObject(name='ip-port')
|
||||
hip.add_attribute('hostname', value=hostname, to_ids=ids_flag,
|
||||
enforceWarninglist=self.config.enforcewarninglist, comment=comment)
|
||||
for ip in set(related_ips):
|
||||
hip.add_attribute('ip', type='ip-dst', value=ip, to_ids=False,
|
||||
enforceWarninglist=self.config.enforcewarninglist)
|
||||
self.misp_event.add_object(hip)
|
||||
if email_object:
|
||||
email_object.add_reference(hip.uuid, 'contains')
|
||||
else:
|
||||
attribute = self.misp_event.add_attribute('hostname', value=hostname,
|
||||
to_ids=ids_flag, enforceWarninglist=self.config.enforcewarninglist,
|
||||
comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
|
||||
def add_event(self):
|
||||
'''Add event on the remote MISP instance.'''
|
||||
|
||||
# Add additional tags depending on others
|
||||
tags = []
|
||||
for tag in [t.name for t in self.misp_event.tags]:
|
||||
if self.config.dependingtags.get(tag):
|
||||
tags += self.config.dependingtags.get(tag)
|
||||
|
||||
# Add additional tags according to configuration
|
||||
for malware in self.config.malwaretags:
|
||||
if malware.lower() in self.subject.lower():
|
||||
tags += self.config.malwaretags.get(malware)
|
||||
if tags:
|
||||
[self.misp_event.add_tag(tag) for tag in tags]
|
||||
|
||||
has_tlp_tag = False
|
||||
for tag in [t.name for t in self.misp_event.tags]:
|
||||
if tag.lower().startswith('tlp'):
|
||||
has_tlp_tag = True
|
||||
if not has_tlp_tag:
|
||||
self.misp_event.add_tag(self.config.tlptag_default)
|
||||
|
||||
if self.offline:
|
||||
return self.misp_event.to_json()
|
||||
event = self.misp.add_event(self.misp_event)
|
||||
if self.config.sighting:
|
||||
for value, source in self.sightings_to_add:
|
||||
self.sighting(value, source)
|
||||
return event
|
||||
|
||||
def update_event(self, event_id=None):
|
||||
'''Update event on the remote MISP instance.'''
|
||||
|
||||
if self.offline:
|
||||
return self.misp_event.to_json()
|
||||
event = self.misp.update_event(self.misp_event, event_id=event_id)
|
||||
if self.config.sighting:
|
||||
for value, source in self.sightings_to_add:
|
||||
self.sighting(value, source)
|
||||
return event
|
||||
|
320
mail_to_misp.py
320
mail_to_misp.py
|
@ -1,331 +1,21 @@
|
|||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import re
|
||||
import syslog
|
||||
from pathlib import Path
|
||||
from io import BytesIO
|
||||
from ipaddress import ip_address
|
||||
from email import message_from_bytes, policy
|
||||
import importlib
|
||||
try:
|
||||
import urlmarker
|
||||
import hashmarker
|
||||
from pyfaup.faup import Faup
|
||||
from pymisp import PyMISP, MISPEvent, MISPObject, MISPSighting
|
||||
from pymisp.tools import EMailObject, make_binary_objects
|
||||
from defang import refang
|
||||
import dns.resolver
|
||||
except ImportError as e:
|
||||
print("(!) Problem loading module:")
|
||||
print(e)
|
||||
sys.exit(-1)
|
||||
|
||||
|
||||
def is_ip(address):
|
||||
try:
|
||||
ip_address(address)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class Mail2MISP():
|
||||
|
||||
def __init__(self, misp_url, misp_key, verifycert, config):
|
||||
self.misp = PyMISP(misp_url, misp_key, verifycert, debug=config.debug)
|
||||
self.debug = config.debug
|
||||
self.config = config
|
||||
# Init Faup
|
||||
self.f = Faup()
|
||||
|
||||
def load_email(self, pseudofile):
|
||||
self.pseudofile = pseudofile
|
||||
self.original_mail = message_from_bytes(self.pseudofile.getvalue(), policy=policy.default)
|
||||
self.subject = self.original_mail.get('Subject')
|
||||
# Remove words from subject
|
||||
for removeword in self.config.removelist:
|
||||
self.subject = re.sub(removeword, "", self.subject)
|
||||
|
||||
# Initialize the MISP event
|
||||
self.misp_event = MISPEvent()
|
||||
self.misp_event.info = f'{config.email_subject_prefix} - {self.subject}'
|
||||
self.misp_event.distribution = config.m2m_auto_distribution
|
||||
self.misp_event.threat_level_id = 3
|
||||
self.misp_event.analysis = 1
|
||||
self.misp_event.add_tag(config.tlptag_default)
|
||||
|
||||
def sighting(self, value, source):
|
||||
'''Add a sighting'''
|
||||
s = MISPSighting()
|
||||
s.from_dict(value=value, source=source)
|
||||
self.misp.set_sightings(s)
|
||||
|
||||
def _find_inline_forward(self):
|
||||
'''Does the body contains a forwarded email?'''
|
||||
for identifier in config.forward_identifiers:
|
||||
if identifier in self.clean_email_body:
|
||||
self.clean_email_body, fw_email = self.clean_email_body.split(identifier)
|
||||
self.forwarded_email(pseudofile=BytesIO(fw_email.encode()))
|
||||
|
||||
def _find_attached_forward(self):
|
||||
for attachment in self.original_mail.iter_attachments():
|
||||
# Search for email forwarded as attachment
|
||||
# I could have more than one, attaching everything.
|
||||
if attachment.get_filename() and attachment.get_filename().endswith('.eml'):
|
||||
self.forwarded_email(pseudofile=BytesIO(attachment.get_content().as_bytes()))
|
||||
else:
|
||||
if self.config_from_email_body.get('attachment') == config.m2m_benign_attachment_keyword:
|
||||
# Attach sane file
|
||||
self.misp_event.add_attribute('attachment', value='Report',
|
||||
data=BytesIO(attachment.get_content().as_bytes()))
|
||||
#else:
|
||||
#f_object, main_object, sections = make_binary_objects(pseudofile=BytesIO(attachment.get_content()),
|
||||
# filename=attachment.get_filename(), standalone=False)
|
||||
#self.misp_event.add_object(f_object)
|
||||
#if main_object:
|
||||
# self.misp_event.add_object(main_object)
|
||||
# [self.misp_event.add_object(section) for section in sections]
|
||||
|
||||
def email_from_spamtrap(self):
|
||||
'''The email comes from a spamtrap and should be attached as-is.'''
|
||||
self.clean_email_body = self.original_mail.get_body().as_string()
|
||||
self.forwarded_email(self.pseudofile)
|
||||
|
||||
def forwarded_email(self, pseudofile: BytesIO):
|
||||
'''Extracts all possible indicators out of an email and create a MISP event out of it.
|
||||
* Gets all relevant Headers
|
||||
* Attach the body
|
||||
* Create MISP file objects (uses lief if possible)
|
||||
* Set all references
|
||||
'''
|
||||
email_object = EMailObject(pseudofile=pseudofile, attach_original_mail=True, standalone=False)
|
||||
if email_object.attachments:
|
||||
# Create file objects for the attachments
|
||||
for attachment_name, attachment in email_object.attachments:
|
||||
if not attachment_name:
|
||||
attachment_name = 'NameMissing'
|
||||
#f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
|
||||
#self.misp_event.add_object(f_object)
|
||||
#if main_object:
|
||||
# self.misp_event.add_object(main_object)
|
||||
# for section in sections:
|
||||
# self.misp_event.add_object(section)
|
||||
#email_object.add_reference(f_object.uuid, 'related-to', 'Email attachment')
|
||||
self.process_body_iocs(email_object)
|
||||
self.misp_event.add_object(email_object)
|
||||
|
||||
def process_email_body(self):
|
||||
self.clean_email_body = self.original_mail.get_body().as_string()
|
||||
# Check if there are config lines in the body & convert them to a python dictionary:
|
||||
# <config.body_config_prefix>:<key>:<value> => {<key>: <value>}
|
||||
self.config_from_email_body = {k: v for k, v in re.findall(f'{config.body_config_prefix}:(.*):(.*)', self.clean_email_body)}
|
||||
if self.config_from_email_body:
|
||||
# ... remove the config lines from the body
|
||||
self.clean_email_body = re.sub(rf'^{config.body_config_prefix}.*\n?', '',
|
||||
self.original_mail.get_body().as_string(), flags=re.MULTILINE)
|
||||
|
||||
self._find_inline_forward()
|
||||
self._find_attached_forward()
|
||||
# # Prepare extraction of IOCs
|
||||
# Refang email data
|
||||
self.clean_email_body = refang(self.clean_email_body)
|
||||
|
||||
# Depending on the source of the mail, there is some cleanup to do. Ignore lines in body of message
|
||||
for ignoreline in config.ignorelist:
|
||||
self.clean_email_body = re.sub(rf'^{ignoreline}.*\n?', '', self.clean_email_body, flags=re.MULTILINE)
|
||||
|
||||
# Check if autopublish key is present and valid
|
||||
if self.config_from_email_body.get('m2mkey') == config.m2m_key:
|
||||
self.misp_event.publish()
|
||||
|
||||
# Add tags to the event if keywords are found in the mail
|
||||
for tag in config.tlptags:
|
||||
if any(alternativetag in self.clean_email_body for alternativetag in config.tlptags[tag]):
|
||||
self.misp_event.add_tag(tag)
|
||||
|
||||
# Remove everything after the stopword from the body
|
||||
self.clean_email_body = self.clean_email_body.split(config.stopword, 1)[0]
|
||||
|
||||
def process_body_iocs(self, email_object=None):
|
||||
if email_object:
|
||||
body = email_object.email.get_body().as_string()
|
||||
else:
|
||||
body = self.clean_email_body
|
||||
# Extract and add hashes
|
||||
contains_hash = False
|
||||
for h in set(re.findall(hashmarker.MD5_REGEX, body)):
|
||||
contains_hash = True
|
||||
attribute = self.misp_event.add_attribute('md5', h, enforceWarninglist=config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if config.sighting:
|
||||
self.sighting(h, config.sighting_source)
|
||||
for h in set(re.findall(hashmarker.SHA1_REGEX, body)):
|
||||
contains_hash = True
|
||||
attribute = self.misp_event.add_attribute('sha1', h, enforceWarninglist=config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if config.sighting:
|
||||
self.sighting(h, config.sighting_source)
|
||||
for h in set(re.findall(hashmarker.SHA256_REGEX, body)):
|
||||
contains_hash = True
|
||||
attribute = self.misp_event.add_attribute('sha256', h, enforceWarninglist=config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if config.sighting:
|
||||
self.sighting(h, config.sighting_source)
|
||||
|
||||
if contains_hash:
|
||||
[self.misp_event.add_tag(tag) for tag in config.hash_only_tags]
|
||||
|
||||
# # Extract network IOCs
|
||||
urllist = []
|
||||
urllist += re.findall(urlmarker.WEB_URL_REGEX, body)
|
||||
urllist += re.findall(urlmarker.IP_REGEX, body)
|
||||
if self.debug:
|
||||
syslog.syslog(str(urllist))
|
||||
|
||||
hostname_processed = []
|
||||
|
||||
# Add IOCs and expanded information to MISP
|
||||
for entry in set(urllist):
|
||||
ids_flag = True
|
||||
self.f.decode(entry)
|
||||
|
||||
domainname = self.f.get_domain().decode()
|
||||
if domainname in config.excludelist:
|
||||
# Ignore the entry
|
||||
continue
|
||||
|
||||
hostname = self.f.get_host().decode()
|
||||
|
||||
scheme = self.f.get_scheme()
|
||||
if scheme:
|
||||
scheme = scheme.decode()
|
||||
|
||||
resource_path = self.f.get_resource_path()
|
||||
if resource_path:
|
||||
resource_path = resource_path.decode()
|
||||
|
||||
if debug:
|
||||
syslog.syslog(domainname)
|
||||
|
||||
if domainname in config.internallist: # Add link to internal reference
|
||||
attribute = self.misp_event.add_attribute('link', entry, category='Internal reference',
|
||||
to_ids=False, enforceWarninglist=False)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
elif domainname in config.externallist: # External analysis
|
||||
attribute = self.misp_event.add_attribute('link', entry, category='External analysis',
|
||||
to_ids=False, enforceWarninglist=False)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else: # The URL is probably an indicator.
|
||||
comment = ""
|
||||
if (domainname in config.noidsflaglist) or (hostname in config.noidsflaglist):
|
||||
ids_flag = False
|
||||
comment = "Known host (mostly for connectivity test or IP lookup)"
|
||||
if debug:
|
||||
syslog.syslog(str(entry))
|
||||
|
||||
if scheme:
|
||||
if is_ip(hostname):
|
||||
attribute = self.misp_event.add_attribute('url', entry, to_ids=False,
|
||||
enforceWarninglist=config.enforcewarninglist)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else:
|
||||
if resource_path: # URL has path, ignore warning list
|
||||
attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag,
|
||||
enforceWarninglist=False, comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else: # URL has no path
|
||||
attribute = self.misp_event.add_attribute('url', entry, to_ids=ids_flag,
|
||||
enforceWarninglist=config.enforcewarninglist, comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
if config.sighting:
|
||||
self.sighting(entry, config.sighting_source)
|
||||
|
||||
if hostname in hostname_processed:
|
||||
# Hostname already processed.
|
||||
continue
|
||||
|
||||
hostname_processed.append(hostname)
|
||||
if config.sighting:
|
||||
self.sighting(hostname, config.sighting_source)
|
||||
|
||||
if debug:
|
||||
syslog.syslog(hostname)
|
||||
|
||||
comment = ''
|
||||
port = self.f.get_port()
|
||||
if port:
|
||||
port = port.decode()
|
||||
comment = f'on port: {port}'
|
||||
|
||||
if is_ip(hostname):
|
||||
attribute = self.misp_event.add_attribute('ip-dst', hostname, to_ids=ids_flag,
|
||||
enforceWarninglist=config.enforcewarninglist,
|
||||
comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
else:
|
||||
related_ips = []
|
||||
try:
|
||||
for rdata in dns.resolver.query(hostname, 'A'):
|
||||
if debug:
|
||||
syslog.syslog(str(rdata))
|
||||
related_ips.append(rdata.to_text())
|
||||
except Exception as e:
|
||||
if debug:
|
||||
syslog.syslog(str(e))
|
||||
|
||||
if related_ips:
|
||||
hip = MISPObject(name='ip-port')
|
||||
hip.add_attribute('hostname', value=hostname, to_ids=ids_flag,
|
||||
enforceWarninglist=config.enforcewarninglist, comment=comment)
|
||||
for ip in set(related_ips):
|
||||
hip.add_attribute('ip', type='ip-dst', value=ip, to_ids=False,
|
||||
enforceWarninglist=config.enforcewarninglist)
|
||||
self.misp_event.add_object(hip)
|
||||
if email_object:
|
||||
email_object.add_reference(hip.uuid, 'contains')
|
||||
else:
|
||||
attribute = self.misp_event.add_attribute('hostname', value=hostname,
|
||||
to_ids=ids_flag, enforceWarninglist=config.enforcewarninglist,
|
||||
comment=comment)
|
||||
if email_object:
|
||||
email_object.add_reference(attribute.uuid, 'contains')
|
||||
|
||||
def add_event(self):
|
||||
'''Add event on the remote MISP instance.'''
|
||||
|
||||
# Add additional tags depending on others
|
||||
tags = []
|
||||
for tag in [t.name for t in self.misp_event.tags]:
|
||||
if config.dependingtags.get(tag):
|
||||
tags += config.dependingtags.get(tag)
|
||||
|
||||
# Add additional tags according to configuration
|
||||
for malware in config.malwaretags:
|
||||
if malware.lower() in self.subject.lower():
|
||||
tags += config.malwaretags.get(malware)
|
||||
if tags:
|
||||
[self.misp_event.add_tag(tag) for tag in tags]
|
||||
|
||||
self.misp.add_event(self.misp_event)
|
||||
|
||||
from mail2misp import Mail2MISP
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Push a Mail into a MISP instance')
|
||||
parser.add_argument("-r", "--read", help="Read from tempfile.")
|
||||
parser.add_argument("-t", "--trap", action='store_true', default=False, help="Import the Email as-is.")
|
||||
parser.add_argument("-e", "--event", default=False, help="Add indicators to this MISP event.")
|
||||
parser.add_argument('infile', nargs='?', type=argparse.FileType('rb'))
|
||||
args = parser.parse_args()
|
||||
|
||||
|
@ -377,5 +67,9 @@ if __name__ == '__main__':
|
|||
|
||||
mail2misp.process_body_iocs()
|
||||
|
||||
if args.event:
|
||||
misp_event = args.event
|
||||
mail2misp.update_event(event_id=misp_event)
|
||||
else:
|
||||
mail2misp.add_event()
|
||||
syslog.syslog("Job finished.")
|
||||
|
|
|
@ -4,17 +4,20 @@
|
|||
misp_url = 'YOUR_MISP_URL'
|
||||
misp_key = 'YOUR_KEY_HERE' # The MISP auth key can be found on the MISP web interface under the automation section
|
||||
misp_verifycert = True
|
||||
body_config_prefix = 'm2m' # every line in the body starting with this value will be skipped from the IOCs
|
||||
spamtrap = False
|
||||
default_distribution = 0
|
||||
default_threat_level = 3
|
||||
default_analysis = 1
|
||||
|
||||
body_config_prefix = 'm2m' # every line in the body starting with this value will be skipped from the IOCs
|
||||
m2m_key = 'YOUSETYOURKEYHERE'
|
||||
m2m_auto_distribution = '3' # 3 = All communities
|
||||
m2m_benign_attachment_keyword = 'benign'
|
||||
|
||||
enable_dns = True
|
||||
debug = False
|
||||
nameservers = ['149.13.33.69']
|
||||
email_subject_prefix = 'M2M'
|
||||
attach_original_mail = True
|
||||
attach_original_mail = False
|
||||
|
||||
excludelist = ('google.com', 'microsoft.com')
|
||||
externallist = ('virustotal.com', 'malwr.com', 'hybrid-analysis.com', 'emergingthreats.net')
|
||||
|
@ -39,8 +42,9 @@ enforcewarninglist = True
|
|||
sighting = True
|
||||
sighting_source = "YOUR_MAIL_TO_MISP_IDENTIFIER"
|
||||
|
||||
# Remove "[tags]", "Re: ", "Fwd: " from subject
|
||||
removelist = ("[\(\[].*?[\)\]]", "Re: ", "Fwd: ", "{Spam?} ")
|
||||
# Remove "Re:", "Fwd:" and {Spam?} from subject
|
||||
# add: "[\(\[].*?[\)\]]" to remove everything between [] and (): i.e. [tag]
|
||||
removelist = (r"Re:", r"Fwd:", r"\{Spam\?\} ")
|
||||
|
||||
# TLP tag setup
|
||||
# Tuples contain different variations of spelling
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
mail_to_misp.py
|
|
@ -1,6 +0,0 @@
|
|||
dnspython
|
||||
pymisp[fileobjects]
|
||||
git+https://github.com/kbandla/pydeep.git
|
||||
git+https://github.com/stricaud/faup.git#egg=pyfaup&subdirectory=src/lib/bindings/python
|
||||
git+https://github.com/Rafiot/defang.git
|
||||
aiosmtpd
|
|
@ -0,0 +1,16 @@
|
|||
-i https://pypi.org/simple
|
||||
-e git+https://github.com/MISP/PyMISP.git@d4934cdf5f537c9f42ae37be7878de1848961de0#egg=pymisp
|
||||
certifi==2018.11.29
|
||||
chardet==3.0.4
|
||||
dnspython==1.16.0
|
||||
git+https://github.com/Rafiot/defang.git@52972a25313e2899f98f1777b940cb2122566a26#egg=defang
|
||||
git+https://github.com/kbandla/pydeep.git@bc0d33bff4b45718b4c5f2c79d4715d92a427eda#egg=pydeep
|
||||
git+https://github.com/stricaud/faup.git@de31b6965fc4149c2095c7b721f456e428404736#egg=pyfaup&subdirectory=src/lib/bindings/python
|
||||
idna==2.8
|
||||
jsonschema==2.6.0
|
||||
lief==0.9.0
|
||||
python-dateutil==2.7.5
|
||||
python-magic==0.4.15
|
||||
requests==2.21.0
|
||||
six==1.12.0
|
||||
urllib3>=1.24.2
|
|
@ -0,0 +1,77 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
misp_url = 'YOUR_MISP_URL'
|
||||
misp_key = 'YOUR_KEY_HERE' # The MISP auth key can be found on the MISP web interface under the automation section
|
||||
misp_verifycert = True
|
||||
spamtrap = False
|
||||
default_distribution = 0
|
||||
default_threat_level = 3
|
||||
default_analysis = 1
|
||||
|
||||
body_config_prefix = 'm2m' # every line in the body starting with this value will be skipped from the IOCs
|
||||
m2m_key = 'YOUSETYOURKEYHERE'
|
||||
m2m_benign_attachment_keyword = 'benign'
|
||||
|
||||
debug = True
|
||||
nameservers = ['8.8.8.8']
|
||||
email_subject_prefix = 'M2M'
|
||||
attach_original_mail = True
|
||||
|
||||
excludelist = ('google.com', 'microsoft.com')
|
||||
externallist = ('virustotal.com', 'malwr.com', 'hybrid-analysis.com', 'emergingthreats.net')
|
||||
internallist = ('internal.system.local')
|
||||
noidsflaglist = ('myexternalip.com', 'ipinfo.io', 'icanhazip.com', 'wtfismyip.com', 'ipecho.net',
|
||||
'api.ipify.org', 'checkip.amazonaws.com', 'whatismyipaddress.com', 'google.com',
|
||||
'dropbox.com'
|
||||
)
|
||||
|
||||
# Stop parsing when this term is found
|
||||
stopword = 'Whois & IP Information'
|
||||
|
||||
# Ignore lines in body of message containing:
|
||||
ignorelist = ("From:", "Sender:", "Received:", "Sender IP:", "Reply-To:", "Registrar WHOIS Server:",
|
||||
"Registrar:", "Domain Status:", "Registrant Email:", "IP Location:",
|
||||
"X-Get-Message-Sender-Via:", "X-Authenticated-Sender:")
|
||||
|
||||
# Ignore (don't add) attributes that are on server side warning list
|
||||
enforcewarninglist = True
|
||||
|
||||
# Add a sighting for each value
|
||||
sighting = False
|
||||
sighting_source = "YOUR_MAIL_TO_MISP_IDENTIFIER"
|
||||
|
||||
# Remove "Re:", "Fwd:" and {Spam?} from subject
|
||||
# add: "[\(\[].*?[\)\]]" to remove everything between [] and (): i.e. [tag]
|
||||
removelist = (r'Re:', r'Fwd:', r'\{Spam?\}')
|
||||
|
||||
# TLP tag setup
|
||||
# Tuples contain different variations of spelling
|
||||
tlptags = {'tlp:amber': ['tlp:amber', 'tlp: amber', 'tlp amber'],
|
||||
'tlp:green': ['tlp:green', 'tlp: green', 'tlp green'],
|
||||
'tlp:white': ['tlp:white', 'tlp: white', 'tlp white']
|
||||
}
|
||||
tlptag_default = sorted(tlptags.keys())[0]
|
||||
|
||||
malwaretags = {'locky': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Locky"'],
|
||||
'jaff': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Jaff"'],
|
||||
'dridex': ['misp-galaxy:tool="dridex"'],
|
||||
'netwire': ['Netwire RAT'],
|
||||
'Pony': ['misp-galaxy:tool="Hancitor"'],
|
||||
'ursnif': ['misp-galaxy:tool="Snifula"'],
|
||||
'NanoCore': ['misp-galaxy:tool="NanoCoreRAT"'],
|
||||
'trickbot': ['misp-galaxy:tool="Trick Bot"']
|
||||
}
|
||||
|
||||
# Tags to be set depending on the presence of other tags
|
||||
dependingtags = {'tlp:white': ['circl:osint-feed']
|
||||
}
|
||||
|
||||
# Known identifiers for forwarded messages
|
||||
forward_identifiers = {'-------- Forwarded Message --------', 'Begin forwarded message:'}
|
||||
|
||||
# Tags to add when hashes are found (e.g. to do automatic expansion)
|
||||
hash_only_tags = {'TODO:VT-ENRICHMENT'}
|
||||
|
||||
# If an attribute is on any MISP server side `warning list`, skip the creation of the attribute
|
||||
skip_item_on_warninglist = True
|
|
@ -0,0 +1,77 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
misp_url = 'YOUR_MISP_URL'
|
||||
misp_key = 'YOUR_KEY_HERE' # The MISP auth key can be found on the MISP web interface under the automation section
|
||||
misp_verifycert = True
|
||||
spamtrap = True
|
||||
default_distribution = 0
|
||||
default_threat_level = 3
|
||||
default_analysis = 1
|
||||
|
||||
body_config_prefix = 'm2m' # every line in the body starting with this value will be skipped from the IOCs
|
||||
m2m_key = 'YOUSETYOURKEYHERE'
|
||||
m2m_benign_attachment_keyword = 'benign'
|
||||
|
||||
debug = True
|
||||
nameservers = ['8.8.8.8']
|
||||
email_subject_prefix = 'M2M'
|
||||
attach_original_mail = True
|
||||
|
||||
excludelist = ('google.com', 'microsoft.com')
|
||||
externallist = ('virustotal.com', 'malwr.com', 'hybrid-analysis.com', 'emergingthreats.net')
|
||||
internallist = ('internal.system.local')
|
||||
noidsflaglist = ('myexternalip.com', 'ipinfo.io', 'icanhazip.com', 'wtfismyip.com', 'ipecho.net',
|
||||
'api.ipify.org', 'checkip.amazonaws.com', 'whatismyipaddress.com', 'google.com',
|
||||
'dropbox.com'
|
||||
)
|
||||
|
||||
# Stop parsing when this term is found
|
||||
stopword = 'Whois & IP Information'
|
||||
|
||||
# Ignore lines in body of message containing:
|
||||
ignorelist = ("From:", "Sender:", "Received:", "Sender IP:", "Reply-To:", "Registrar WHOIS Server:",
|
||||
"Registrar:", "Domain Status:", "Registrant Email:", "IP Location:",
|
||||
"X-Get-Message-Sender-Via:", "X-Authenticated-Sender:")
|
||||
|
||||
# Ignore (don't add) attributes that are on server side warning list
|
||||
enforcewarninglist = True
|
||||
|
||||
# Add a sighting for each value
|
||||
sighting = False
|
||||
sighting_source = "YOUR_MAIL_TO_MISP_IDENTIFIER"
|
||||
|
||||
# Remove "Re:", "Fwd:" and {Spam?} from subject
|
||||
# add: "[\(\[].*?[\)\]]" to remove everything between [] and (): i.e. [tag]
|
||||
removelist = (r'Re:', r'Fwd:', r'\{Spam\?\}')
|
||||
|
||||
# TLP tag setup
|
||||
# Tuples contain different variations of spelling
|
||||
tlptags = {'tlp:amber': ['tlp:amber', 'tlp: amber', 'tlp amber'],
|
||||
'tlp:green': ['tlp:green', 'tlp: green', 'tlp green'],
|
||||
'tlp:white': ['tlp:white', 'tlp: white', 'tlp white']
|
||||
}
|
||||
tlptag_default = sorted(tlptags.keys())[0]
|
||||
|
||||
malwaretags = {'locky': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Locky"'],
|
||||
'jaff': ['ecsirt:malicious-code="ransomware"', 'misp-galaxy:ransomware="Jaff"'],
|
||||
'dridex': ['misp-galaxy:tool="dridex"'],
|
||||
'netwire': ['Netwire RAT'],
|
||||
'Pony': ['misp-galaxy:tool="Hancitor"'],
|
||||
'ursnif': ['misp-galaxy:tool="Snifula"'],
|
||||
'NanoCore': ['misp-galaxy:tool="NanoCoreRAT"'],
|
||||
'trickbot': ['misp-galaxy:tool="Trick Bot"']
|
||||
}
|
||||
|
||||
# Tags to be set depending on the presence of other tags
|
||||
dependingtags = {'tlp:white': ['circl:osint-feed']
|
||||
}
|
||||
|
||||
# Known identifiers for forwarded messages
|
||||
forward_identifiers = {'-------- Forwarded Message --------', 'Begin forwarded message:'}
|
||||
|
||||
# Tags to add when hashes are found (e.g. to do automatic expansion)
|
||||
hash_only_tags = {'TODO:VT-ENRICHMENT'}
|
||||
|
||||
# If an attribute is on any MISP server side `warning list`, skip the creation of the attribute
|
||||
skip_item_on_warninglist = True
|
|
@ -0,0 +1 @@
|
|||
Subproject commit a20daf72071403b01bdf3816d6de533b0a4d2c40
|
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import unittest
|
||||
import importlib
|
||||
import sys
|
||||
from io import BytesIO
|
||||
sys.path.insert(0, ".")
|
||||
|
||||
from mail2misp import Mail2MISP
|
||||
|
||||
|
||||
class TestMailToMISP(unittest.TestCase):
|
||||
|
||||
def test_spamtrap(self):
|
||||
config = importlib.import_module('tests.config_spamtrap')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/simple_spamtrap.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.email_from_spamtrap()
|
||||
self.mail2misp.process_body_iocs()
|
||||
event = self.mail2misp.add_event()
|
||||
print(event)
|
||||
|
||||
def test_spamtrap_attachment(self):
|
||||
config = importlib.import_module('tests.config_spamtrap')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/attachment_spamtrap.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.email_from_spamtrap()
|
||||
self.mail2misp.process_body_iocs()
|
||||
event = self.mail2misp.add_event()
|
||||
print(event)
|
||||
|
||||
def test_forward(self):
|
||||
config = importlib.import_module('tests.config_forward')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/simple_forward.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.process_email_body()
|
||||
self.mail2misp.process_body_iocs()
|
||||
event = self.mail2misp.add_event()
|
||||
print(event)
|
||||
|
||||
def test_forward_attachment(self):
|
||||
config = importlib.import_module('tests.config_forward')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/attachment_forward.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.process_email_body()
|
||||
self.mail2misp.process_body_iocs()
|
||||
event = self.mail2misp.add_event()
|
||||
print(event)
|
||||
|
||||
def test_benign(self):
|
||||
config = importlib.import_module('tests.config_forward')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/test_benign.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.process_email_body()
|
||||
self.mail2misp.process_body_iocs()
|
||||
self.assertTrue('attachment' in [a.type for a in self.mail2misp.misp_event.attributes])
|
||||
self.assertTrue(self.mail2misp.misp_event.publish)
|
||||
|
||||
def test_textfile(self):
|
||||
config = importlib.import_module('tests.config_forward')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/test_textattachment.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.process_email_body()
|
||||
|
||||
def test_meta_event(self):
|
||||
config = importlib.import_module('tests.config_forward')
|
||||
self.mail2misp = Mail2MISP('', '', '', config=config, offline=True)
|
||||
with open('tests/mails/test_meta.eml', 'rb') as f:
|
||||
self.mail2misp.load_email(BytesIO(f.read()))
|
||||
self.mail2misp.process_email_body()
|
||||
self.mail2misp.process_body_iocs()
|
||||
self.assertTrue(self.mail2misp.misp_event.publish)
|
||||
self.assertEqual(self.mail2misp.misp_event.distribution, '3')
|
||||
self.assertEqual(self.mail2misp.misp_event.threat_level_id, '2')
|
||||
self.assertEqual(self.mail2misp.misp_event.analysis, '0')
|
||||
self.mail2misp.add_event()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue