mirror of https://github.com/MISP/misp-modules
stiximport now uses temporary files to store stix data.
Set max size in config, in bytespull/43/head
parent
0089983661
commit
6db269f965
|
@ -3,6 +3,7 @@ from stix.core import STIXPackage
|
|||
import re
|
||||
import base64
|
||||
import hashlib
|
||||
import tempfile
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
userConfig = {}
|
||||
|
@ -12,7 +13,7 @@ moduleinfo = {'version': '0.1', 'author': 'Hannah Ward',
|
|||
'description': 'Import some stix stuff',
|
||||
'module-type': ['import']}
|
||||
|
||||
moduleconfig = []
|
||||
moduleconfig = ["max_size"]
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
|
@ -33,29 +34,46 @@ def handler(q=False):
|
|||
if not package:
|
||||
return json.dumps({"success": 0})
|
||||
|
||||
# Get the maxsize from the config
|
||||
# Default to 10MB
|
||||
# (I believe the max_size arg is given in bytes)
|
||||
# Check if we were given a configuration
|
||||
memsize = q.get("config", None)
|
||||
|
||||
# If we were, find out if there's a memsize field
|
||||
if memsize:
|
||||
memsize = memsize.get("max_size", 10 * 1024)
|
||||
else:
|
||||
memsize = 10 * 1024
|
||||
|
||||
# Load up the package into STIX
|
||||
package = loadPackage(package)
|
||||
package = loadPackage(package, memsize)
|
||||
|
||||
# Build all the observables
|
||||
if package.observables:
|
||||
for obs in package.observables:
|
||||
r["results"].append(buildObservable(obs))
|
||||
|
||||
# And now the threat actors
|
||||
if package.threat_actors:
|
||||
for ta in package.threat_actors:
|
||||
r["results"].append(buildActor(ta))
|
||||
|
||||
# Aaaand the indicators
|
||||
if package.indicators:
|
||||
for ind in package.indicators:
|
||||
r["results"].append(buildIndicator(ind))
|
||||
|
||||
# Are you seeing a pattern?
|
||||
if package.exploit_targets:
|
||||
for et in package.exploit_targets:
|
||||
r["results"].append(buildExploitTarget(et))
|
||||
|
||||
# LOADING STUFF
|
||||
if package.campaigns:
|
||||
for cpn in package.campaigns:
|
||||
r["results"].append(buildCampaign(cpn))
|
||||
|
||||
# Clean up results
|
||||
# Don't send on anything that didn't have a value
|
||||
r["results"] = [x for x in r["results"] if len(x["values"]) != 0]
|
||||
|
@ -178,19 +196,27 @@ def buildObservable(o):
|
|||
return r
|
||||
|
||||
|
||||
def loadPackage(data):
|
||||
def loadPackage(data, memsize=1024):
|
||||
# Write the stix package to a tmp file
|
||||
with open("/tmp/stixdump", "w") as f:
|
||||
f.write(data)
|
||||
|
||||
temp = tempfile.SpooledTemporaryFile(max_size=int(memsize), mode="w+")
|
||||
|
||||
temp.write(data)
|
||||
|
||||
# Back to the beginning so we can read it again
|
||||
temp.seek(0)
|
||||
try:
|
||||
# Try loading it into every format we know of
|
||||
try:
|
||||
package = STIXPackage().from_xml(open("/tmp/stixdump", "r"))
|
||||
package = STIXPackage().from_xml(temp)
|
||||
except:
|
||||
package = STIXPackage().from_json(open("/tmp/stixdump", "r"))
|
||||
# We have to seek back again
|
||||
temp.seek(0)
|
||||
package = STIXPackage().from_json(temp)
|
||||
except Exception:
|
||||
print("Failed to load package")
|
||||
raise ValueError("COULD NOT LOAD STIX PACKAGE!")
|
||||
temp.close()
|
||||
return package
|
||||
|
||||
|
||||
|
|
|
@ -30,7 +30,8 @@ class TestModules(unittest.TestCase):
|
|||
def test_stix(self):
|
||||
with open("tests/stix.xml", "r") as f:
|
||||
data = json.dumps({"module":"stiximport",
|
||||
"data":str(base64.b64encode(bytes(f.read(), 'utf-8')), 'utf-8')
|
||||
"data":str(base64.b64encode(bytes(f.read(), 'utf-8')), 'utf-8'),
|
||||
"config": {"max_size": "15000"},
|
||||
})
|
||||
response = requests.post(self.url + "query", data=data)
|
||||
print(response.json())
|
||||
|
|
Loading…
Reference in New Issue