Merge pull request #342 from MISP/tests

More expansion tests
pull/343/head
Christian Studer 3 years ago committed by GitHub
commit d2b92f8ad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .travis.yml
  2. 4
      misp_modules/modules/expansion/__init__.py
  3. 0
      misp_modules/modules/expansion/docx_enrich.py
  4. 0
      misp_modules/modules/expansion/ocr_enrich.py
  5. 0
      misp_modules/modules/expansion/ods_enrich.py
  6. 0
      misp_modules/modules/expansion/odt_enrich.py
  7. 0
      misp_modules/modules/expansion/pdf_enrich.py
  8. 0
      misp_modules/modules/expansion/pptx_enrich.py
  9. 0
      misp_modules/modules/expansion/xlsx_enrich.py
  10. 109
      tests/test_expansions.py
  11. BIN
      tests/test_files/misp-logo.png
  12. BIN
      tests/test_files/test.ods
  13. BIN
      tests/test_files/test.odt
  14. BIN
      tests/test_files/test.pdf
  15. BIN
      tests/test_files/test.pptx
  16. BIN
      tests/test_files/test.xlsx

@ -14,7 +14,7 @@ before_install:
- docker build -t misp-modules --build-arg BUILD_DATE=$(date -u +"%Y-%m-%d") docker/
install:
- sudo apt-get install libzbar0 libzbar-dev libpoppler-cpp-dev
- sudo apt-get install libzbar0 libzbar-dev libpoppler-cpp-dev tesseract-ocr
- pip install pipenv
- pipenv install --dev

@ -12,6 +12,6 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c
'xforceexchange', 'sigma_syntax_validator', 'stix2_pattern_syntax_validator',
'sigma_queries', 'dbl_spamhaus', 'vulners', 'yara_query', 'macaddress_io',
'intel471', 'backscatter_io', 'btc_scam_check', 'hibp', 'greynoise', 'macvendors',
'qrcode', 'ocr-enrich', 'pdf-enrich', 'docx-enrich', 'xlsx-enrich', 'pptx-enrich',
'ods-enrich', 'odt-enrich', 'joesandbox_submit', 'joesandbox_query', 'urlhaus',
'qrcode', 'ocr_enrich', 'pdf_enrich', 'docx_enrich', 'xlsx_enrich', 'pptx_enrich',
'ods_enrich', 'odt_enrich', 'joesandbox_submit', 'joesandbox_query', 'urlhaus',
'virustotal_public']

@ -4,7 +4,9 @@
import unittest
import requests
from urllib.parse import urljoin
from base64 import b64encode
import json
import os
class TestExpansions(unittest.TestCase):
@ -13,11 +15,19 @@ class TestExpansions(unittest.TestCase):
self.maxDiff = None
self.headers = {'Content-Type': 'application/json'}
self.url = "http://127.0.0.1:6666/"
self.dirname = os.path.dirname(os.path.realpath(__file__))
self.sigma_rule = "title: Antivirus Web Shell Detection\r\ndescription: Detects a highly relevant Antivirus alert that reports a web shell\r\ndate: 2018/09/09\r\nmodified: 2019/10/04\r\nauthor: Florian Roth\r\nreferences:\r\n - https://www.nextron-systems.com/2018/09/08/antivirus-event-analysis-cheat-sheet-v1-4/\r\ntags:\r\n - attack.persistence\r\n - attack.t1100\r\nlogsource:\r\n product: antivirus\r\ndetection:\r\n selection:\r\n Signature: \r\n - \"PHP/Backdoor*\"\r\n - \"JSP/Backdoor*\"\r\n - \"ASP/Backdoor*\"\r\n - \"Backdoor.PHP*\"\r\n - \"Backdoor.JSP*\"\r\n - \"Backdoor.ASP*\"\r\n - \"*Webshell*\"\r\n condition: selection\r\nfields:\r\n - FileName\r\n - User\r\nfalsepositives:\r\n - Unlikely\r\nlevel: critical"
def misp_modules_post(self, query):
return requests.post(urljoin(self.url, "query"), json=query)
def get_data(self, response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
return data
return data['results'][0]['data']
def get_errors(self, response):
data = response.json()
if not isinstance(data, dict):
@ -78,6 +88,14 @@ class TestExpansions(unittest.TestCase):
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), ['149.13.33.14'])
def test_docx(self):
filename = 'test.docx'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "docx_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), '\nThis is an basic test docx file. ')
def test_haveibeenpwned(self):
query = {"module": "hibp", "email-src": "info@circl.lu"}
response = self.misp_modules_post(query)
@ -89,7 +107,9 @@ class TestExpansions(unittest.TestCase):
def test_greynoise(self):
query = {"module": "greynoise", "ip-dst": "1.1.1.1"}
response = self.misp_modules_post(query)
self.assertTrue(self.get_values(response).startswith('{"ip":"1.1.1.1","status":"ok"'))
value = self.get_values(response)
if value != 'GreyNoise API not accessible (HTTP 429)':
self.assertTrue(value.startswith('{"ip":"1.1.1.1","status":"ok"'))
def test_ipasn(self):
query = {"module": "ipasn", "ip-dst": "1.1.1.1"}
@ -103,6 +123,60 @@ class TestExpansions(unittest.TestCase):
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), 'Samsung Electronics Co.,Ltd')
def test_ocr(self):
filename = 'misp-logo.png'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "ocr_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), 'Threat Sharing')
def test_ods(self):
filename = 'test.ods'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "ods_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), '\n column_0\n0 ods test')
def test_odt(self):
filename = 'test.odt'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "odt_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), 'odt test')
def test_otx(self):
query_types = ('domain', 'ip-src', 'md5')
query_values = ('circl.lu', '8.8.8.8', '616eff3e9a7575ae73821b4668d2801c')
results = ('149.13.33.14', 'ffc2595aefa80b61621023252b5f0ccb22b6e31d7f1640913cd8ff74ddbd8b41',
'8.8.8.8')
for query_type, query_value, result in zip(query_types, query_values, results):
query = {"module": "otx", query_type: query_value, "config": {"apikey": "1"}}
response = self.misp_modules_post(query)
try:
self.assertTrue(self.get_values(response), [result])
except KeyError:
# Empty results, which in this case comes from a connection error
continue
def test_pdf(self):
filename = 'test.pdf'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "pdf_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), 'Pdf test')
def test_pptx(self):
filename = 'test.pptx'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "pptx_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), '\npptx test\n')
def test_rbl(self):
query = {"module": "rbl", "ip-src": "8.8.8.8"}
response = self.misp_modules_post(query)
@ -126,11 +200,36 @@ class TestExpansions(unittest.TestCase):
response = self.misp_modules_post(query)
self.assertTrue(self.get_values(response).startswith('Syntax valid:'))
def test_sourcecache(self):
input_value = "https://www.misp-project.org/feeds/"
query = {"module": "sourcecache", "link": input_value}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), input_value)
self.assertTrue(self.get_data(response).startswith('PCFET0NUWVBFIEhUTUw+CjwhLS0KCUFyY2FuYSBieSBIVE1MN'))
def test_stix2_pattern_validator(self):
query = {"module": "stix2_pattern_syntax_validator", "stix2-pattern": "[ipv4-addr:value = '8.8.8.8']"}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), 'Syntax valid')
def test_threatcrowd(self):
query_types = ('domain', 'ip-src', 'md5', 'whois-registrant-email')
query_values = ('circl.lu', '149.13.33.4', '616eff3e9a7575ae73821b4668d2801c', 'hostmaster@eurodns.com')
results = ('149.13.33.14', 'cve.circl.lu', 'devilreturns.com', 'navabi.lu')
for query_type, query_value, result in zip(query_types, query_values, results):
query = {"module": "threatcrowd", query_type: query_value}
response = self.misp_modules_post(query)
self.assertTrue(self.get_values(response), [result])
def test_threatminer(self):
query_types = ('domain', 'ip-src', 'md5')
query_values = ('circl.lu', '149.13.33.4', 'b538dbc6160ef54f755a540e06dc27cd980fc4a12005e90b3627febb44a1a90f')
results = ('149.13.33.14', 'f6ecb9d5c21defb1f622364a30cb8274f817a1a2', 'http://www.circl.lu/')
for query_type, query_value, result in zip(query_types, query_values, results):
query = {"module": "threatminer", query_type: query_value}
response = self.misp_modules_post(query)
self.assertTrue(self.get_values(response)[0], result)
def test_wikidata(self):
query = {"module": "wiki", "text": "Google"}
response = self.misp_modules_post(query)
@ -141,6 +240,14 @@ class TestExpansions(unittest.TestCase):
except Exception:
self.assertEqual(self.get_values(response), 'No additional data found on Wikidata')
def test_xlsx(self):
filename = 'test.xlsx'
with open(f'{self.dirname}/test_files/{filename}', 'rb') as f:
encoded = b64encode(f.read()).decode()
query = {"module": "xlsx_enrich", "attachment": filename, "data": encoded}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), ' header\n0 xlsx test')
def test_yara_query(self):
query = {"module": "yara_query", "md5": "b2a5abfeef9e36964281a31e17b57c97"}
response = self.misp_modules_post(query)

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.
Loading…
Cancel
Save