mirror of https://github.com/MISP/misp-modules
commit
8135a3ceec
|
@ -17,12 +17,17 @@ class TestExpansions(unittest.TestCase):
|
|||
self.url = "http://127.0.0.1:6666/"
|
||||
self.dirname = os.path.dirname(os.path.realpath(__file__))
|
||||
self.sigma_rule = "title: Antivirus Web Shell Detection\r\ndescription: Detects a highly relevant Antivirus alert that reports a web shell\r\ndate: 2018/09/09\r\nmodified: 2019/10/04\r\nauthor: Florian Roth\r\nreferences:\r\n - https://www.nextron-systems.com/2018/09/08/antivirus-event-analysis-cheat-sheet-v1-4/\r\ntags:\r\n - attack.persistence\r\n - attack.t1100\r\nlogsource:\r\n product: antivirus\r\ndetection:\r\n selection:\r\n Signature: \r\n - \"PHP/Backdoor*\"\r\n - \"JSP/Backdoor*\"\r\n - \"ASP/Backdoor*\"\r\n - \"Backdoor.PHP*\"\r\n - \"Backdoor.JSP*\"\r\n - \"Backdoor.ASP*\"\r\n - \"*Webshell*\"\r\n condition: selection\r\nfields:\r\n - FileName\r\n - User\r\nfalsepositives:\r\n - Unlikely\r\nlevel: critical"
|
||||
try:
|
||||
with open(f'{self.dirname}/expansion_configs.json', 'rb') as f:
|
||||
self.configs = json.loads(f.read().decode())
|
||||
except FileNotFoundError:
|
||||
self.configs = {}
|
||||
|
||||
def misp_modules_post(self, query):
|
||||
return requests.post(urljoin(self.url, "query"), json=query)
|
||||
|
||||
@staticmethod
|
||||
def get_attribute(reponse):
|
||||
def get_attribute(response):
|
||||
data = response.json()
|
||||
if not isinstance(data, dict):
|
||||
print(json.dumps(data, indent=2))
|
||||
|
@ -59,8 +64,20 @@ class TestExpansions(unittest.TestCase):
|
|||
if not isinstance(data, dict):
|
||||
print(json.dumps(data, indent=2))
|
||||
return data
|
||||
for result in data['results']:
|
||||
values = result['values']
|
||||
if values:
|
||||
return values[0] if isinstance(values, list) else values
|
||||
return data['results'][0]['values']
|
||||
|
||||
def test_apiosintds(self):
|
||||
query = {'module': 'apiosintds', 'ip-dst': '185.255.79.90'}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertTrue(self.get_values(response).startswith('185.255.79.90 IS listed by OSINT.digitalside.it.'))
|
||||
except AssertionError:
|
||||
self.assertTrue(self.get_values(response).startswith('185.255.79.90 IS NOT listed by OSINT.digitalside.it.'))
|
||||
|
||||
def test_bgpranking(self):
|
||||
query = {"module": "bgpranking", "AS": "13335"}
|
||||
response = self.misp_modules_post(query)
|
||||
|
@ -69,7 +86,7 @@ class TestExpansions(unittest.TestCase):
|
|||
def test_btc_steroids(self):
|
||||
query = {"module": "btc_steroids", "btc": "1ES14c7qLb5CYhLMUekctxLgc1FV2Ti9DA"}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response)[0].startswith('\n\nAddress:\t1ES14c7qLb5CYhLMUekctxLgc1FV2Ti9DA\nBalance:\t0.0000000000 BTC (+0.0005355700 BTC / -0.0005355700 BTC)'))
|
||||
self.assertTrue(self.get_values(response).startswith('\n\nAddress:\t1ES14c7qLb5CYhLMUekctxLgc1FV2Ti9DA\nBalance:\t0.0000000000 BTC (+0.0005355700 BTC / -0.0005355700 BTC)'))
|
||||
|
||||
def test_btc_scam_check(self):
|
||||
query = {"module": "btc_scam_check", "btc": "1ES14c7qLb5CYhLMUekctxLgc1FV2Ti9DA"}
|
||||
|
@ -80,21 +97,21 @@ class TestExpansions(unittest.TestCase):
|
|||
query = {"module": "countrycode", "domain": "www.circl.lu"}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_values(response), ['Luxembourg'])
|
||||
self.assertEqual(self.get_values(response), 'Luxembourg')
|
||||
except Exception:
|
||||
results = ('http://www.geognos.com/api/en/countries/info/all.json not reachable', 'Unknown',
|
||||
'Not able to get the countrycode references from http://www.geognos.com/api/en/countries/info/all.json')
|
||||
self.assertIn(self.get_values(response), results)
|
||||
|
||||
def test_cve(self):
|
||||
query = {"module": "cve", "vulnerability": "CVE-2010-3333", "config": {"custom_API": "https://cve.circl.lu/api/cve/"}}
|
||||
query = {"module": "cve", "vulnerability": "CVE-2010-4444", "config": {"custom_API": "https://cve.circl.lu/api/cve/"}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response).startswith("Stack-based buffer overflow in Microsoft Office XP SP3, Office 2003 SP3"))
|
||||
self.assertTrue(self.get_values(response).startswith("Unspecified vulnerability in Oracle Sun Java System Access Manager"))
|
||||
|
||||
def test_cve_advanced(self):
|
||||
query = {"module": "cve_advanced",
|
||||
"attribute": {"type": "vulnerability",
|
||||
"value": "CVE-2010-3333",
|
||||
"value": "CVE-2010-4444",
|
||||
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"},
|
||||
"config": {}}
|
||||
response = self.misp_modules_post(query)
|
||||
|
@ -117,7 +134,7 @@ class TestExpansions(unittest.TestCase):
|
|||
def test_dns(self):
|
||||
query = {"module": "dns", "hostname": "www.circl.lu", "config": {"nameserver": "8.8.8.8"}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), ['149.13.33.14'])
|
||||
self.assertEqual(self.get_values(response), '149.13.33.14')
|
||||
|
||||
def test_docx(self):
|
||||
filename = 'test.docx'
|
||||
|
@ -127,6 +144,24 @@ class TestExpansions(unittest.TestCase):
|
|||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), '\nThis is an basic test docx file. ')
|
||||
|
||||
def test_farsight_passivedns(self):
|
||||
module_name = 'farsight_passivedns'
|
||||
if module_name in self.configs:
|
||||
query_types = ('domain', 'ip-src')
|
||||
query_values = ('google.com', '8.8.8.8')
|
||||
results = ('mail.casadostemperos.com.br', 'outmail.wphf.at')
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name, query_type: query_value, 'config': self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertIn(result, self.get_values(response))
|
||||
except Exception:
|
||||
self.assertTrue(self.get_errors(response).startwith('Something went wrong'))
|
||||
else:
|
||||
query = {"module": module_name, "ip-src": "8.8.8.8"}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Farsight DNSDB apikey is missing')
|
||||
|
||||
def test_haveibeenpwned(self):
|
||||
query = {"module": "hibp", "email-src": "info@circl.lu"}
|
||||
response = self.misp_modules_post(query)
|
||||
|
@ -149,6 +184,17 @@ class TestExpansions(unittest.TestCase):
|
|||
entry = self.get_values(response)['response'][key]['asn']
|
||||
self.assertEqual(entry, '13335')
|
||||
|
||||
def test_macaddess_io(self):
|
||||
module_name = 'macaddress_io'
|
||||
query = {"module": module_name, "mac-address": "44:38:39:ff:ef:57"}
|
||||
if module_name in self.configs:
|
||||
query["config"] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response)['Valid MAC address'], 'True')
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Authorization required')
|
||||
|
||||
def test_macvendors(self):
|
||||
query = {"module": "macvendors", "mac-address": "FC-A1-3E-2A-1C-33"}
|
||||
response = self.misp_modules_post(query)
|
||||
|
@ -178,20 +224,63 @@ class TestExpansions(unittest.TestCase):
|
|||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), 'odt test')
|
||||
|
||||
def test_onyphe(self):
|
||||
module_name = "onyphe"
|
||||
query = {"module": module_name, "ip-src": "8.8.8.8"}
|
||||
if module_name in self.configs:
|
||||
query["config"] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertTrue(self.get_values(response).startswith('https://pastebin.com/raw/'))
|
||||
except Exception:
|
||||
self.assertEqual(self.get_errors(response), 'no more credits')
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Onyphe authentication is missing')
|
||||
|
||||
def test_onyphe_full(self):
|
||||
module_name = "onyphe_full"
|
||||
query = {"module": module_name, "ip-src": "8.8.8.8"}
|
||||
if module_name in self.configs:
|
||||
query["config"] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_values(response), '37.7510,-97.8220')
|
||||
except Exception:
|
||||
self.assertTrue(self.get_errors(response).startswith('Error '))
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Onyphe authentication is missing')
|
||||
|
||||
def test_otx(self):
|
||||
query_types = ('domain', 'ip-src', 'md5')
|
||||
query_values = ('circl.lu', '8.8.8.8', '616eff3e9a7575ae73821b4668d2801c')
|
||||
results = ('149.13.33.14', 'ffc2595aefa80b61621023252b5f0ccb22b6e31d7f1640913cd8ff74ddbd8b41',
|
||||
results = (('149.13.33.14', '149.13.33.17'),
|
||||
'ffc2595aefa80b61621023252b5f0ccb22b6e31d7f1640913cd8ff74ddbd8b41',
|
||||
'8.8.8.8')
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": "otx", query_type: query_value, "config": {"apikey": "1"}}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertTrue(self.get_values(response), [result])
|
||||
self.assertIn(self.get_values(response), result)
|
||||
except KeyError:
|
||||
# Empty results, which in this case comes from a connection error
|
||||
continue
|
||||
|
||||
def test_passivetotal(self):
|
||||
module_name = "passivetotal"
|
||||
query = {"module": module_name, "ip-src": "149.13.33.14"}
|
||||
if module_name in self.configs:
|
||||
query["config"] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_values(response), 'circl.lu')
|
||||
except Exception:
|
||||
self.assertIn(self.get_errors(response), ('We hit an error, time to bail!', 'API quota exceeded.'))
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Configuration is missing from the request.')
|
||||
|
||||
def test_pdf(self):
|
||||
filename = 'test.pdf'
|
||||
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
|
||||
|
@ -208,6 +297,14 @@ class TestExpansions(unittest.TestCase):
|
|||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), '\npptx test\n')
|
||||
|
||||
def test_qrcode(self):
|
||||
filename = 'qrcode.jpeg'
|
||||
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
|
||||
encoded = b64encode(f.read()).decode()
|
||||
query = {"module": "qrcode", "attachment": filename, "data": encoded}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), '1GXZ6v7FZzYBEnoRaG77SJxhu7QkvQmFuh')
|
||||
|
||||
def test_rbl(self):
|
||||
query = {"module": "rbl", "ip-src": "8.8.8.8"}
|
||||
response = self.misp_modules_post(query)
|
||||
|
@ -219,7 +316,36 @@ class TestExpansions(unittest.TestCase):
|
|||
def test_reversedns(self):
|
||||
query = {"module": "reversedns", "ip-src": "8.8.8.8"}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), ['dns.google.'])
|
||||
self.assertEqual(self.get_values(response), 'dns.google.')
|
||||
|
||||
def test_securitytrails(self):
|
||||
module_name = "securitytrails"
|
||||
query_types = ('ip-src', 'domain')
|
||||
query_values = ('149.13.33.14', 'circl.lu')
|
||||
results = ('circl.lu', 'ns4.eurodns.com')
|
||||
if module_name in self.configs:
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name, query_type: query_value, "config": self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_values(response), result)
|
||||
except Exception:
|
||||
self.assertTrue(self.get_errors(response).startswith("You've exceeded the usage limits for your account."))
|
||||
else:
|
||||
query = {"module": module_name, query_values[0]: query_types[0]}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'SecurityTrails authentication is missing')
|
||||
|
||||
def test_shodan(self):
|
||||
module_name = "shodan"
|
||||
query = {"module": module_name, "ip-src": "149.13.33.14"}
|
||||
if module_name in self.configs:
|
||||
query['config'] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertIn("circl.lu", self.get_values(response))
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Shodan authentication is missing')
|
||||
|
||||
def test_sigma_queries(self):
|
||||
query = {"module": "sigma_queries", "sigma": self.sigma_rule}
|
||||
|
@ -250,7 +376,7 @@ class TestExpansions(unittest.TestCase):
|
|||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": "threatcrowd", query_type: query_value}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response), [result])
|
||||
self.assertTrue(self.get_values(response), result)
|
||||
|
||||
def test_threatminer(self):
|
||||
query_types = ('domain', 'ip-src', 'md5')
|
||||
|
@ -259,7 +385,100 @@ class TestExpansions(unittest.TestCase):
|
|||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": "threatminer", query_type: query_value}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response)[0], result)
|
||||
self.assertTrue(self.get_values(response), result)
|
||||
|
||||
def test_urlhaus(self):
|
||||
query_types = ('domain', 'ip-src', 'sha256', 'url')
|
||||
query_values = ('www.bestwpdesign.com', '79.118.195.239',
|
||||
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
|
||||
'http://79.118.195.239:1924/.i')
|
||||
results = ('url', 'url', 'virustotal-report', 'virustotal-report')
|
||||
for query_type, query_value, result in zip(query_types[:2], query_values[:2], results[:2]):
|
||||
query = {"module": "urlhaus",
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value,
|
||||
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_attribute(response), result)
|
||||
for query_type, query_value, result in zip(query_types[2:], query_values[2:], results[2:]):
|
||||
query = {"module": "urlhaus",
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value,
|
||||
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_object(response), result)
|
||||
|
||||
def test_urlscan(self):
|
||||
module_name = "urlscan"
|
||||
query = {"module": module_name, "url": "https://circl.lu/team"}
|
||||
if module_name in self.configs:
|
||||
query['config'] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), 'circl.lu')
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), 'Urlscan apikey is missing')
|
||||
|
||||
def test_virustotal_public(self):
|
||||
module_name = "virustotal_public"
|
||||
query_types = ('domain', 'ip-src', 'sha256', 'url')
|
||||
query_values = ('circl.lu', '149.13.33.14',
|
||||
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
|
||||
'http://194.169.88.56:49151/.i')
|
||||
results = ('whois', 'asn', 'file', 'virustotal-report')
|
||||
if module_name in self.configs:
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value},
|
||||
"config": self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_object(response), result)
|
||||
except Exception:
|
||||
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
|
||||
else:
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_types[0],
|
||||
"value": query_values[0]}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), "A VirusTotal api key is required for this module.")
|
||||
|
||||
def test_virustotal(self):
|
||||
module_name = "virustotal"
|
||||
query_types = ('domain', 'ip-src', 'sha256', 'url')
|
||||
query_values = ('circl.lu', '149.13.33.14',
|
||||
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
|
||||
'http://194.169.88.56:49151/.i')
|
||||
results = ('domain-ip', 'asn', 'virustotal-report', 'virustotal-report')
|
||||
if module_name in self.configs:
|
||||
for query_type, query_value, result in zip(query_types, query_values, results):
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_type,
|
||||
"value": query_value},
|
||||
"config": self.configs[module_name]}
|
||||
response = self.misp_modules_post(query)
|
||||
try:
|
||||
self.assertEqual(self.get_object(response), result)
|
||||
except Exception:
|
||||
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
|
||||
else:
|
||||
query = {"module": module_name,
|
||||
"attribute": {"type": query_types[0],
|
||||
"value": query_values[0]}}
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), "A VirusTotal api key is required for this module.")
|
||||
|
||||
def test_vulners(self):
|
||||
module_name = "vulners"
|
||||
query = {"module": module_name, "vulnerability": "CVE-2010-3333"}
|
||||
if module_name in self.configs:
|
||||
query['config'] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_values(response).endswith('"RTF Stack Buffer Overflow Vulnerability."'))
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_errors(response), "A Vulners api key is required for this module.")
|
||||
|
||||
def test_wikidata(self):
|
||||
query = {"module": "wiki", "text": "Google"}
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 86 KiB |
Loading…
Reference in New Issue