From c567d1e6f28e7fe3c9dcd2da0ae10cc89deb821b Mon Sep 17 00:00:00 2001 From: Hannah Ward Date: Mon, 21 Nov 2016 10:59:30 +0000 Subject: [PATCH] Moved to misp_stix_converter --- misp_modules/modules/import_mod/stiximport.py | 215 +----------------- setup.py | 3 +- tests/test.py | 11 +- 3 files changed, 22 insertions(+), 207 deletions(-) diff --git a/misp_modules/modules/import_mod/stiximport.py b/misp_modules/modules/import_mod/stiximport.py index 16a2f22..99e891c 100755 --- a/misp_modules/modules/import_mod/stiximport.py +++ b/misp_modules/modules/import_mod/stiximport.py @@ -1,19 +1,21 @@ import json -from stix.core import STIXPackage import re import base64 import hashlib import tempfile +import os + +from pymisp.tools import stix misperrors = {'error': 'Error'} userConfig = {} inputSource = ['file'] -moduleinfo = {'version': '0.1', 'author': 'Hannah Ward', +moduleinfo = {'version': '0.2', 'author': 'Hannah Ward', 'description': 'Import some stix stuff', 'module-type': ['import']} -moduleconfig = ["max_size"] +moduleconfig = [] def handler(q=False): @@ -34,211 +36,18 @@ def handler(q=False): if not package: return json.dumps({"success": 0}) - # Get the maxsize from the config - # Default to 10MB - # (I believe the max_size arg is given in bytes) - # Check if we were given a configuration - memsize = q.get("config", None) + tfile = tempfile.NamedTemporaryFile(mode="w", prefix="STIX", delete=False) + tfile.write(package) + tfile.close() - # If we were, find out if there's a memsize field - if memsize: - memsize = memsize.get("max_size", 10 * 1024) - else: - memsize = 10 * 1024 + pkg = stix.load_stix(tfile.name) - # Load up the package into STIX - package = loadPackage(package, memsize) + for attrib in pkg.attributes: + r["results"].append({ "values" : [attrib.value] , "types": [attrib.type], "categories": [attrib.category]}) - # Build all the observables - if package.observables: - for obs in package.observables: - r["results"].append(buildObservable(obs)) - - # And now the threat actors - if package.threat_actors: - for ta in package.threat_actors: - r["results"].append(buildActor(ta)) - - # Aaaand the indicators - if package.indicators: - for ind in package.indicators: - r["results"] += buildIndicator(ind) - - # Are you seeing a pattern? - if package.exploit_targets: - for et in package.exploit_targets: - r["results"].append(buildExploitTarget(et)) - - # LOADING STUFF - if package.campaigns: - for cpn in package.campaigns: - r["results"].append(buildCampaign(cpn)) - - # Clean up results - # Don't send on anything that didn't have a value - r["results"] = [x for x in r["results"] if isinstance(x, dict) and len(x["values"]) != 0] + os.unlink(tfile.name) return r -# Quick and dirty regex for IP addresses -ipre = re.compile("([0-9]{1,3}.){3}[0-9]{1,3}") - - -def buildCampaign(cpn): - """ - Extract a campaign name - """ - return {"values": [cpn.title], "types": ["campaign-name"]} - - -def buildExploitTarget(et): - """ - Extract CVEs from exploit targets - """ - - r = {"values": [], "types": ["vulnerability"]} - - if et.vulnerabilities: - for v in et.vulnerabilities: - if v.cve_id: - r["values"].append(v.cve_id) - return r - - - -def identifyHash(hsh): - """ - What's that hash!? - """ - - possible_hashes = [] - - hashes = [x for x in hashlib.algorithms_guaranteed] - - for h in hashes: - if len(str(hsh)) == len(hashlib.new(h).hexdigest()): - possible_hashes.append(h) - possible_hashes.append("filename|{}".format(h)) - return possible_hashes - - -def buildIndicator(ind): - """ - Extract hashes - and other fun things - like that - """ - r = [] - # Try to get hashes. I hate stix - if ind.observables: - for i in ind.observables: - if i.observable_composition: - for j in i.observable_composition.observables: - r.append(buildObservable(j)) - r.append(buildObservable(i)) - return r - - -def buildActor(ta): - """ - Extract the name - and comment of a - threat actor - """ - - r = {"values": [ta.title], "types": ["threat-actor"]} - - return r - - -def buildObservable(o): - """ - Take a STIX observable - and extract the value - and category - """ - # Life is easier with json - if not isinstance(o, dict): - o = json.loads(o.to_json()) - # Make a new record to store values in - r = {"values": []} - - # Get the object properties. This contains all the - # fun stuff like values - if "observable_composition" in o: - # May as well be useless - return r - - if not o.get('object'): - return r - - props = o["object"]["properties"] - - # If it has an address_value field, it's gonna be an address - # Kinda obvious really - if "address_value" in props: - - # We've got ourselves a nice little address - value = props["address_value"] - - if isinstance(value, dict): - # Sometimes it's embedded in a dictionary - value = value["value"] - - # Is it an IP? - if ipre.match(str(value)): - # Yes! - r["values"].append(value) - r["types"] = ["ip-src", "ip-dst"] - else: - # Probably a domain yo - r["values"].append(value) - r["types"] = ["domain", "hostname"] - - if "hashes" in props: - for hsh in props["hashes"]: - r["values"].append(hsh["simple_hash_value"]["value"]) - r["types"] = identifyHash(hsh["simple_hash_value"]["value"]) - - elif "xsi:type" in props: - # Cybox. Ew. - try: - type_ = props["xsi:type"] - val = props["value"] - - if type_ == "LinkObjectType": - r["types"] = ["link"] - r["values"].append(val) - else: - print("Ignoring {}".format(type_)) - except: - pass - return r - - -def loadPackage(data, memsize=1024): - # Write the stix package to a tmp file - - temp = tempfile.SpooledTemporaryFile(max_size=int(memsize), mode="w+") - - temp.write(data) - - # Back to the beginning so we can read it again - temp.seek(0) - try: - # Try loading it into every format we know of - try: - package = STIXPackage().from_xml(temp) - except: - # We have to seek back again - temp.seek(0) - package = STIXPackage().from_json(temp) - except Exception: - print("Failed to load package") - raise ValueError("COULD NOT LOAD STIX PACKAGE!") - temp.close() - return package - - def introspection(): modulesetup = {} try: diff --git a/setup.py b/setup.py index 8ad517e..0010c05 100644 --- a/setup.py +++ b/setup.py @@ -38,5 +38,6 @@ setup( 'pillow', 'pytesseract', 'shodan', - ] + 'misp_stix_converter' + ], ) diff --git a/tests/test.py b/tests/test.py index d506595..faf5173 100644 --- a/tests/test.py +++ b/tests/test.py @@ -37,10 +37,15 @@ class TestModules(unittest.TestCase): content = base64.b64encode(f.read()) data = json.dumps({"module": "stiximport", "data": content.decode('utf-8'), - "config": {"max_size": "15000"}, }) - response = requests.post(self.url + "query", data=data) - print('STIX', response.json()) + response = requests.post(self.url + "query", data=data).json() + + print("STIX :: {}".format(response)) + values = [x["values"][0] for x in response["results"]] + + assert("209.239.79.47" in values) + assert("41.213.121.180" in values) + assert("eu-society.com" in values) def test_virustotal(self): # This can't actually be tested without disclosing a private