mirror of https://github.com/MISP/misp-modules
Merge pull request #47 from FloatingGhost/CEF_Export
CEF export, fixes in CountryCode, virustotalpull/52/head
commit
2df8bf970e
|
@ -1,5 +1,6 @@
|
|||
*.pyc
|
||||
*.swp
|
||||
bodyvirustotal.json
|
||||
__pycache__
|
||||
build/
|
||||
dist/
|
||||
|
|
|
@ -20,7 +20,10 @@ common_tlds = {"com":"Commercial (Worldwide)",
|
|||
"gov":"Government (USA)"
|
||||
}
|
||||
|
||||
codes = requests.get("http://www.geognos.com/api/en/countries/info/all.json").json()
|
||||
|
||||
def handler(q=False):
|
||||
global codes
|
||||
if q is False:
|
||||
return False
|
||||
request = json.loads(q)
|
||||
|
@ -34,8 +37,6 @@ def handler(q=False):
|
|||
val = common_tlds[ext]
|
||||
else:
|
||||
# Retrieve a json full of country info
|
||||
codes = requests.get("http://www.geognos.com/api/en/countries/info/all.json").json()
|
||||
|
||||
if not codes["StatusMsg"] == "OK":
|
||||
val = "Unknown"
|
||||
else:
|
||||
|
|
|
@ -6,7 +6,7 @@ import base64
|
|||
import os
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
mispattributes = {'input': ['domain', "ip-src", "ip-dst"],
|
||||
mispattributes = {'input': ['hostname', 'domain', "ip-src", "ip-dst"],
|
||||
'output':['domain', "ip-src", "ip-dst", "text"]
|
||||
}
|
||||
|
||||
|
@ -16,16 +16,19 @@ moduleinfo = {'version': '1', 'author': 'Hannah Ward',
|
|||
'module-type': ['expansion']}
|
||||
|
||||
# config fields that your code expects from the site admin
|
||||
moduleconfig = ["apikey"]
|
||||
|
||||
moduleconfig = ["apikey", "event_limit"]
|
||||
limit = 5 #Default
|
||||
|
||||
def handler(q=False):
|
||||
global limit
|
||||
if q is False:
|
||||
return False
|
||||
|
||||
q = json.loads(q)
|
||||
|
||||
key = q["config"]["apikey"]
|
||||
limit = int(q["config"].get("event_limit", 5))
|
||||
|
||||
r = {"results": []}
|
||||
|
||||
if "ip-src" in q:
|
||||
|
@ -34,6 +37,8 @@ def handler(q=False):
|
|||
r["results"] += getIP(q["ip-dst"], key)
|
||||
if "domain" in q:
|
||||
r["results"] += getDomain(q["domain"], key)
|
||||
if 'hostname' in q:
|
||||
r["results"] += getDomain(q['hostname'], key)
|
||||
|
||||
uniq = []
|
||||
for res in r["results"]:
|
||||
|
@ -43,7 +48,7 @@ def handler(q=False):
|
|||
return r
|
||||
|
||||
def getIP(ip, key, do_not_recurse = False):
|
||||
print("Getting info for {}".format(ip))
|
||||
global limit
|
||||
toReturn = []
|
||||
req = requests.get("https://www.virustotal.com/vtapi/v2/ip-address/report",
|
||||
params = {"ip":ip, "apikey":key}
|
||||
|
@ -53,7 +58,7 @@ def getIP(ip, key, do_not_recurse = False):
|
|||
return []
|
||||
|
||||
if "resolutions" in req:
|
||||
for res in req["resolutions"]:
|
||||
for res in req["resolutions"][:limit]:
|
||||
toReturn.append( {"types":["domain"], "values":[res["hostname"]]})
|
||||
#Pivot from here to find all domain info
|
||||
if not do_not_recurse:
|
||||
|
@ -63,7 +68,7 @@ def getIP(ip, key, do_not_recurse = False):
|
|||
return toReturn
|
||||
|
||||
def getDomain(domain, key, do_not_recurse=False):
|
||||
print("Getting info for {}".format(domain))
|
||||
global limit
|
||||
toReturn = []
|
||||
req = requests.get("https://www.virustotal.com/vtapi/v2/domain/report",
|
||||
params = {"domain":domain, "apikey":key}
|
||||
|
@ -73,7 +78,7 @@ def getDomain(domain, key, do_not_recurse=False):
|
|||
return []
|
||||
|
||||
if "resolutions" in req:
|
||||
for res in req["resolutions"]:
|
||||
for res in req["resolutions"][:limit]:
|
||||
toReturn.append( {"types":["ip-dst", "ip-src"], "values":[res["ip_address"]]})
|
||||
#Pivot from here to find all info on IPs
|
||||
if not do_not_recurse:
|
||||
|
@ -103,13 +108,13 @@ def isset(d, key):
|
|||
return False
|
||||
|
||||
def getMoreInfo(req, key):
|
||||
print("Getting extra info for {}".format(req))
|
||||
global limit
|
||||
r = []
|
||||
#Get all hashes first
|
||||
hashes = []
|
||||
hashes = findAll(req, ["md5", "sha1", "sha256", "sha512"])
|
||||
r.append({"types":["md5", "sha1", "sha256", "sha512"], "values":hashes})
|
||||
for hsh in hashes[:5]:
|
||||
for hsh in hashes[:limit]:
|
||||
#Search VT for some juicy info
|
||||
data = requests.get("http://www.virustotal.com/vtapi/v2/file/report",
|
||||
params={"allinfo":1, "apikey":key, "resource":hsh}
|
||||
|
@ -130,7 +135,6 @@ def getMoreInfo(req, key):
|
|||
sample = requests.get("https://www.virustotal.com/vtapi/v2/file/download",
|
||||
params = {"hash":hsh, "apikey":key})
|
||||
|
||||
print(sample)
|
||||
malsample = sample.content
|
||||
r.append({"types":["malware-sample"],
|
||||
"categories":["Payload delivery"],
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
import json
|
||||
import base64
|
||||
import datetime
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
|
||||
# possible module-types: 'expansion', 'hover' or both
|
||||
moduleinfo = {'version': '1', 'author': 'Hannah Ward',
|
||||
'description': 'Export a module in CEF format',
|
||||
'module-type': ['export']}
|
||||
|
||||
# config fields that your code expects from the site admin
|
||||
moduleconfig = ["Default_Severity", "Device_Vendor", "Device_Product", "Device_Version"]
|
||||
|
||||
cefmapping = {"ip-src":"src", "ip-dst":"dst", "hostname":"dhost", "domain":"dhost",
|
||||
"md5":"fileHash", "sha1":"fileHash", "sha256":"fileHash",
|
||||
"url":"request"}
|
||||
|
||||
mispattributes = {'input':list(cefmapping.keys())}
|
||||
outputFileExtension = "cef"
|
||||
responseType = "application/txt"
|
||||
|
||||
def handler(q=False):
|
||||
if q is False:
|
||||
return False
|
||||
request = json.loads(q)
|
||||
if "config" in request:
|
||||
config = request["config"]
|
||||
else:
|
||||
config = {"Default_Severity":1, "Device_Vendor":"MISP", "Device_Product":"MISP", "Device_Version":1}
|
||||
|
||||
data = request["data"]
|
||||
response = ""
|
||||
for ev in data:
|
||||
event = ev["Attribute"]
|
||||
for attr in event:
|
||||
if attr["type"] in cefmapping:
|
||||
response += "{} host CEF:0|{}|{}|{}|{}|{}|{}|{}={}\n".format(
|
||||
datetime.datetime.fromtimestamp(int(attr["timestamp"])).strftime("%b %d %H:%M:%S"),
|
||||
config["Device_Vendor"],
|
||||
config["Device_Product"],
|
||||
config["Device_Version"],
|
||||
attr["category"],
|
||||
attr["category"],
|
||||
config["Default_Severity"],
|
||||
cefmapping[attr["type"]],
|
||||
attr["value"],
|
||||
)
|
||||
|
||||
r = {"response":[], "data":str(base64.b64encode(bytes(response, 'utf-8')), 'utf-8')}
|
||||
return r
|
||||
|
||||
|
||||
def introspection():
|
||||
modulesetup = {}
|
||||
try:
|
||||
responseType
|
||||
modulesetup['responseType'] = responseType
|
||||
except NameError:
|
||||
pass
|
||||
try:
|
||||
userConfig
|
||||
modulesetup['userConfig'] = userConfig
|
||||
except NameError:
|
||||
pass
|
||||
try:
|
||||
outputFileExtension
|
||||
modulesetup['outputFileExtension'] = outputFileExtension
|
||||
except NameError:
|
||||
pass
|
||||
try:
|
||||
inputSource
|
||||
modulesetup['inputSource'] = inputSource
|
||||
except NameError:
|
||||
pass
|
||||
return modulesetup
|
||||
|
||||
def version():
|
||||
moduleinfo['config'] = moduleconfig
|
||||
return moduleinfo
|
||||
|
|
@ -4,6 +4,7 @@ import re
|
|||
import base64
|
||||
import hashlib
|
||||
import tempfile
|
||||
import pickle
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
userConfig = {}
|
||||
|
@ -49,6 +50,9 @@ def handler(q=False):
|
|||
# Load up the package into STIX
|
||||
package = loadPackage(package, memsize)
|
||||
|
||||
# Hash it
|
||||
with open("/home/hward/tmp.dat", "wb") as f:
|
||||
pickle.dump( package, f)
|
||||
# Build all the observables
|
||||
if package.observables:
|
||||
for obs in package.observables:
|
||||
|
@ -62,7 +66,7 @@ def handler(q=False):
|
|||
# Aaaand the indicators
|
||||
if package.indicators:
|
||||
for ind in package.indicators:
|
||||
r["results"].append(buildIndicator(ind))
|
||||
r["results"] += buildIndicator(ind)
|
||||
|
||||
# Are you seeing a pattern?
|
||||
if package.exploit_targets:
|
||||
|
@ -76,7 +80,7 @@ def handler(q=False):
|
|||
|
||||
# Clean up results
|
||||
# Don't send on anything that didn't have a value
|
||||
r["results"] = [x for x in r["results"] if len(x["values"]) != 0]
|
||||
r["results"] = [x for x in r["results"] if isinstance(x, dict) and len(x["values"]) != 0]
|
||||
return r
|
||||
|
||||
# Quick and dirty regex for IP addresses
|
||||
|
@ -126,11 +130,14 @@ def buildIndicator(ind):
|
|||
and other fun things
|
||||
like that
|
||||
"""
|
||||
r = {"values": [], "types": []}
|
||||
|
||||
r = []
|
||||
# Try to get hashes. I hate stix
|
||||
if ind.observable:
|
||||
return buildObservable(ind.observable)
|
||||
if ind.observables:
|
||||
for i in ind.observables:
|
||||
if i.observable_composition:
|
||||
for j in i.observable_composition.observables:
|
||||
r.append(buildObservable(j))
|
||||
r.append(buildObservable(i))
|
||||
return r
|
||||
|
||||
|
||||
|
@ -152,7 +159,6 @@ def buildObservable(o):
|
|||
and extract the value
|
||||
and category
|
||||
"""
|
||||
|
||||
# Life is easier with json
|
||||
if not isinstance(o, dict):
|
||||
o = json.loads(o.to_json())
|
||||
|
@ -168,7 +174,6 @@ def buildObservable(o):
|
|||
props = o["object"]["properties"]
|
||||
|
||||
# If it has an address_value field, it's gonna be an address
|
||||
# print(props)
|
||||
# Kinda obvious really
|
||||
if "address_value" in props:
|
||||
|
||||
|
@ -193,7 +198,21 @@ def buildObservable(o):
|
|||
for hsh in props["hashes"]:
|
||||
r["values"].append(hsh["simple_hash_value"]["value"])
|
||||
r["types"] = identifyHash(hsh["simple_hash_value"]["value"])
|
||||
return r
|
||||
|
||||
elif "xsi:type" in props:
|
||||
# Cybox. Ew.
|
||||
try:
|
||||
type_ = props["xsi:type"]
|
||||
val = props["value"]
|
||||
|
||||
if type_ == "LinkObjectType":
|
||||
r["types"] = ["link"]
|
||||
r["values"].append(val)
|
||||
else:
|
||||
print("Ignoring {}".format(type_))
|
||||
except:
|
||||
pass
|
||||
return r
|
||||
|
||||
|
||||
def loadPackage(data, memsize=1024):
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
{"module": "virustotal", "ip-dst": "5.104.106.190", "config": {"api_key": "deadbeef"} }
|
|
@ -5,6 +5,7 @@ import unittest
|
|||
import requests
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
|
||||
class TestModules(unittest.TestCase):
|
||||
|
||||
|
@ -39,5 +40,17 @@ class TestModules(unittest.TestCase):
|
|||
response = requests.post(self.url + "query", data=data)
|
||||
print(response.json())
|
||||
|
||||
def test_virustotal(self):
|
||||
# This can't actually be tested without disclosing a private
|
||||
# API key. This will attempt to run with a .gitignored keyfile
|
||||
# and pass if it can't find one
|
||||
|
||||
if not os.path.exists("tests/bodyvirustotal.json"):
|
||||
return
|
||||
|
||||
with open("tests/bodyvirustotal.json", "r") as f:
|
||||
response = requests.post(self.url + "query", data=f.read()).json()
|
||||
assert(response)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue