misp-modules/misp_modules/modules/import_mod/stiximport.py

260 lines
6.2 KiB
Python
Raw Normal View History

import json
from stix.core import STIXPackage
import re
import base64
import hashlib
import tempfile
misperrors = {'error': 'Error'}
userConfig = {}
inputSource = ['file']
moduleinfo = {'version': '0.1', 'author': 'Hannah Ward',
'description': 'Import some stix stuff',
'module-type': ['import']}
moduleconfig = ["max_size"]
def handler(q=False):
2016-08-12 14:09:59 +02:00
# Just in case we have no data
if q is False:
return False
2016-08-11 17:37:29 +02:00
2016-08-12 14:09:59 +02:00
# The return value
r = {'results': []}
2016-08-11 17:37:29 +02:00
2016-08-12 14:09:59 +02:00
# Load up that JSON
q = json.loads(q)
2016-08-11 17:37:29 +02:00
2016-08-12 14:09:59 +02:00
# It's b64 encoded, so decode that stuff
2016-11-18 18:09:58 +01:00
package = base64.b64decode(q.get("data")).decode('utf-8')
2016-08-12 14:09:59 +02:00
# If something really weird happened
if not package:
2016-08-12 14:09:59 +02:00
return json.dumps({"success": 0})
# Get the maxsize from the config
# Default to 10MB
# (I believe the max_size arg is given in bytes)
# Check if we were given a configuration
memsize = q.get("config", None)
# If we were, find out if there's a memsize field
if memsize:
memsize = memsize.get("max_size", 10 * 1024)
else:
memsize = 10 * 1024
2016-08-12 14:09:59 +02:00
# Load up the package into STIX
package = loadPackage(package, memsize)
2016-08-11 17:37:29 +02:00
2016-08-12 14:09:59 +02:00
# Build all the observables
if package.observables:
2016-08-12 14:09:59 +02:00
for obs in package.observables:
r["results"].append(buildObservable(obs))
2016-08-12 11:06:53 +02:00
# And now the threat actors
2016-08-12 11:06:53 +02:00
if package.threat_actors:
2016-08-12 14:09:59 +02:00
for ta in package.threat_actors:
r["results"].append(buildActor(ta))
# Aaaand the indicators
if package.indicators:
2016-08-12 14:09:59 +02:00
for ind in package.indicators:
r["results"] += buildIndicator(ind)
2016-08-12 12:34:43 +02:00
# Are you seeing a pattern?
2016-08-12 12:34:43 +02:00
if package.exploit_targets:
2016-08-12 14:09:59 +02:00
for et in package.exploit_targets:
r["results"].append(buildExploitTarget(et))
2016-08-12 12:34:43 +02:00
# LOADING STUFF
2016-08-12 12:34:43 +02:00
if package.campaigns:
2016-08-12 14:09:59 +02:00
for cpn in package.campaigns:
r["results"].append(buildCampaign(cpn))
2016-08-12 14:09:59 +02:00
# Clean up results
# Don't send on anything that didn't have a value
r["results"] = [x for x in r["results"] if isinstance(x, dict) and len(x["values"]) != 0]
return r
2016-08-12 14:09:59 +02:00
# Quick and dirty regex for IP addresses
ipre = re.compile("([0-9]{1,3}.){3}[0-9]{1,3}")
2016-08-11 17:37:29 +02:00
2016-08-12 14:09:59 +02:00
2016-08-12 12:34:43 +02:00
def buildCampaign(cpn):
2016-08-12 14:09:59 +02:00
"""
Extract a campaign name
"""
return {"values": [cpn.title], "types": ["campaign-name"]}
2016-08-12 12:34:43 +02:00
def buildExploitTarget(et):
2016-08-12 14:09:59 +02:00
"""
Extract CVEs from exploit targets
"""
2016-08-12 12:34:43 +02:00
2016-08-12 14:09:59 +02:00
r = {"values": [], "types": ["vulnerability"]}
2016-08-12 12:34:43 +02:00
2016-08-12 14:09:59 +02:00
if et.vulnerabilities:
for v in et.vulnerabilities:
if v.cve_id:
r["values"].append(v.cve_id)
return r
2016-08-12 12:34:43 +02:00
def identifyHash(hsh):
2016-08-12 14:09:59 +02:00
"""
What's that hash!?
"""
possible_hashes = []
2016-08-12 14:09:59 +02:00
hashes = [x for x in hashlib.algorithms_guaranteed]
2016-08-12 14:09:59 +02:00
for h in hashes:
if len(str(hsh)) == len(hashlib.new(h).hexdigest()):
possible_hashes.append(h)
possible_hashes.append("filename|{}".format(h))
return possible_hashes
def buildIndicator(ind):
2016-08-12 14:09:59 +02:00
"""
Extract hashes
and other fun things
like that
"""
r = []
2016-08-12 14:09:59 +02:00
# Try to get hashes. I hate stix
if ind.observables:
for i in ind.observables:
if i.observable_composition:
for j in i.observable_composition.observables:
r.append(buildObservable(j))
r.append(buildObservable(i))
2016-08-12 14:09:59 +02:00
return r
2016-08-12 11:06:53 +02:00
def buildActor(ta):
2016-08-12 14:09:59 +02:00
"""
Extract the name
and comment of a
threat actor
"""
r = {"values": [ta.title], "types": ["threat-actor"]}
return r
2016-08-12 11:06:53 +02:00
def buildObservable(o):
2016-08-12 14:09:59 +02:00
"""
Take a STIX observable
and extract the value
and category
"""
# Life is easier with json
if not isinstance(o, dict):
o = json.loads(o.to_json())
# Make a new record to store values in
r = {"values": []}
# Get the object properties. This contains all the
# fun stuff like values
if "observable_composition" in o:
# May as well be useless
return r
2016-11-18 18:09:58 +01:00
if not o.get('object'):
return r
2016-08-12 14:09:59 +02:00
props = o["object"]["properties"]
# If it has an address_value field, it's gonna be an address
# Kinda obvious really
if "address_value" in props:
# We've got ourselves a nice little address
value = props["address_value"]
if isinstance(value, dict):
# Sometimes it's embedded in a dictionary
value = value["value"]
# Is it an IP?
if ipre.match(str(value)):
# Yes!
r["values"].append(value)
r["types"] = ["ip-src", "ip-dst"]
else:
# Probably a domain yo
r["values"].append(value)
r["types"] = ["domain", "hostname"]
if "hashes" in props:
for hsh in props["hashes"]:
r["values"].append(hsh["simple_hash_value"]["value"])
r["types"] = identifyHash(hsh["simple_hash_value"]["value"])
2016-11-18 18:09:58 +01:00
elif "xsi:type" in props:
# Cybox. Ew.
try:
type_ = props["xsi:type"]
val = props["value"]
if type_ == "LinkObjectType":
r["types"] = ["link"]
r["values"].append(val)
else:
print("Ignoring {}".format(type_))
except:
2016-11-18 18:09:58 +01:00
pass
return r
2016-08-12 14:09:59 +02:00
def loadPackage(data, memsize=1024):
2016-08-12 14:09:59 +02:00
# Write the stix package to a tmp file
temp = tempfile.SpooledTemporaryFile(max_size=int(memsize), mode="w+")
temp.write(data)
# Back to the beginning so we can read it again
temp.seek(0)
try:
2016-08-12 14:09:59 +02:00
# Try loading it into every format we know of
try:
package = STIXPackage().from_xml(temp)
2016-08-12 14:09:59 +02:00
except:
# We have to seek back again
temp.seek(0)
package = STIXPackage().from_json(temp)
2016-08-12 14:09:59 +02:00
except Exception:
print("Failed to load package")
raise ValueError("COULD NOT LOAD STIX PACKAGE!")
temp.close()
2016-08-12 14:09:59 +02:00
return package
def introspection():
modulesetup = {}
try:
userConfig
modulesetup['userConfig'] = userConfig
except NameError:
pass
try:
inputSource
modulesetup['inputSource'] = inputSource
except NameError:
pass
return modulesetup
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo