mirror of https://github.com/MISP/misp-modules
add: [test expansion] Added various tests for modules with api authentication
parent
1563be1100
commit
4cabbe6334
|
@ -264,6 +264,20 @@ class TestExpansions(unittest.TestCase):
|
|||
# Empty results, which in this case comes from a connection error
|
||||
continue
|
||||
|
||||
def test_passivetotal(self):
|
||||
module_name = "passivetotal"
|
||||
query = {"module": module_name, "ip-src": "149.13.33.14"}
|
||||
if module_name in self.configs:
|
||||
query["config"] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_values(response), 'circl.lu')
|
||||
except Exception:
|
||||
self.assertEqual(self.get_errors(response), 'We hit an error, time to bail!')
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Configuration is missing from the request.')
|
||||
|
||||
def test_pdf(self):
|
||||
filename = 'test.pdf'
|
||||
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
|
||||
|
@ -293,6 +307,35 @@ class TestExpansions(unittest.TestCase):
|
|||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), 'dns.google.')
|
||||
|
||||
def test_securitytrails(self):
|
||||
module_name = "securitytrails"
|
||||
query_types = ('ip-src', 'domain')
|
||||
query_values = ('149.13.33.14', 'circl.lu')
|
||||
results = ('www.attack-community.org', 'ns4.eurodns.com')
|
||||
if module_name in self.configs:
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name, query_type: query_value, "config": self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_values(response), result)
|
||||
except Exception:
|
||||
self.assertTrue(self.get_errors(response).stratswith('Error '))
|
||||
else:
|
||||
query = {"module": module_name, query_values[0]: query_types[0]}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'SecurityTrails authentication is missing')
|
||||
|
||||
def test_shodan(self):
|
||||
module_name = "shodan"
|
||||
query = {"module": module_name, "ip-src": "149.13.33.14"}
|
||||
if module_name in self.configs:
|
||||
query['config'] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response).startswith('{"region_code": null, "tags": [], "ip": 2500665614,'))
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Shodan authentication is missing')
|
||||
|
||||
def test_sigma_queries(self):
|
||||
query = {"module": "sigma_queries", "sigma": self.sigma_rule}
|
||||
response = self.misp_modules_post(query)
|
||||
|
@ -333,6 +376,88 @@ class TestExpansions(unittest.TestCase):
|
|||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response), result)
|
||||
|
||||
def test_urlhaus(self):
|
||||
query_types = ('domain', 'ip-src', 'sha256', 'url')
|
||||
query_values = ('www.bestwpdesign.com', '79.118.195.239',
|
||||
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
|
||||
'http://79.118.195.239:1924/.i')
|
||||
results = ('url', 'url', 'virustotal-report', 'virustotal-report')
|
||||
for query_type, query_value, result in zip(query_types[:2], query_values[:2], results[:2]):
|
||||
query = {"module": "urlhaus",
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value,
|
||||
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_attribute(response), result)
|
||||
for query_type, query_value, result in zip(query_types[2:], query_values[2:], results[2:]):
|
||||
query = {"module": "urlhaus",
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value,
|
||||
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_object(response), result)
|
||||
|
||||
def test_urlscan(self):
|
||||
module_name = "urlscan"
|
||||
query = {"module": module_name, "url": "https://circl.lu/team"}
|
||||
if module_name in self.configs:
|
||||
query['config'] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), 'circl.lu')
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Urlscan apikey is missing')
|
||||
|
||||
def test_virustotal_public(self):
|
||||
module_name = "virustotal_public"
|
||||
query_types = ('domain', 'ip-src', 'sha256', 'url')
|
||||
query_values = ('circl.lu', '149.13.33.14',
|
||||
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
|
||||
'http://194.169.88.56:49151/.i')
|
||||
results = ('whois', 'asn', 'file', 'virustotal-report')
|
||||
if module_name in self.configs:
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value},
|
||||
"config": self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_object(response), result)
|
||||
except Exception:
|
||||
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
|
||||
else:
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_types[0],
|
||||
"value": query_values[0]}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), "A VirusTotal api key is required for this module.")
|
||||
|
||||
def test_virustotal(self):
|
||||
module_name = "virustotal"
|
||||
query_types = ('domain', 'ip-src', 'sha256', 'url')
|
||||
query_values = ('circl.lu', '149.13.33.14',
|
||||
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
|
||||
'http://194.169.88.56:49151/.i')
|
||||
results = ('whois', 'asn', 'file', 'virustotal-report')
|
||||
if module_name in self.configs:
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value},
|
||||
"config": self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_object(response), result)
|
||||
except Exception:
|
||||
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
|
||||
else:
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_types[0],
|
||||
"value": query_values[0]}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), "A VirusTotal api key is required for this module.")
|
||||
|
||||
def test_wikidata(self):
|
||||
query = {"module": "wiki", "text": "Google"}
|
||||
response = self.misp_modules_post(query)
|
||||
|
|
Loading…
Reference in New Issue