misp-modules/tests/threatconnect_export_test.py

61 lines
2.0 KiB
Python
Raw Normal View History

"""Test module for the ThreatConnect Export module"""
import base64
import csv
import io
import json
import os
import unittest
import requests
class TestModules(unittest.TestCase):
    """Unittest module for threat_connect_export.py.

    These are integration tests: they send HTTP requests to the service at
    ``self.url`` and load the fixture ``test_files/misp_event.json`` that
    lives next to this file.
    """

    def setUp(self):
        """Set the request defaults and load the event fixture from disk."""
        self.headers = {'Content-Type': 'application/json'}
        self.url = "http://127.0.0.1:6666/"
        self.module = "threat_connect_export"
        input_event_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            "test_files",
            "misp_event.json",
        )
        # Explicit encoding so decoding the fixture does not depend on the
        # locale of the machine running the tests.
        with open(input_event_path, "r", encoding="utf-8") as ifile:
            self.event = json.load(ifile)

    def test_01_introspection(self):
        """Taken from test.py"""
        # The request is issued outside ``try``: if it raises, ``response``
        # was never bound, and touching it in ``finally`` would replace the
        # real connection error with a NameError.
        response = requests.get(self.url + "modules")
        try:
            modules = [module["name"] for module in response.json()]
            self.assertIn(self.module, modules)
        finally:
            response.connection.close()

    def test_02_export(self):
        """Test an event export"""
        test_source = "Test Export"
        query = {
            "module": self.module,
            "data": [self.event],
            "config": {
                "Default_Source": test_source
            }
        }
        # See test_01_introspection for why the request precedes ``try``.
        response = requests.post(self.url + "query", headers=self.headers, data=json.dumps(query))
        try:
            # The "data" field of the JSON reply is base64-encoded CSV text.
            data = base64.b64decode(response.json()["data"]).decode("utf-8")
            # Parse the CSV once. Rewinding the buffer and re-iterating the
            # same DictReader would yield the header line as a data row,
            # because the reader has already cached its field names.
            rows = list(csv.DictReader(io.StringIO(data)))
            values = [row["Value"] for row in rows]
            self.assertIn("google.com", values)
            self.assertIn("127.0.0.1", values)
            # use a set comprehension to deduplicate sources
            sources = {row["Source"] for row in rows}
            self.assertIn(test_source, sources)
        finally:
            response.connection.close()
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()