mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			688 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			688 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
import unittest
 | 
						|
import requests
 | 
						|
import base64
 | 
						|
import json
 | 
						|
import os
 | 
						|
import io
 | 
						|
import re
 | 
						|
import zipfile
 | 
						|
from hashlib import sha256
 | 
						|
from email.mime.application import MIMEApplication
 | 
						|
from email.mime.text import MIMEText
 | 
						|
from email.mime.multipart import MIMEMultipart
 | 
						|
from email.header import Header
 | 
						|
 | 
						|
class TestModules(unittest.TestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.maxDiff = None
 | 
						|
        self.headers = {'Content-Type': 'application/json'}
 | 
						|
        self.url = "http://127.0.0.1:6666/"
 | 
						|
 | 
						|
    def test_introspection(self):
        """Fetch the ``modules`` listing and print the decoded JSON reply."""
        modules_url = self.url + "modules"
        reply = requests.get(modules_url)
        listing = reply.json()
        print(listing)
        reply.connection.close()
 | 
						|
 | 
						|
    def test_cve(self):
        """Post the stored CVE request body to ``query`` and print the reply."""
        with open('tests/bodycve.json', 'r') as body_file:
            payload = body_file.read()
            reply = requests.post(self.url + "query", data=payload)
            print(reply.json())
            reply.connection.close()
 | 
						|
 | 
						|
    def test_dns(self):
        """Post both stored DNS request bodies to ``query`` and print each reply."""
        for body_path in ('tests/body.json', 'tests/body_timeout.json'):
            with open(body_path, 'r') as body_file:
                reply = requests.post(self.url + "query", data=body_file.read())
                print(reply.json())
                reply.connection.close()
 | 
						|
 | 
						|
    def test_openioc(self):
        """Import ``tests/openioc.xml`` through ``openiocimport`` and check its values.

        The file is sent base64-encoded. The first value of every returned
        result is collected, and the two expected file names must be among
        them.
        """
        # Only the read needs the file handle; close it before the request.
        with open("tests/openioc.xml", "rb") as f:
            content = base64.b64encode(f.read())
        data = json.dumps({"module": "openiocimport",
                           "data": content.decode(),
                           })
        response = requests.post(self.url + "query", data=data).json()
        print("OpenIOC :: {}".format(response))
        values = [x["values"][0] for x in response["results"]]
        # unittest assertions are not stripped under ``python -O`` and show
        # the searched list on failure, unlike a bare ``assert``.
        self.assertIn("mrxcls.sys", values)
        self.assertIn("mdmcpq3.PNF", values)
 | 
						|
 | 
						|
    def test_email_headers(self):
        """Import a plain-text e-mail and verify the header attributes returned.

        The results are counted per attribute type; both the counts and the
        presence of the expected values are checked.
        """
        message = get_base_email()
        message.attach(MIMEText("""I am a test e-mail""", 'plain'))
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()['results']
        values = [entry["values"] for entry in results]
        type_counts = {}
        for entry in results:
            type_counts[entry["type"]] = type_counts.get(entry["type"], 0) + 1

        # (attribute type, expected count, values that must be present),
        # verified in this order.
        expected = [
            ('target-email', 1, ['test@domain.com']),
            ('email-dst-display-name', 4, ['Last One',
                                           'Other Friend',
                                           'Second Person',
                                           'Testy Testerson']),
            ('email-dst', 4, ['test@domain.com',
                              'second@domain.com',
                              'other@friend.net',
                              'last_one@finally.com']),
            ('email-src-display-name', 2, ["Innocent Person"]),
            ('email-src', 2, ["evil_spoofer@example.com",
                              "IgnoreMeImInnocent@sender.com"]),
            ('email-thread-index', 1,
             ['AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==']),
            ('email-message-id', 1, ["<4988EF2D.40804@example.com>"]),
            ('email-subject', 1, ["Example Message"]),
            ('email-header', 1, []),
            ('email-x-mailer', 1, ["mlx 5.1.7"]),
            ('email-reply-to', 1,
             ["<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"]),
        ]
        for attr_type, count, wanted in expected:
            self.assertEqual(type_counts[attr_type], count)
            for value in wanted:
                self.assertIn(value, values)
 | 
						|
 | 
						|
    def test_email_attachment_basic(self):
        """Import an e-mail carrying ``EICAR.com`` and check the attachment results."""
        eicar = b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-'
        message = get_base_email()
        message.attach(MIMEText("""I am a test e-mail""", 'plain'))
        with open("tests/EICAR.com", "rb") as fp:
            attachment = MIMEApplication(fp.read(), 'com')
        attachment.add_header('Content-Disposition', 'attachment', filename="EICAR.com")
        message.attach(attachment)
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()['results']
        self.assertIn('EICAR.com', [entry["values"] for entry in results])
        for entry in results:
            kind = entry["type"]
            if kind == 'email-attachment':
                self.assertEqual(entry["values"], "EICAR.com")
            if kind == 'malware-sample':
                # The sample payload is returned base64-encoded.
                self.assertEqual(base64.b64decode(entry["data"]), eicar)
 | 
						|
 | 
						|
    def test_email_attachment_unpack(self):
        """Import a zipped EICAR attachment with ``unzip_attachments`` enabled.

        Both the archive and its extracted member must be reported, and each
        malware sample must carry the expected EICAR payload.
        """
        eicar = b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-'
        message = get_base_email()
        message.attach(MIMEText("""I am a test e-mail""", 'plain'))
        with open("tests/EICAR.com.zip", "rb") as fp:
            attachment = MIMEApplication(fp.read(), 'zip')
        attachment.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
        message.attach(attachment)
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": "true",
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()["results"]
        values = [entry["values"] for entry in results]
        self.assertIn('EICAR.com', values)
        self.assertIn('EICAR.com.zip', values)
        for entry in results:
            if entry['type'] != 'malware-sample':
                continue
            if entry["values"] == 'EICAR.com.zip':
                # The archive itself: open it and read the member back out.
                archive = io.BytesIO(base64.b64decode(entry["data"]))
                with zipfile.ZipFile(archive, 'r') as zf:
                    with zf.open("EICAR.com") as member:
                        unpacked = member.read()
                self.assertEqual(unpacked, eicar)
            if entry["values"] == 'EICAR.com':
                # The member as extracted by the module.
                self.assertEqual(base64.b64decode(entry["data"]), eicar)
 | 
						|
 | 
						|
    def test_email_dont_unpack_compressed_doc_attachments(self):
        """Check that a ``.docx`` attachment is reported as one intact sample.

        With ``unzip_attachments`` enabled, exactly one ``malware-sample``
        result is expected and its SHA-256 digest must match the known value.
        """
        message = get_base_email()
        message.attach(MIMEText("""I am a test e-mail""", 'plain'))
        with open("tests/test_files/test.docx", "rb") as fp:
            attachment = MIMEApplication(fp.read(), 'zip')
        attachment.add_header('Content-Disposition', 'attachment', filename="test.docx")
        message.attach(attachment)
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": "true",
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()["results"]
        self.assertIn('test.docx', [entry["values"] for entry in results])
        type_counts = {}
        for entry in results:
            type_counts[entry["type"]] = type_counts.get(entry["type"], 0) + 1
        # Only one attachment may be present in the bundle.
        self.assertEqual(type_counts['malware-sample'], 1)
        for entry in results:
            if entry['type'] == 'malware-sample' and entry["values"] == 'test.docx':
                digest = sha256(base64.b64decode(entry["data"])).hexdigest()
                self.assertEqual(digest,
                                 '098da5381a90d4a51e6b844c18a0fecf2e364813c2f8b317cfdc51c21f2506a5')
 | 
						|
 | 
						|
    def test_email_attachment_unpack_with_password(self):
        """Import a password-protected zip with password guessing enabled.

        The returned archive must still refuse to open its member without a
        password, while the extracted member must carry the EICAR payload.
        """
        eicar = b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-'
        message = get_base_email()
        message.attach(MIMEText("""I am a test e-mail""", 'plain'))
        with open("tests/infected.zip", "rb") as fp:
            attachment = MIMEApplication(fp.read(), 'zip')
        attachment.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
        message.attach(attachment)
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": "true",
                       "guess_zip_attachment_passwords": 'true',
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()["results"]
        values = [entry["values"] for entry in results]
        self.assertIn('EICAR.com', values)
        self.assertIn('EICAR.com.zip', values)
        for entry in results:
            if entry['type'] != 'malware-sample':
                continue
            if entry["values"] == 'EICAR.com.zip':
                archive = io.BytesIO(base64.b64decode(entry["data"]))
                with zipfile.ZipFile(archive, 'r') as zf:
                    # Make sure the password was set and is still in place.
                    with self.assertRaises(RuntimeError):
                        zf.open("EICAR.com")
            if entry["values"] == 'EICAR.com':
                self.assertEqual(base64.b64decode(entry["data"]), eicar)
 | 
						|
 | 
						|
    def test_email_attachment_password_in_body(self):
        """Import ``tests/short_password.zip`` with password guessing enabled.

        The extracted ``EICAR.com`` must be reported and decode to the EICAR
        test string.
        """
        message = get_base_email()
        message.attach(MIMEText("""I am a -> STRINGS <- test e-mail""", 'plain'))
        with open("tests/short_password.zip", "rb") as fp:
            attachment = MIMEApplication(fp.read(), 'zip')
        attachment.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
        message.attach(attachment)
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": "true",
                       "guess_zip_attachment_passwords": 'true',
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()["results"]
        self.assertIn('EICAR.com', [entry["values"] for entry in results])
        for entry in results:
            if entry["values"] != 'EICAR.com':
                continue
            payload = base64.b64decode(entry["data"]).decode()
            self.assertEqual(payload,
                             'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
 | 
						|
 | 
						|
    def test_email_attachment_password_in_body_quotes(self):
        """Import ``tests/longer_password.zip`` whose body quotes a password.

        With password guessing enabled, the extracted ``EICAR.com`` sample
        must be reported and decode to the EICAR test string.
        """
        message = get_base_email()
        text = """I am a test e-mail
        the password is "a long password".

        That is all.
        """
        message.attach(MIMEText(text, 'plain'))
        with open("tests/longer_password.zip", "rb") as fp:
            attachment = MIMEApplication(fp.read(), 'zip')
        attachment.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
        message.attach(attachment)
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": "true",
                       "guess_zip_attachment_passwords": 'true',
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        results = reply.json()["results"]
        self.assertIn('EICAR.com', [entry["values"] for entry in results])
        for entry in results:
            # Check that it could be extracted.
            if entry['type'] == 'malware-sample' and entry["values"] == 'EICAR.com':
                payload = base64.b64decode(entry["data"]).decode()
                self.assertEqual(payload,
                                 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
 | 
						|
 | 
						|
    def test_email_attachment_password_in_html_body(self):
 | 
						|
        query = {"module": "email_import"}
 | 
						|
        query["config"] = {"unzip_attachments": "true",
 | 
						|
                           "guess_zip_attachment_passwords": 'true',
 | 
						|
                           "extract_urls": None}
 | 
						|
        message = get_base_email()
 | 
						|
        text = """I am a test e-mail
 | 
						|
        the password is NOT "this string".
 | 
						|
 | 
						|
        That is all.
 | 
						|
        """
 | 
						|
        html = """\
 | 
						|
<html>
 | 
						|
  <head></head>
 | 
						|
  <body>
 | 
						|
    <p>Hi!<br>
 | 
						|
       This is the real password?<br>
 | 
						|
       It is "a long password".
 | 
						|
    </p>
 | 
						|
  </body>
 | 
						|
</html>
 | 
						|
"""
 | 
						|
        message.attach(MIMEText(text, 'plain'))
 | 
						|
        message.attach(MIMEText(html, 'html'))
 | 
						|
        with open("tests/longer_password.zip", "rb") as fp:
 | 
						|
            eicar_mime = MIMEApplication(fp.read(), 'zip')
 | 
						|
            eicar_mime.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
 | 
						|
            message.attach(eicar_mime)
 | 
						|
        query['data'] = decode_email(message)
 | 
						|
        data = json.dumps(query)
 | 
						|
        response = requests.post(self.url + "query", data=data)
 | 
						|
        values = [x["values"] for x in response.json()["results"]]
 | 
						|
        self.assertIn('EICAR.com', values)
 | 
						|
        for i in response.json()['results']:
 | 
						|
            # Check that it could be extracted.
 | 
						|
            if i["values"] == 'EICAR.com':
 | 
						|
                attch_data = base64.b64decode(i["data"]).decode()
 | 
						|
                self.assertEqual(attch_data,
 | 
						|
                                 'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
 | 
						|
 | 
						|
    def test_email_body_encoding(self):
        """Import one HTML e-mail per file in ``tests/test_files/encodings``.

        Each file's base name is used as the charset for reading the file and
        for the MIME part. The reply must contain ``results`` and no ``error``.
        """
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
        }
        for fn in os.listdir("tests/test_files/encodings"):
            message = get_base_email()
            # The encoding is used as the name of the file.
            charset = os.path.splitext(fn)[0]
            with open("tests/test_files/encodings/{0}".format(fn), "r", encoding=charset) as fp:
                body = fp.read()
            message.attach(MIMEText(body, 'html', charset))
            query['data'] = decode_email(message)
            reply = requests.post(self.url + "query", data=json.dumps(query)).json()
            self.assertNotIn('error', reply, reply.get('error', ""))
            self.assertIn('results', reply, "No server results found.")
 | 
						|
 | 
						|
 | 
						|
    def test_email_header_proper_encoding(self):
        """Check header extraction when headers are wrapped in ``Header`` objects.

        For each of UTF-8, UTF-16 and UTF-32, every header of the base e-mail
        is in turn replaced by an ``email.header.Header`` built from its
        encoded bytes. The message is imported after each replacement and the
        attribute counts and values are verified.
        """
        query = {"module": "email_import"}
        query["config"] = {"unzip_attachments": None,
                           "guess_zip_attachment_passwords": None,
                           "extract_urls": None}
        # (attribute type, expected count, values that must be present),
        # verified in this order.
        expected = [
            ('target-email', 1, ['test@domain.com']),
            ('email-dst-display-name', 4, ['Last One',
                                           'Other Friend',
                                           'Second Person',
                                           'Testy Testerson']),
            ('email-dst', 4, ['test@domain.com',
                              'second@domain.com',
                              'other@friend.net',
                              'last_one@finally.com']),
            ('email-src-display-name', 2, ["Innocent Person"]),
            ('email-src', 2, ["evil_spoofer@example.com",
                              "IgnoreMeImInnocent@sender.com"]),
            ('email-thread-index', 1,
             ['AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==']),
            ('email-message-id', 1, ["<4988EF2D.40804@example.com>"]),
            ('email-subject', 1, ["Example Message"]),
            ('email-header', 1, []),
            ('email-x-mailer', 1, ["mlx 5.1.7"]),
            ('email-reply-to', 1,
             ["<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"]),
        ]
        for encoding in ['utf-8', 'utf-16', 'utf-32']:
            message = get_base_email()
            text = """I am a test e-mail
            the password is NOT "this string".
            That is all.
            """
            message.attach(MIMEText(text, 'plain'))
            for hdr, hdr_val in message.items():
                # NOTE(review): ``msg`` is the same object as ``message``, so
                # a replaced header stays replaced in later iterations.
                msg = message
                encoded_header = hdr_val.encode(encoding)
                msg.replace_header(hdr, Header(encoded_header, encoding))
                query['data'] = decode_email(msg)
                data = json.dumps(query)
                response = requests.post(self.url + "query", data=data)
                results = response.json()['results']
                # Remove BOM from UTF-16 strings.
                values = [x["values"].replace('\ufeff', "") for x in results]
                types = {}
                for i in results:
                    types[i["type"]] = types.get(i["type"], 0) + 1
                # Check that all the items were correct.
                for attr_type, count, wanted in expected:
                    self.assertEqual(types[attr_type], count)
                    for value in wanted:
                        self.assertIn(value, values)
 | 
						|
 | 
						|
    def test_email_header_malformed_encoding(self):
        """Check header extraction when raw header bytes are re-encoded in place.

        For each of UTF-8, UTF-16 and UTF-32, and for every header of the base
        e-mail, the header value is substituted in the serialized message by
        the same text encoded with that charset. The raw bytes are imported
        and the attribute counts and values are verified.
        """
        query = {"module": "email_import"}
        query["config"] = {"unzip_attachments": None,
                           "guess_zip_attachment_passwords": None,
                           "extract_urls": None}
        # (attribute type, expected count, values that must be present),
        # verified in this order.
        expected = [
            ('target-email', 1, ['test@domain.com']),
            ('email-dst-display-name', 4, ['Last One',
                                           'Other Friend',
                                           'Second Person',
                                           'Testy Testerson']),
            ('email-dst', 4, ['test@domain.com',
                              'second@domain.com',
                              'other@friend.net',
                              'last_one@finally.com']),
            ('email-src-display-name', 2, ["Innocent Person"]),
            ('email-src', 2, ["evil_spoofer@example.com",
                              "IgnoreMeImInnocent@sender.com"]),
            ('email-thread-index', 1,
             ['AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA==']),
            ('email-message-id', 1, ["<4988EF2D.40804@example.com>"]),
            ('email-subject', 1, ["Example Message"]),
            ('email-header', 1, []),
            ('email-x-mailer', 1, ["mlx 5.1.7"]),
            ('email-reply-to', 1,
             ["<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"]),
        ]
        for encoding in ['utf-8', 'utf-16', 'utf-32']:
            message = get_base_email()
            text = """I am a test e-mail
            the password is NOT "this string".
            That is all.
            """
            message.attach(MIMEText(text, 'plain'))
            for hdr, hdr_val in message.items():
                msg = message
                encoded_header = hdr_val.encode(encoding)
                # NOTE(review): the header value is compiled as an unescaped
                # regex, so metacharacters in it are interpreted by ``re``.
                # Confirm this is intended before switching to a literal
                # replacement.
                pat = re.compile(hdr_val.encode())
                message_bytes = pat.sub(encoded_header, msg.as_bytes())
                message64 = base64.b64encode(message_bytes).decode()
                query['data'] = message64

                data = json.dumps(query)
                response = requests.post(self.url + "query", data=data)
                results = response.json()['results']
                # Remove BOM from UTF-16 strings.
                values = [x["values"].replace('\ufeff', "") for x in results]
                types = {}
                for i in results:
                    types[i["type"]] = types.get(i["type"], 0) + 1
                # Check that all the items were correct.
                for attr_type, count, wanted in expected:
                    self.assertEqual(types[attr_type], count)
                    for value in wanted:
                        self.assertIn(value, values)
 | 
						|
 | 
						|
    def test_email_header_CJK_encoding(self):
        """Import an e-mail whose Subject is a ``euc_jisx0213`` ``Header``.

        Every ``email-subject`` result must equal the original Japanese text
        and must not be the still-encoded RFC 2047 form.
        """
        message = get_base_email()
        text = """I am a test e-mail
        the password is NOT "this string".
        That is all.
        """
        message.attach(MIMEText(text, 'plain'))
        japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
        message.replace_header("Subject", Header(japanese_charset, 'euc_jisx0213'))
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": decode_email(message),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        # Encoded form the subject must NOT be returned in.
        RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
        for entry in reply.json()['results']:
            if entry['type'] != 'email-subject':
                continue
            self.assertNotEqual(RFC_format, entry['values'],
                                "The subject was not decoded from RFC2047 format.")
            self.assertEqual(japanese_charset, entry['values'],
                             "Subject not properly decoded")
 | 
						|
 | 
						|
    def test_email_malformed_header_CJK_encoding(self):
        """Import an e-mail whose Subject holds raw encoded Japanese bytes.

        A placeholder subject is swapped for the encoded bytes in the
        serialized message. Every ``email-subject`` result must equal the
        original Japanese text.
        """
        message = get_base_email()
        text = """I am a test e-mail
        the password is NOT "this string".
        That is all.
        """
        message.attach(MIMEText(text, 'plain'))
        japanese_charset = "ビット及び8ビットの2バイト情報交換用符号化拡張漢字集合"
        # Serialize with a placeholder, then splice the raw bytes in.
        message.replace_header('Subject', "{{REPLACE}}")
        message_bytes = re.sub(b'{{REPLACE}}', japanese_charset.encode(), message.as_bytes())
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": base64.b64encode(message_bytes).decode(),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        # Encoded form the subject must NOT be returned in.
        RFC_format = '=?euc_jisx0213?b?pdOlw6XItdqk0zil06XDpcikzjKl0KWkpci+8MrzuPK0uc3RyeS55rK9s8jEpbTBu/q9uLnn?='
        for entry in reply.json()['results']:
            if entry['type'] != 'email-subject':
                continue
            self.assertNotEqual(RFC_format, entry['values'],
                                "The subject was not decoded from RFC2047 format.")
            self.assertEqual(japanese_charset, entry['values'],
                             "Subject not properly decoded")
 | 
						|
 | 
						|
    def test_email_malformed_header_emoji_encoding(self):
        """Import an e-mail whose Subject holds raw encoded emoji bytes.

        A placeholder subject is swapped for the encoded bytes in the
        serialized message. Every ``email-subject`` result must equal the
        original emoji string.
        """
        message = get_base_email()
        text = """I am a test e-mail
        the password is NOT "this string".
        That is all.
        """
        message.attach(MIMEText(text, 'plain'))
        emoji_string = "Emoji Test 👍 checking this"
        # Serialize with a placeholder, then splice the raw bytes in.
        message.replace_header('Subject', "{{EMOJI}}")
        message_bytes = re.sub(b'{{EMOJI}}', emoji_string.encode(), message.as_bytes())
        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": base64.b64encode(message_bytes).decode(),
        }
        reply = requests.post(self.url + "query", data=json.dumps(query))
        # Encoded form the subject must NOT be returned in.
        RFC_format = "=?unknown-8bit?q?Emoji_Test_=F0=9F=91=8D_checking_this?="
        for entry in reply.json()['results']:
            if entry['type'] != 'email-subject':
                continue
            self.assertNotEqual(RFC_format, entry['values'],
                                "The subject was not decoded from RFC2047 format.")
            self.assertEqual(emoji_string, entry['values'],
                             "Subject not properly decoded")
 | 
						|
 | 
						|
    def test_email_attachment_emoji_filename(self):
        """An attachment whose filename contains an emoji is reported unchanged.

        Attaches ``tests/EICAR.com`` under an emoji-bearing filename, posts the
        message to the ``email_import`` module and verifies both the reported
        filename and the returned sample bytes.
        """
        attachment_name = "Emoji Test 👍 checking this"
        expected_payload = b'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-'

        message = get_base_email()
        message.attach(MIMEText("""I am a test e-mail""", 'plain'))
        with open("tests/EICAR.com", "rb") as sample:
            payload = sample.read()
        attachment = MIMEApplication(payload, 'com')
        attachment.add_header('Content-Disposition',
                              'attachment',
                              filename=attachment_name)
        message.attach(attachment)

        query = {
            "module": "email_import",
            "config": {"unzip_attachments": None,
                       "guess_zip_attachment_passwords": None,
                       "extract_urls": None},
            "data": decode_email(message),
        }
        response = requests.post(self.url + "query", data=json.dumps(query))
        results = response.json()['results']

        self.assertIn(attachment_name, [entry["values"] for entry in results])
        for entry in results:
            kind = entry["type"]
            if kind == 'email-attachment':
                self.assertEqual(entry["values"], attachment_name)
            if kind == 'malware-sample':
                self.assertEqual(base64.b64decode(entry["data"]), expected_payload)
 | 
						|
 | 
						|
 | 
						|
    def test_email_attachment_password_in_subject(self):
        """A zip attachment is unpacked when the Subject carries a quoted phrase.

        Unzipping and password guessing are both enabled. The module is
        expected to return ``EICAR.com`` extracted from
        ``tests/longer_password.zip`` (presumably protected with the phrase
        quoted in the Subject -- confirm against the fixture).
        """
        subject = 'I contain the -> "a long password" <- that is the password'
        message = get_base_email()
        message.replace_header("Subject", subject)
        text = """I am a test e-mail
        the password is NOT "this string".

        That is all.
        """
        message.attach(MIMEText(text, 'plain'))
        with open("tests/longer_password.zip", "rb") as archive:
            zipped = MIMEApplication(archive.read(), 'zip')
        zipped.add_header('Content-Disposition', 'attachment', filename="EICAR.com.zip")
        message.attach(zipped)

        query = {
            "module": "email_import",
            "config": {"unzip_attachments": "true",
                       "guess_zip_attachment_passwords": 'true',
                       "extract_urls": None},
            "data": decode_email(message),
        }
        response = requests.post(self.url + "query", data=json.dumps(query))
        results = response.json()["results"]
        values = [entry["values"] for entry in results]

        self.assertIn('EICAR.com', values)
        self.assertIn(subject, values)
        for entry in results:
            if entry["values"] != 'EICAR.com':
                continue
            # Check that it could be extracted.
            extracted = base64.b64decode(entry["data"]).decode()
            self.assertEqual(extracted,
                             'X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-')
 | 
						|
 | 
						|
    def test_email_extract_html_body_urls(self):
 | 
						|
        query = {"module": "email_import"}
 | 
						|
        query["config"] = {"unzip_attachments": None,
 | 
						|
                           "guess_zip_attachment_passwords": None,
 | 
						|
                           "extract_urls": "true"}
 | 
						|
        message = get_base_email()
 | 
						|
        text = """I am a test e-mail
 | 
						|
 | 
						|
        That is all.
 | 
						|
        """
 | 
						|
        html = """\
 | 
						|
<html>
 | 
						|
  <head></head>
 | 
						|
  <body>
 | 
						|
    <p>Hi!<br>
 | 
						|
<p>MISP modules are autonomous modules that can be used for expansion and other services in <a href="https://github.com/MISP/MISP">MISP</a>.</p>
 | 
						|
<p>The modules are written in Python 3 following a simple API interface. The objective is to ease the extensions of MISP functionalities
 | 
						|
without modifying core components. The API is available via a simple REST API which is independent from MISP installation or configuration.</p>
 | 
						|
<p>MISP modules support is included in MISP starting from version 2.4.28.</p>
 | 
						|
<p>For more information: <a href="https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf">Extending MISP with Python modules</a> slides from MISP training.</p>
 | 
						|
    </p>
 | 
						|
  </body>
 | 
						|
</html>
 | 
						|
"""
 | 
						|
        message.attach(MIMEText(text, 'plain'))
 | 
						|
        message.attach(MIMEText(html, 'html'))
 | 
						|
        query['data'] = decode_email(message)
 | 
						|
        data = json.dumps(query)
 | 
						|
        response = requests.post(self.url + "query", data=data)
 | 
						|
        # print(response.json())
 | 
						|
        values = [x["values"] for x in response.json()["results"]]
 | 
						|
        self.assertIn("https://github.com/MISP/MISP", values)
 | 
						|
        self.assertIn("https://www.circl.lu/assets/files/misp-training/3.1-MISP-modules.pdf", values)
 | 
						|
 | 
						|
    # def test_domaintools(self):
 | 
						|
    #    query = {'config': {'username': 'test_user', 'api_key': 'test_key'}, 'module': 'domaintools', 'domain': 'domaintools.com'}
 | 
						|
    #    try:
 | 
						|
    #        response = requests.post(self.url + "query", data=json.dumps(query)).json()
 | 
						|
    #    except:
 | 
						|
    #        pass
 | 
						|
    #    response = requests.post(self.url + "query", data=json.dumps(query)).json()
 | 
						|
    #    print(response)
 | 
						|
 | 
						|
 | 
						|
def decode_email(message):
    """Return *message* serialised to bytes and base64-encoded as a str.

    Despite its name this helper encodes: it calls ``message.as_bytes()``,
    base64-encodes the result and decodes those bytes to text so the value
    can be embedded in a JSON query.
    """
    raw_bytes = message.as_bytes()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
 | 
						|
 | 
						|
 | 
						|
def get_base_email():
    """Build the multipart message used as the starting point for e-mail tests.

    Returns:
        A ``MIMEMultipart`` message with a fixed set of headers and no parts
        attached; callers attach bodies/attachments and may replace headers.

    The headers are kept as a sequence of pairs rather than a dict: a dict
    literal cannot hold the same key more than once, so only the last of the
    three ``Received`` headers would survive.
    """
    headers = (
        ("Received", "via dmail-2008.19 for +INBOX; Tue, 3 Feb 2009 19:29:12 -0600 (CST)"),
        ("Received", "from abc.luxsci.com ([10.10.10.10]) by xyz.luxsci.com (8.13.7/8.13.7) with ESMTP id n141TCa7022588 for <test@domain.com>; Tue, 3 Feb 2009 19:29:12 -0600"),
        ("Received", "from [192.168.0.3] (verizon.net [44.44.44.44]) (user=test@sender.com mech=PLAIN bits=2) by abc.luxsci.com (8.13.7/8.13.7) with ESMTP id n141SAfo021855 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=NOT) for <test@domain.com>; Tue, 3 Feb 2009 19:28:10 -0600"),
        ("X-Received", "by 192.168.0.45 with SMTP id q4mr156123401yw1g.911.1912342394963; Tue, 3 Feb 2009 19:32:15 -0600 (PST)"),
        ("Message-ID", "<4988EF2D.40804@example.com>"),
        ("Date", "Tue, 03 Feb 2009 20:28:13 -0500"),
        ("From", '"Innocent Person" <IgnoreMeImInnocent@sender.com>'),
        ("User-Agent", 'Thunderbird 2.0.0.19 (Windows/20081209)'),
        ("Sender", '"Malicious MailAgent" <mailagent@example.com>'),
        ("References", "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"),
        ("In-Reply-To", "<CI7DgL-A6dm92s7gf4-88g@E_0x238G4K2H08H9SDwsw8b6LwuA@mail.example.com>"),
        ("Accept-Language", 'en-US'),
        ("X-Mailer", 'mlx 5.1.7'),
        ("Return-Path", "evil_spoofer@example.com"),
        ("Thread-Topic", 'This is a thread.'),
        ("Thread-Index", 'AQHSR8Us3H3SoaY1oUy9AAwZfMF922bnA9GAgAAi9s4AAGvxAA=='),
        ("Content-Language", 'en-US'),
        ("To", '"Testy Testerson" <test@domain.com>'),
        ("Cc", '"Second Person" <second@domain.com>, "Other Friend" <other@friend.net>, "Last One" <last_one@finally.com>'),
        ("Subject", 'Example Message'),
        ("MIME-Version", '1.0'),
    )
    msg = MIMEMultipart()
    for key, val in headers:
        msg.add_header(key, val)
    return msg
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Run every test case defined in this module when executed as a script.
    unittest.main()
 |