mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			61 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			61 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
| """Test module for the ThreatConnect Export module"""
 | |
| import base64
 | |
| import csv
 | |
| import io
 | |
| import json
 | |
| import os
 | |
| import unittest
 | |
| import requests
 | |
| 
 | |
| 
 | |
| class TestModules(unittest.TestCase):
 | |
|     """Unittest module for threat_connect_export.py"""
 | |
|     def setUp(self):
 | |
|         self.headers = {'Content-Type': 'application/json'}
 | |
|         self.url = "http://127.0.0.1:6666/"
 | |
|         self.module = "threat_connect_export"
 | |
|         input_event_path = "%s/test_files/misp_event.json" % os.path.dirname(os.path.realpath(__file__))
 | |
|         with open(input_event_path, "r") as ifile:
 | |
|             self.event = json.load(ifile)
 | |
| 
 | |
|     def test_01_introspection(self):
 | |
|         """Taken from test.py"""
 | |
|         try:
 | |
|             response = requests.get(self.url + "modules")
 | |
|             modules = [module["name"] for module in response.json()]
 | |
|             assert self.module in modules
 | |
|         finally:
 | |
|             response.connection.close()
 | |
| 
 | |
|     def test_02_export(self):
 | |
|         """Test an event export"""
 | |
|         test_source = "Test Export"
 | |
|         query = {
 | |
|             "module": self.module,
 | |
|             "data": [self.event],
 | |
|             "config": {
 | |
|                 "Default_Source": test_source
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         try:
 | |
|             response = requests.post(self.url + "query", headers=self.headers, data=json.dumps(query))
 | |
|             data = base64.b64decode(response.json()["data"]).decode("utf-8")
 | |
|             csvfile = io.StringIO(data)
 | |
|             reader = csv.DictReader(csvfile)
 | |
| 
 | |
|             values = [field["Value"] for field in reader]
 | |
|             assert "google.com" in values
 | |
|             assert "127.0.0.1" in values
 | |
| 
 | |
|             # resetting file pointer to read through again and extract sources
 | |
|             csvfile.seek(0)
 | |
|             # use a set comprehension to deduplicate sources
 | |
|             sources = {field["Source"] for field in reader}
 | |
|             assert test_source in sources
 | |
|         finally:
 | |
|             response.connection.close()
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 |