mirror of https://github.com/MISP/misp-modules
Fix STIX import module
parent
c676587461
commit
5624104b77
|
@ -28,7 +28,7 @@ def handler(q=False):
|
||||||
q = json.loads(q)
|
q = json.loads(q)
|
||||||
|
|
||||||
# It's b64 encoded, so decode that stuff
|
# It's b64 encoded, so decode that stuff
|
||||||
package = str(base64.b64decode(q.get("data", None)), 'utf-8')
|
package = base64.b64decode(q.get("data")).decode('utf-8')
|
||||||
|
|
||||||
# If something really weird happened
|
# If something really weird happened
|
||||||
if not package:
|
if not package:
|
||||||
|
@ -168,6 +168,9 @@ def buildObservable(o):
|
||||||
# May as well be useless
|
# May as well be useless
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
if not o.get('object'):
|
||||||
|
return r
|
||||||
|
|
||||||
props = o["object"]["properties"]
|
props = o["object"]["properties"]
|
||||||
|
|
||||||
# If it has an address_value field, it's gonna be an address
|
# If it has an address_value field, it's gonna be an address
|
||||||
|
@ -195,7 +198,7 @@ def buildObservable(o):
|
||||||
for hsh in props["hashes"]:
|
for hsh in props["hashes"]:
|
||||||
r["values"].append(hsh["simple_hash_value"]["value"])
|
r["values"].append(hsh["simple_hash_value"]["value"])
|
||||||
r["types"] = identifyHash(hsh["simple_hash_value"]["value"])
|
r["types"] = identifyHash(hsh["simple_hash_value"]["value"])
|
||||||
|
|
||||||
elif "xsi:type" in props:
|
elif "xsi:type" in props:
|
||||||
# Cybox. Ew.
|
# Cybox. Ew.
|
||||||
try:
|
try:
|
||||||
|
@ -208,7 +211,7 @@ def buildObservable(o):
|
||||||
else:
|
else:
|
||||||
print("Ignoring {}".format(type_))
|
print("Ignoring {}".format(type_))
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
|
|
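Review bot: In `handler`, the new `base64.b64decode(q.get("data")).decode('utf-8')` can still raise before the `if not package:` guard is reached. A request with no `data` key passes `None` to `b64decode` (`TypeError`), and malformed base64 or non-UTF-8 bytes raise `ValueError` subclasses. `q` appears to be the caller-supplied request body, so the decode should be wrapped so that bad input falls through to the existing failure branch, whose return value is outside the hunk. The second hunk of `buildObservable` has the same weakness: the added `if not o.get('object'): return r` does not cover a missing `properties` key, so `o["object"]["properties"]` can still raise `KeyError`. Using `props = o["object"].get("properties")` followed by `if not props: return r` would close that; both function bodies continue outside the hunks shown.

```python
    # It's b64 encoded, so decode that stuff
    try:
        package = base64.b64decode(q.get("data")).decode('utf-8')
    except (TypeError, ValueError):
        # missing or non-string data, malformed base64, or non-UTF-8 bytes
        package = None
    # … unchanged: `if not package:` failure branch …
```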
@ -1,4 +1,4 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
@ -7,6 +7,7 @@ import base64
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
class TestModules(unittest.TestCase):
|
class TestModules(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -32,13 +33,14 @@ class TestModules(unittest.TestCase):
|
||||||
print(response.json())
|
print(response.json())
|
||||||
|
|
||||||
def test_stix(self):
|
def test_stix(self):
|
||||||
with open("tests/stix.xml", "r") as f:
|
with open("tests/stix.xml", "rb") as f:
|
||||||
data = json.dumps({"module":"stiximport",
|
content = base64.b64encode(f.read())
|
||||||
"data":str(base64.b64encode(bytes(f.read(), 'utf-8')), 'utf-8'),
|
data = json.dumps({"module": "stiximport",
|
||||||
"config": {"max_size": "15000"},
|
"data": content.decode('utf-8'),
|
||||||
})
|
"config": {"max_size": "15000"},
|
||||||
|
})
|
||||||
response = requests.post(self.url + "query", data=data)
|
response = requests.post(self.url + "query", data=data)
|
||||||
print(response.json())
|
print('STIX', response.json())
|
||||||
|
|
||||||
def test_virustotal(self):
|
def test_virustotal(self):
|
||||||
# This can't actually be tested without disclosing a private
|
# This can't actually be tested without disclosing a private
|
||||||
|
@ -46,11 +48,11 @@ class TestModules(unittest.TestCase):
|
||||||
# and pass if it can't find one
|
# and pass if it can't find one
|
||||||
|
|
||||||
if not os.path.exists("tests/bodyvirustotal.json"):
|
if not os.path.exists("tests/bodyvirustotal.json"):
|
||||||
return
|
return
|
||||||
|
|
||||||
with open("tests/bodyvirustotal.json", "r") as f:
|
with open("tests/bodyvirustotal.json", "r") as f:
|
||||||
response = requests.post(self.url + "query", data=f.read()).json()
|
response = requests.post(self.url + "query", data=f.read()).json()
|
||||||
assert(response)
|
assert(response)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
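Review bot: `test_stix` still asserts nothing: it only prints `response.json()`, so it passes even when the module answers with an error object for `tests/stix.xml`, and the decoding fix in this commit is never verified. Add a check on the reply, for example `response.raise_for_status()` followed by an assertion on the field the module is expected to return. The response shape is not visible on this page, so the field name has to come from the module itself. A second case that posts a request without `data`, or with invalid base64, would pin down how the module handles malformed input.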