Moved to misp_stix_converter

pull/72/head
Hannah Ward 2016-11-21 10:59:30 +00:00
parent 5624104b77
commit c567d1e6f2
No known key found for this signature in database
GPG Key ID: 6F3BAD60DE190290
3 changed files with 22 additions and 207 deletions

View File

@ -1,19 +1,21 @@
import json import json
from stix.core import STIXPackage
import re import re
import base64 import base64
import hashlib import hashlib
import tempfile import tempfile
import os
from pymisp.tools import stix
misperrors = {'error': 'Error'} misperrors = {'error': 'Error'}
userConfig = {} userConfig = {}
inputSource = ['file'] inputSource = ['file']
moduleinfo = {'version': '0.1', 'author': 'Hannah Ward', moduleinfo = {'version': '0.2', 'author': 'Hannah Ward',
'description': 'Import some stix stuff', 'description': 'Import some stix stuff',
'module-type': ['import']} 'module-type': ['import']}
moduleconfig = ["max_size"] moduleconfig = []
def handler(q=False): def handler(q=False):
@ -34,211 +36,18 @@ def handler(q=False):
if not package: if not package:
return json.dumps({"success": 0}) return json.dumps({"success": 0})
# Get the maxsize from the config tfile = tempfile.NamedTemporaryFile(mode="w", prefix="STIX", delete=False)
# Default to 10MB tfile.write(package)
# (I believe the max_size arg is given in bytes) tfile.close()
# Check if we were given a configuration
memsize = q.get("config", None)
# If we were, find out if there's a memsize field pkg = stix.load_stix(tfile.name)
if memsize:
memsize = memsize.get("max_size", 10 * 1024)
else:
memsize = 10 * 1024
# Load up the package into STIX for attrib in pkg.attributes:
package = loadPackage(package, memsize) r["results"].append({ "values" : [attrib.value] , "types": [attrib.type], "categories": [attrib.category]})
# Build all the observables os.unlink(tfile.name)
if package.observables:
for obs in package.observables:
r["results"].append(buildObservable(obs))
# And now the threat actors
if package.threat_actors:
for ta in package.threat_actors:
r["results"].append(buildActor(ta))
# Aaaand the indicators
if package.indicators:
for ind in package.indicators:
r["results"] += buildIndicator(ind)
# Are you seeing a pattern?
if package.exploit_targets:
for et in package.exploit_targets:
r["results"].append(buildExploitTarget(et))
# LOADING STUFF
if package.campaigns:
for cpn in package.campaigns:
r["results"].append(buildCampaign(cpn))
# Clean up results
# Don't send on anything that didn't have a value
r["results"] = [x for x in r["results"] if isinstance(x, dict) and len(x["values"]) != 0]
return r return r
# Quick and dirty regex for IP addresses
ipre = re.compile("([0-9]{1,3}.){3}[0-9]{1,3}")
def buildCampaign(cpn):
"""
Extract a campaign name
"""
return {"values": [cpn.title], "types": ["campaign-name"]}
def buildExploitTarget(et):
"""
Extract CVEs from exploit targets
"""
r = {"values": [], "types": ["vulnerability"]}
if et.vulnerabilities:
for v in et.vulnerabilities:
if v.cve_id:
r["values"].append(v.cve_id)
return r
def identifyHash(hsh):
"""
What's that hash!?
"""
possible_hashes = []
hashes = [x for x in hashlib.algorithms_guaranteed]
for h in hashes:
if len(str(hsh)) == len(hashlib.new(h).hexdigest()):
possible_hashes.append(h)
possible_hashes.append("filename|{}".format(h))
return possible_hashes
def buildIndicator(ind):
"""
Extract hashes
and other fun things
like that
"""
r = []
# Try to get hashes. I hate stix
if ind.observables:
for i in ind.observables:
if i.observable_composition:
for j in i.observable_composition.observables:
r.append(buildObservable(j))
r.append(buildObservable(i))
return r
def buildActor(ta):
"""
Extract the name
and comment of a
threat actor
"""
r = {"values": [ta.title], "types": ["threat-actor"]}
return r
def buildObservable(o):
"""
Take a STIX observable
and extract the value
and category
"""
# Life is easier with json
if not isinstance(o, dict):
o = json.loads(o.to_json())
# Make a new record to store values in
r = {"values": []}
# Get the object properties. This contains all the
# fun stuff like values
if "observable_composition" in o:
# May as well be useless
return r
if not o.get('object'):
return r
props = o["object"]["properties"]
# If it has an address_value field, it's gonna be an address
# Kinda obvious really
if "address_value" in props:
# We've got ourselves a nice little address
value = props["address_value"]
if isinstance(value, dict):
# Sometimes it's embedded in a dictionary
value = value["value"]
# Is it an IP?
if ipre.match(str(value)):
# Yes!
r["values"].append(value)
r["types"] = ["ip-src", "ip-dst"]
else:
# Probably a domain yo
r["values"].append(value)
r["types"] = ["domain", "hostname"]
if "hashes" in props:
for hsh in props["hashes"]:
r["values"].append(hsh["simple_hash_value"]["value"])
r["types"] = identifyHash(hsh["simple_hash_value"]["value"])
elif "xsi:type" in props:
# Cybox. Ew.
try:
type_ = props["xsi:type"]
val = props["value"]
if type_ == "LinkObjectType":
r["types"] = ["link"]
r["values"].append(val)
else:
print("Ignoring {}".format(type_))
except:
pass
return r
def loadPackage(data, memsize=1024):
# Write the stix package to a tmp file
temp = tempfile.SpooledTemporaryFile(max_size=int(memsize), mode="w+")
temp.write(data)
# Back to the beginning so we can read it again
temp.seek(0)
try:
# Try loading it into every format we know of
try:
package = STIXPackage().from_xml(temp)
except:
# We have to seek back again
temp.seek(0)
package = STIXPackage().from_json(temp)
except Exception:
print("Failed to load package")
raise ValueError("COULD NOT LOAD STIX PACKAGE!")
temp.close()
return package
def introspection(): def introspection():
modulesetup = {} modulesetup = {}
try: try:

View File

@ -38,5 +38,6 @@ setup(
'pillow', 'pillow',
'pytesseract', 'pytesseract',
'shodan', 'shodan',
] 'misp_stix_converter'
],
) )

View File

@ -37,10 +37,15 @@ class TestModules(unittest.TestCase):
content = base64.b64encode(f.read()) content = base64.b64encode(f.read())
data = json.dumps({"module": "stiximport", data = json.dumps({"module": "stiximport",
"data": content.decode('utf-8'), "data": content.decode('utf-8'),
"config": {"max_size": "15000"},
}) })
response = requests.post(self.url + "query", data=data) response = requests.post(self.url + "query", data=data).json()
print('STIX', response.json())
print("STIX :: {}".format(response))
values = [x["values"][0] for x in response["results"]]
assert("209.239.79.47" in values)
assert("41.213.121.180" in values)
assert("eu-society.com" in values)
def test_virustotal(self): def test_virustotal(self):
# This can't actually be tested without disclosing a private # This can't actually be tested without disclosing a private