2016-10-22 23:13:20 +02:00
#!/usr/bin/env python3
2016-06-18 07:53:26 +02:00
# -*- coding: utf-8 -*-
import unittest
import requests
2016-08-12 13:16:49 +02:00
import base64
import json
2017-01-11 23:53:54 +01:00
import os
2016-12-26 23:38:28 +01:00
import io
2017-07-02 23:07:09 +02:00
import re
2016-12-26 23:38:28 +01:00
import zipfile
2017-01-10 15:46:04 +01:00
from hashlib import sha256
2016-12-26 23:38:28 +01:00
from email . mime . application import MIMEApplication
from email . mime . text import MIMEText
from email . mime . multipart import MIMEMultipart
2017-01-11 23:53:54 +01:00
from email . header import Header
2016-06-18 07:53:26 +02:00
class TestModules(unittest.TestCase):
    """Integration tests that send queries to a module server over HTTP.

    Every test issues real requests against ``self.url`` (set in ``setUp``),
    so the service has to be reachable when the suite runs. The e-mail import
    tests are all marked as skipped with the reason "Need Rewrite".
    """

    # Content the attachment tests expect for the EICAR.com fixture.
    _EICAR_CONTENT = b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-'

    # Default plain-text body attached to the base e-mail.
    _PLAIN_BODY = "I am a test e-mail"

    # Plain-text body quoting a string that is NOT the archive password.
    _DECOY_BODY = ('I am a test e-mail\n'
                   'the password is NOT "this string".\n'
                   'That is all.\n')

    def setUp(self):
        """Set the request defaults shared by every test."""
        self.maxDiff = None
        self.headers = {'Content-Type': 'application/json'}
        self.url = "http://127.0.0.1:6666/"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _post_query_file(self, path):
        """POST the contents of *path* to the query endpoint and print the JSON reply."""
        with open(path, 'r') as f:
            body = f.read()
        response = requests.post(self.url + "query", data=body)
        try:
            print(response.json())
        finally:
            # Release the connection even when the reply is not valid JSON.
            response.connection.close()

    def _email_import(self, data, *, unzip=None, guess_passwords=None, extract_urls=None):
        """Send base64 e-mail *data* to the ``email_import`` module.

        The keyword arguments fill the three ``config`` entries of the query.
        Returns the parsed JSON reply (a dict).
        """
        query = {"module": "email_import",
                 "config": {"unzip_attachments": unzip,
                            "guess_zip_attachment_passwords": guess_passwords,
                            "extract_urls": extract_urls},
                 "data": data}
        return requests.post(self.url + "query", data=json.dumps(query)).json()

    def _base_email(self, text=None):
        """Return the base e-mail with *text* attached as a plain-text part."""
        message = get_base_email()
        message.attach(MIMEText(self._PLAIN_BODY if text is None else text, 'plain'))
        return message

    @staticmethod
    def _attach_file(message, path, subtype, filename):
        """Attach the file at *path* to *message* as ``application/<subtype>``."""
        with open(path, "rb") as fp:
            part = MIMEApplication(fp.read(), subtype)
        part.add_header('Content-Disposition', 'attachment', filename=filename)
        message.attach(part)

    @staticmethod
    def _count_types(results):
        """Return a dict mapping each result ``type`` to its number of occurrences."""
        counts = {}
        for item in results:
            counts[item["type"]] = counts.get(item["type"], 0) + 1
        return counts

    @staticmethod
    def _values_without_bom(results):
        """Return the ``values`` of every result with BOM characters removed."""
        return [item["values"].replace('\ufeff', '') for item in results]

    def _assert_base_email_attributes(self, results, values):
        """Check that the attributes of the unmodified base e-mail were all returned."""
        types = self._count_types(results)
        # Check that there are the appropriate number of items
        # Check that all the items were correct
        self.assertEqual(types['target-email'], 1)
        self.assertIn('test@domain.com', values)
        self.assertEqual(types['email-dst-display-name'], 4)
        self.assertIn('Last One', values)
        self.assertIn('Other Friend', values)
        self.assertIn('Second Person', values)
        self.assertIn('Testy Testerson', values)
        self.assertEqual(types['email-dst'], 4)
        self.assertIn('test@domain.com', values)
        self.assertIn('second@domain.com', values)
        self.assertIn('other@friend.net', values)
        self.assertIn('last_one@finally.com', values)
        self.assertEqual(types['email-src-display-name'], 2)
        self.assertIn("Innocent Person", values)
        self.assertEqual(types['email-src'], 2)
        self.assertIn("evil_spoofer@example.com", values)
        self.assertIn("IgnoreMeImInnocent@sender.com", values)
        self.assertEqual(types['email-thread-index'], 1)
        self.assertIn('AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==', values)
        self.assertEqual(types['email-message-id'], 1)
        self.assertIn("<4988EF2D.40804@example.com>", values)
        self.assertEqual(types['email-subject'], 1)
        self.assertIn("Example Message", values)
        self.assertEqual(types['email-header'], 1)
        self.assertEqual(types['email-x-mailer'], 1)
        self.assertIn("mlx 5.1.7", values)
        self.assertEqual(types['email-reply-to'], 1)
        self.assertIn("<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>", values)

    def _assert_eicar_extracted(self, results, *, samples_only=False):
        """Check the payload of every result whose value is ``EICAR.com``.

        With ``samples_only`` set, only ``malware-sample`` results are checked.
        """
        for item in results:
            if samples_only and item['type'] != 'malware-sample':
                continue
            if item["values"] == 'EICAR.com':
                self.assertEqual(base64.b64decode(item["data"]), self._EICAR_CONTENT)

    def _assert_plain_eicar_attachment(self, filename):
        """Send EICAR.com attached as *filename* and check it is returned unchanged."""
        message = self._base_email()
        self._attach_file(message, "tests/EICAR.com", 'com', filename)
        results = self._email_import(decode_email(message))['results']
        self.assertIn(filename, [item["values"] for item in results])
        for item in results:
            if item["type"] == 'email-attachment':
                self.assertEqual(item["values"], filename)
            if item['type'] == 'malware-sample':
                self.assertEqual(base64.b64decode(item["data"]), self._EICAR_CONTENT)

    def _guess_password_results(self, message, archive):
        """Attach *archive* as EICAR.com.zip and import with unzip + password guessing on."""
        self._attach_file(message, archive, 'zip', "EICAR.com.zip")
        reply = self._email_import(decode_email(message),
                                   unzip="true", guess_passwords='true')
        return reply['results']

    def _email_with_raw_subject(self, placeholder, subject):
        """Return a base64 e-mail whose Subject holds the raw UTF-8 bytes of *subject*.

        The header is first set to the ASCII *placeholder*, which is then
        swapped for the undecorated bytes in the serialised message.
        """
        message = self._base_email(self._DECOY_BODY)
        message.replace_header('Subject', placeholder)
        raw = message.as_bytes().replace(placeholder.encode(), subject.encode())
        return base64.b64encode(raw).decode()

    def _assert_subject_decoded(self, results, expected, rfc_format):
        """Check every ``email-subject`` result equals *expected*, not *rfc_format*."""
        for item in results:
            if item['type'] == 'email-subject':
                self.assertNotEqual(rfc_format, item['values'],
                                    "The subject was not decoded from RFC2047 format.")
                self.assertEqual(expected, item['values'], "Subject not properly decoded")

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_introspection(self):
        """Fetch the module list and print it."""
        response = requests.get(self.url + "modules")
        try:
            print(response.json())
        finally:
            response.connection.close()

    def test_cve(self):
        """Post the CVE query fixture and print the reply."""
        self._post_query_file('tests/bodycve.json')

    def test_dns(self):
        """Post the DNS query fixtures (regular and timeout) and print the replies."""
        self._post_query_file('tests/body.json')
        self._post_query_file('tests/body_timeout.json')

    def test_openioc(self):
        """Import the OpenIOC fixture and check two expected file names are returned."""
        with open("tests/openioc.xml", "rb") as f:
            content = base64.b64encode(f.read())
        data = json.dumps({"module": "openiocimport",
                           "data": content.decode(),
                           })
        response = requests.post(self.url + "query", data=data).json()
        print(response)
        print("OpenIOC :: {}".format(response))
        values = [x["values"][0] for x in response["results"]]
        # assertIn instead of a bare assert: still checked under ``python -O``.
        self.assertIn("mrxcls.sys", values)
        self.assertIn("mdmcpq3.PNF", values)

    @unittest.skip("Need Rewrite")
    def test_email_headers(self):
        """Import the plain base e-mail and check every header-derived attribute."""
        results = self._email_import(decode_email(self._base_email()))['results']
        values = [x["values"] for x in results]
        self._assert_base_email_attributes(results, values)

    @unittest.skip("Need Rewrite")
    def test_email_attachment_basic(self):
        """An uncompressed attachment is returned with its name and content intact."""
        self._assert_plain_eicar_attachment("EICAR.com")

    @unittest.skip("Need Rewrite")
    def test_email_attachment_unpack(self):
        """With unzipping on, both the archive and the file inside it are returned."""
        message = self._base_email()
        self._attach_file(message, "tests/EICAR.com.zip", 'zip', "EICAR.com.zip")
        results = self._email_import(decode_email(message), unzip="true")['results']
        values = [x["values"] for x in results]
        self.assertIn('EICAR.com', values)
        self.assertIn('EICAR.com.zip', values)
        for item in results:
            if item['type'] == 'malware-sample' and item["values"] == 'EICAR.com.zip':
                archive = io.BytesIO(base64.b64decode(item["data"]))
                with zipfile.ZipFile(archive, 'r') as zf:
                    with zf.open("EICAR.com") as ec:
                        attch_data = ec.read()
                self.assertEqual(attch_data, self._EICAR_CONTENT)
        self._assert_eicar_extracted(results, samples_only=True)

    @unittest.skip("Need Rewrite")
    def test_email_dont_unpack_compressed_doc_attachments(self):
        """A .docx attachment yields exactly one sample with the original file's hash."""
        message = self._base_email()
        self._attach_file(message, "tests/test_files/test.docx", 'zip', "test.docx")
        results = self._email_import(decode_email(message), unzip="true")['results']
        values = [x["values"] for x in results]
        self.assertIn('test.docx', values)
        # Check that there is only one attachment in the bundle
        self.assertEqual(self._count_types(results)['malware-sample'], 1)
        for item in results:
            if item['type'] == 'malware-sample' and item["values"] == 'test.docx':
                attch_data = base64.b64decode(item["data"])
                self.assertEqual(sha256(attch_data).hexdigest(),
                                 '098da5381a90d4a51e6b844c18a0fecf2e364813c2f8b317cfdc51c21f2506a5')

    @unittest.skip("Need Rewrite")
    def test_email_attachment_unpack_with_password(self):
        """A password-protected archive is extracted yet returned still protected."""
        results = self._guess_password_results(self._base_email(), "tests/infected.zip")
        values = [x["values"] for x in results]
        self.assertIn('EICAR.com', values)
        self.assertIn('EICAR.com.zip', values)
        for item in results:
            if item['type'] == 'malware-sample' and item["values"] == 'EICAR.com.zip':
                archive = io.BytesIO(base64.b64decode(item["data"]))
                with zipfile.ZipFile(archive, 'r') as zf:
                    # Make sure password was set and still in place
                    self.assertRaises(RuntimeError, zf.open, "EICAR.com")
        self._assert_eicar_extracted(results, samples_only=True)

    @unittest.skip("Need Rewrite")
    def test_email_attachment_password_in_body(self):
        """The archive password appears as a bare word in the plain-text body."""
        message = self._base_email("I am a -> STRINGS <- test e-mail")
        results = self._guess_password_results(message, "tests/short_password.zip")
        self.assertIn('EICAR.com', [x["values"] for x in results])
        self._assert_eicar_extracted(results)

    @unittest.skip("Need Rewrite")
    def test_email_attachment_password_in_body_quotes(self):
        """The archive password appears quoted in the plain-text body."""
        text = ('I am a test e-mail\n'
                'the password is "a long password".\n'
                'That is all.\n')
        results = self._guess_password_results(self._base_email(text),
                                               "tests/longer_password.zip")
        self.assertIn('EICAR.com', [x["values"] for x in results])
        # Check that it could be extracted.
        self._assert_eicar_extracted(results, samples_only=True)

    @unittest.skip("Need Rewrite")
    def test_email_attachment_password_in_html_body(self):
        """The archive password appears only in the HTML part of the e-mail."""
        html = """\
<html>
<head></head>
<body>
<p>Hi!<br>
This is the real password?<br>
It is "a long password".
</p>
</body>
</html>
"""
        message = self._base_email(self._DECOY_BODY)
        message.attach(MIMEText(html, 'html'))
        results = self._guess_password_results(message, "tests/longer_password.zip")
        self.assertIn('EICAR.com', [x["values"] for x in results])
        # Check that it could be extracted.
        self._assert_eicar_extracted(results)

    @unittest.skip("Need Rewrite")
    def test_email_body_encoding(self):
        """Bodies in each fixture encoding are imported without a server error."""
        for fn in os.listdir("tests/test_files/encodings"):
            message = get_base_email()
            # Encoding is used as the name of the file
            encoding = os.path.splitext(fn)[0]
            with open("tests/test_files/encodings/{0}".format(fn), "r", encoding=encoding) as fp:
                text = fp.read()
            message.attach(MIMEText(text, 'html', encoding))
            response = self._email_import(decode_email(message))
            self.assertNotIn('error', response, response.get('error', ""))
            self.assertIn('results', response, "No server results found.")

    @unittest.skip("Need Rewrite")
    def test_email_header_proper_encoding(self):
        """Headers re-encoded as RFC 2047 encoded words are still parsed correctly."""
        for encoding in ['utf-8', 'utf-16', 'utf-32']:
            message = self._base_email(self._DECOY_BODY)
            # items() returns a list, so replacing headers while looping is safe.
            # NOTE(review): one request per re-encoded header; nesting inferred
            # from statement order - confirm against the canonical file.
            for hdr, hdr_val in message.items():
                encoded_header = hdr_val.encode(encoding)
                message.replace_header(hdr, Header(encoded_header, encoding))
                results = self._email_import(decode_email(message))['results']
                # Remove BOM from UTF-16 strings
                values = self._values_without_bom(results)
                # Check that all the items were correct
                self._assert_base_email_attributes(results, values)

    @unittest.skip("Need Rewrite")
    def test_email_header_malformed_encoding(self):
        """Headers holding raw, undeclared encoded bytes are still parsed correctly."""
        for encoding in ['utf-8', 'utf-16', 'utf-32']:
            message = self._base_email(self._DECOY_BODY)
            for hdr_val in message.values():
                # Literal replacement: header values contain regex
                # metacharacters ("[", "(", "+", "."), so they must not be
                # compiled as patterns.
                message_bytes = message.as_bytes().replace(hdr_val.encode(),
                                                           hdr_val.encode(encoding))
                message64 = base64.b64encode(message_bytes).decode()
                results = self._email_import(message64)['results']
                # Remove BOM from UTF-16 strings
                values = self._values_without_bom(results)
                # Check that all the items were correct
                self._assert_base_email_attributes(results, values)

    @unittest.skip("Need Rewrite")
    def test_email_header_CJK_encoding(self):
        """A Subject encoded with euc_jisx0213 is returned as the decoded text."""
        message = self._base_email(self._DECOY_BODY)
        japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
        message.replace_header("Subject", Header(japanese_charset, 'euc_jisx0213'))
        results = self._email_import(decode_email(message))['results']
        # Parse Response
        RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
        self._assert_subject_decoded(results, japanese_charset, RFC_format)

    @unittest.skip("Need Rewrite")
    def test_email_malformed_header_CJK_encoding(self):
        """A Subject made of raw UTF-8 Japanese bytes is returned as the decoded text."""
        japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
        message64 = self._email_with_raw_subject("{{REPLACE}}", japanese_charset)
        results = self._email_import(message64)['results']
        # Parse Response
        RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
        self._assert_subject_decoded(results, japanese_charset, RFC_format)

    @unittest.skip("Need Rewrite")
    def test_email_malformed_header_emoji_encoding(self):
        """A Subject made of raw UTF-8 emoji bytes is returned as the decoded text."""
        emoji_string = "Emoji Test 👍 checking this"
        message64 = self._email_with_raw_subject("{{EMOJI}}", emoji_string)
        results = self._email_import(message64)['results']
        # Parse Response
        RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?="
        self._assert_subject_decoded(results, emoji_string, RFC_format)

    @unittest.skip("Need Rewrite")
    def test_email_attachment_emoji_filename(self):
        """An attachment whose file name contains an emoji keeps that name."""
        self._assert_plain_eicar_attachment("Emoji Test 👍 checking this")

    @unittest.skip("Need Rewrite")
    def test_email_attachment_password_in_subject(self):
        """The archive password appears quoted in the Subject header only."""
        subject = 'I contain the -> "a long password" <- that is the password'
        message = get_base_email()
        message.replace_header("Subject", subject)
        message.attach(MIMEText(self._DECOY_BODY, 'plain'))
        results = self._guess_password_results(message, "tests/longer_password.zip")
        values = [x["values"] for x in results]
        self.assertIn('EICAR.com', values)
        self.assertIn(subject, values)
        # Check that it could be extracted.
        self._assert_eicar_extracted(results)

    @unittest.skip("Need Rewrite")
    def test_email_extract_html_body_urls(self):
        """With URL extraction on, both links in the HTML body are returned."""
        text = ('I am a test e-mail\n'
                'That is all.\n')
        html = """\
<html>
<head></head>
<body>
<p>Hi!<br>
<p>MISP modules are autonomous modules that can be used for expansion and other services in <a href="https://github.com/MISP/MISP">MISP</a>.</p>
<p>The modules are written in Python 3 following a simple API interface. The objective is to ease the extensions of MISP functionalities
without modifying core components. The API is available via a simple REST API which is independent from MISP installation or configuration.</p>
<p>MISP modules support is included in MISP starting from version 2.4.28.</p>
<p>For more information: <a href="https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf">Extending MISP with Python modules</a> slides from MISP training.</p>
</p>
</body>
</html>
"""
        message = self._base_email(text)
        message.attach(MIMEText(html, 'html'))
        results = self._email_import(decode_email(message), extract_urls="true")['results']
        values = [x["values"] for x in results]
        self.assertIn("https://github.com/MISP/MISP", values)
        self.assertIn("https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf", values)
2017-02-27 13:32:31 +01:00
# def test_domaintools(self):
2016-12-02 16:16:25 +01:00
# query = {'config': {'username': 'test_user', 'api_key': 'test_key'}, 'module': 'domaintools', 'domain': 'domaintools.com'}
# try:
# response = requests.post(self.url + "query", data=json.dumps(query)).json()
# except:
# pass
# response = requests.post(self.url + "query", data=json.dumps(query)).json()
# print(response)
2016-12-02 15:29:44 +01:00
2017-02-27 13:32:31 +01:00
2016-12-26 23:38:28 +01:00
def decode_email(message):
    """Serialise *message* and return it as base64 text.

    Despite the name, this *encodes*: the message is flattened with
    ``message.as_bytes()`` and the bytes are base64-encoded so they can be
    embedded in a JSON query.

    :param message: object exposing ``as_bytes()`` (the tests pass
        ``email`` message instances).
    :returns: the base64 representation as a ``str``.
    """
    # base64 output is pure ASCII, so the decode cannot fail.
    return base64.b64encode(message.as_bytes()).decode('ascii')
def get_base_email():
    """Build the multipart message the e-mail tests start from.

    The headers are kept as an ordered list of ``(name, value)`` pairs rather
    than a dict: a dict literal keeps only the last value of a repeated key,
    which silently dropped two of the three ``Received`` headers.

    :returns: a ``MIMEMultipart`` carrying the headers below and no parts.
    """
    headers = [
        ("Received", "via dmail-2008.19 for +INBOX; Tue, 3 Feb 2009 19:29:12 -0600 (CST)"),
        ("Received", "from abc.luxsci.com ([10.10.10.10]) by xyz.luxsci.com (8.13.7/8.13.7) with ESMTP id n141TCa7022588 for <test@domain.com>; Tue, 3 Feb 2009 19:29:12 -0600"),
        ("Received", "from [192.168.0.3] (verizon.net [44.44.44.44]) (user=test@sender.com mech=PLAIN bits=2) by abc.luxsci.com (8.13.7/8.13.7) with ESMTP id n141SAfo021855 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=NOT) for <test@domain.com>; Tue, 3 Feb 2009 19:28:10 -0600"),
        ("X-Received", "by 192.168.0.45 with SMTP id q4mr156123401yw1g.911.1912342394963; Tue, 3 Feb 2009 19:32:15 -0600 (PST)"),
        ("Message-ID", "<4988EF2D.40804@example.com>"),
        ("Date", "Tue, 03 Feb 2009 20:28:13 -0500"),
        ("From", '"Innocent Person" <IgnoreMeImInnocent@sender.com>'),
        ("User-Agent", 'Thunderbird 2.0.0.19 (Windows/20081209)'),
        ("Sender", '"Malicious MailAgent" <mailagent@example.com>'),
        ("References", "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"),
        ("In-Reply-To", "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"),
        ("Accept-Language", 'en-US'),
        ("X-Mailer", 'mlx 5.1.7'),
        ("Return-Path", "evil_spoofer@example.com"),
        ("Thread-Topic", 'This is a thread.'),
        ("Thread-Index", 'AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA=='),
        ("Content-Language", 'en-US'),
        ("To", '"Testy Testerson" <test@domain.com>'),
        ("Cc", '"Second Person" <second@domain.com>, "Other Friend" <other@friend.net>, "Last One" <last_one@finally.com>'),
        ("Subject", 'Example Message'),
        ("MIME-Version", '1.0'),
    ]
    msg = MIMEMultipart()
    # add_header appends, so repeated names (Received) are all preserved.
    for key, val in headers:
        msg.add_header(key, val)
    return msg
2016-12-02 15:29:44 +01:00
2016-08-12 13:16:49 +02:00
# Run the suite when the file is executed directly.
if __name__ == '__main__':
    unittest.main()