diff --git a/tests/test_expansions.py b/tests/test_expansions.py index 9233fb4..13b2a34 100644 --- a/tests/test_expansions.py +++ b/tests/test_expansions.py @@ -264,6 +264,20 @@ class TestExpansions(unittest.TestCase): # Empty results, which in this case comes from a connection error continue + def test_passivetotal(self): + module_name = "passivetotal" + query = {"module": module_name, "ip-src": "149.13.33.14"} + if module_name in self.configs: + query["config"] = self.configs[module_name] + response = self.misp_modules_post(query) + try: + self.assertEqual(self.get_values(response), 'circl.lu') + except Exception: + self.assertEqual(self.get_errors(response), 'We hit an error, time to bail!') + else: + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), 'Configuration is missing from the request.') + def test_pdf(self): filename = 'test.pdf' with open(f'{self.dirname}/test_files/{filename}', 'rb') as f: @@ -293,6 +307,35 @@ class TestExpansions(unittest.TestCase): response = self.misp_modules_post(query) self.assertEqual(self.get_values(response), 'dns.google.') + def test_securitytrails(self): + module_name = "securitytrails" + query_types = ('ip-src', 'domain') + query_values = ('149.13.33.14', 'circl.lu') + results = ('www.attack-community.org', 'ns4.eurodns.com') + if module_name in self.configs: + for query_type, query_value, result in zip(query_types, query_values, results): + query = {"module": module_name, query_type: query_value, "config": self.configs[module_name]} + response = self.misp_modules_post(query) + try: + self.assertEqual(self.get_values(response), result) + except Exception: + self.assertTrue(self.get_errors(response).stratswith('Error ')) + else: + query = {"module": module_name, query_values[0]: query_types[0]} + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), 'SecurityTrails authentication is missing') + + def test_shodan(self): + module_name = "shodan" + query = {"module": module_name, "ip-src": "149.13.33.14"} + if module_name in self.configs: + query['config'] = self.configs[module_name] + response = self.misp_modules_post(query) + self.assertTrue(self.get_values(response).startswith('{"region_code": null, "tags": [], "ip": 2500665614,')) + else: + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), 'Shodan authentication is missing') + def test_sigma_queries(self): query = {"module": "sigma_queries", "sigma": self.sigma_rule} response = self.misp_modules_post(query) @@ -333,6 +376,88 @@ class TestExpansions(unittest.TestCase): response = self.misp_modules_post(query) self.assertTrue(self.get_values(response), result) + def test_urlhaus(self): + query_types = ('domain', 'ip-src', 'sha256', 'url') + query_values = ('www.bestwpdesign.com', '79.118.195.239', + 'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3', + 'http://79.118.195.239:1924/.i') + results = ('url', 'url', 'virustotal-report', 'virustotal-report') + for query_type, query_value, result in zip(query_types[:2], query_values[:2], results[:2]): + query = {"module": "urlhaus", + "attribute": {"type": query_type, + "value": query_value, + "uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}} + response = self.misp_modules_post(query) + self.assertEqual(self.get_attribute(response), result) + for query_type, query_value, result in zip(query_types[2:], query_values[2:], results[2:]): + query = {"module": "urlhaus", + "attribute": {"type": query_type, + "value": query_value, + "uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}} + response = self.misp_modules_post(query) + self.assertEqual(self.get_object(response), result) + + def test_urlscan(self): + module_name = "urlscan" + query = {"module": module_name, "url": "https://circl.lu/team"} + if module_name in self.configs: + query['config'] = self.configs[module_name] + response = self.misp_modules_post(query) + self.assertEqual(self.get_values(response), 'circl.lu') + else: + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), 'Urlscan apikey is missing') + + def test_virustotal_public(self): + module_name = "virustotal_public" + query_types = ('domain', 'ip-src', 'sha256', 'url') + query_values = ('circl.lu', '149.13.33.14', + 'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3', + 'http://194.169.88.56:49151/.i') + results = ('whois', 'asn', 'file', 'virustotal-report') + if module_name in self.configs: + for query_type, query_value, result in zip(query_types, query_values, results): + query = {"module": module_name, + "attribute": {"type": query_type, + "value": query_value}, + "config": self.configs[module_name]} + response = self.misp_modules_post(query) + try: + self.assertEqual(self.get_object(response), result) + except Exception: + self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.") + else: + query = {"module": module_name, + "attribute": {"type": query_types[0], + "value": query_values[0]}} + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), "A VirusTotal api key is required for this module.") + + def test_virustotal(self): + module_name = "virustotal" + query_types = ('domain', 'ip-src', 'sha256', 'url') + query_values = ('circl.lu', '149.13.33.14', + 'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3', + 'http://194.169.88.56:49151/.i') + results = ('whois', 'asn', 'file', 'virustotal-report') + if module_name in self.configs: + for query_type, query_value, result in zip(query_types, query_values, results): + query = {"module": module_name, + "attribute": {"type": query_type, + "value": query_value}, + "config": self.configs[module_name]} + response = self.misp_modules_post(query) + try: + self.assertEqual(self.get_object(response), result) + except Exception: + self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.") + else: + query = {"module": module_name, + "attribute": {"type": query_types[0], + "value": query_values[0]}} + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), "A VirusTotal api key is required for this module.") + def test_wikidata(self): query = {"module": "wiki", "text": "Google"} response = self.misp_modules_post(query)