mirror of https://github.com/MISP/misp-modules
61 lines
2.0 KiB
Python
61 lines
2.0 KiB
Python
"""Test module for the ThreatConnect Export module"""
|
|
import base64
|
|
import csv
|
|
import io
|
|
import json
|
|
import os
|
|
import unittest
|
|
import requests
|
|
|
|
|
|
class TestModules(unittest.TestCase):
    """Unittest module for threat_connect_export.py

    These are integration tests: they send HTTP requests to the module
    server at ``self.url`` and read a sample event stored next to this file.
    """

    # Seconds to wait for the module server before failing, so that an
    # unreachable or hung server cannot stall the test run indefinitely.
    REQUEST_TIMEOUT = 30

    def setUp(self):
        """Set the server coordinates and load the sample MISP event."""
        self.headers = {'Content-Type': 'application/json'}
        self.url = "http://127.0.0.1:6666/"
        self.module = "threat_connect_export"
        input_event_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            "test_files",
            "misp_event.json",
        )
        # Explicit encoding: do not depend on the platform's locale default
        # when reading the JSON fixture.
        with open(input_event_path, "r", encoding="utf-8") as ifile:
            self.event = json.load(ifile)

    def test_01_introspection(self):
        """Taken from test.py

        Check that the server's module listing contains this module.
        """
        # The request is issued before entering ``try`` so that a connection
        # failure propagates as itself instead of being masked by an
        # UnboundLocalError on ``response`` in the cleanup clause.
        response = requests.get(
            self.url + "modules",
            timeout=self.REQUEST_TIMEOUT,
        )
        try:
            modules = [module["name"] for module in response.json()]
            self.assertIn(self.module, modules)
        finally:
            response.close()

    def test_02_export(self):
        """Test an event export

        Posts the sample event to the export module, decodes the base64
        payload in the response's ``data`` field as CSV, and checks the
        ``Value`` and ``Source`` columns.
        """
        test_source = "Test Export"
        query = {
            "module": self.module,
            "data": [self.event],
            "config": {
                "Default_Source": test_source,
            },
        }

        # Issued outside ``try`` for the same reason as in the introspection
        # test: ``response`` must be bound before cleanup may reference it.
        response = requests.post(
            self.url + "query",
            headers=self.headers,
            data=json.dumps(query),
            timeout=self.REQUEST_TIMEOUT,
        )
        try:
            data = base64.b64decode(response.json()["data"]).decode("utf-8")
        finally:
            response.close()

        # Parse the CSV once into a list. Rewinding the underlying buffer and
        # re-iterating the same DictReader would yield the header line as a
        # data row, because the reader has already cached its fieldnames.
        rows = list(csv.DictReader(io.StringIO(data)))

        values = [row["Value"] for row in rows]
        self.assertIn("google.com", values)
        self.assertIn("127.0.0.1", values)

        # A set deduplicates the sources before the membership check.
        sources = {row["Source"] for row in rows}
        self.assertIn(test_source, sources)
|
|
|
|
# Run the tests through unittest's command-line runner when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
|