2016-11-15 16:47:17 +01:00
|
|
|
#!/usr/bin/env python3
|
2016-06-18 07:53:26 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
import requests
|
2016-08-12 13:16:49 +02:00
|
|
|
import base64
|
|
|
|
import json
|
2016-08-17 14:01:11 +02:00
|
|
|
import os
|
2016-11-22 11:36:46 +01:00
|
|
|
from pymisp import MISPEvent, EncodeUpdate
|
2016-06-18 07:53:26 +02:00
|
|
|
|
2016-11-15 16:47:17 +01:00
|
|
|
|
2016-06-18 07:53:26 +02:00
|
|
|
class TestModules(unittest.TestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
self.maxDiff = None
|
|
|
|
self.headers = {'Content-Type': 'application/json'}
|
2016-08-12 13:16:49 +02:00
|
|
|
self.url = "http://127.0.0.1:6666/"
|
2016-06-18 07:53:26 +02:00
|
|
|
|
|
|
|
def test_introspection(self):
|
2016-08-12 13:16:49 +02:00
|
|
|
response = requests.get(self.url + "modules")
|
2016-06-18 07:53:26 +02:00
|
|
|
print(response.json())
|
|
|
|
|
|
|
|
def test_cve(self):
|
|
|
|
with open('tests/bodycve.json', 'r') as f:
|
2016-08-12 13:16:49 +02:00
|
|
|
response = requests.post(self.url + "query", data=f.read())
|
2016-06-18 07:53:26 +02:00
|
|
|
print(response.json())
|
|
|
|
|
|
|
|
def test_dns(self):
|
|
|
|
with open('tests/body.json', 'r') as f:
|
2016-08-12 13:16:49 +02:00
|
|
|
response = requests.post(self.url + "query", data=f.read())
|
|
|
|
print(response.json())
|
2016-08-25 17:36:28 +02:00
|
|
|
with open('tests/body_timeout.json', 'r') as f:
|
|
|
|
response = requests.post(self.url + "query", data=f.read())
|
|
|
|
print(response.json())
|
2016-08-12 13:16:49 +02:00
|
|
|
|
|
|
|
def test_stix(self):
|
2016-11-15 16:47:17 +01:00
|
|
|
with open("tests/stix.xml", "rb") as f:
|
|
|
|
content = base64.b64encode(f.read())
|
|
|
|
data = json.dumps({"module": "stiximport",
|
|
|
|
"data": content.decode('utf-8'),
|
|
|
|
"config": {"max_size": "15000"},
|
|
|
|
})
|
2016-08-12 13:16:49 +02:00
|
|
|
response = requests.post(self.url + "query", data=data)
|
2016-11-15 16:47:17 +01:00
|
|
|
print('STIX', response.json())
|
2016-08-12 13:16:49 +02:00
|
|
|
|
2016-08-17 14:01:11 +02:00
|
|
|
def test_virustotal(self):
|
|
|
|
# This can't actually be tested without disclosing a private
|
|
|
|
# API key. This will attempt to run with a .gitignored keyfile
|
|
|
|
# and pass if it can't find one
|
|
|
|
|
|
|
|
if not os.path.exists("tests/bodyvirustotal.json"):
|
2016-11-15 16:47:17 +01:00
|
|
|
return
|
|
|
|
|
2016-08-17 14:01:11 +02:00
|
|
|
with open("tests/bodyvirustotal.json", "r") as f:
|
2016-11-15 16:47:17 +01:00
|
|
|
response = requests.post(self.url + "query", data=f.read()).json()
|
2016-08-17 14:01:11 +02:00
|
|
|
assert(response)
|
|
|
|
|
2016-11-22 11:36:46 +01:00
|
|
|
def test_sign(self):
|
|
|
|
event = MISPEvent()
|
|
|
|
event.load('tests/57c4445b-c548-4654-af0b-4be3950d210f.json')
|
|
|
|
data = {'module': 'sign',
|
|
|
|
'config': {'uid': '5832bfa8-76d0-4bdb-a221-46fa950d210f', 'passphrase': 'misptestorg'},
|
|
|
|
'mispevent': json.dumps(event, cls=EncodeUpdate)}
|
2016-11-22 15:07:05 +01:00
|
|
|
try:
|
|
|
|
signed_event = requests.post(self.url + "query", data=json.dumps(data)).json()
|
|
|
|
event.load(signed_event)
|
|
|
|
data = {'module': 'verify',
|
|
|
|
'config': {'uid': '5832bfa8-76d0-4bdb-a221-46fa950d210f'},
|
|
|
|
'mispevent': json.dumps(event, cls=EncodeUpdate)}
|
|
|
|
verified = requests.post(self.url + "query", data=json.dumps(data))
|
|
|
|
assert(verified)
|
|
|
|
except:
|
|
|
|
pass
|
2016-11-22 11:36:46 +01:00
|
|
|
|
2016-08-12 13:16:49 +02:00
|
|
|
if __name__ == '__main__':
|
2016-11-15 16:47:17 +01:00
|
|
|
unittest.main()
|