mirror of https://github.com/MISP/misp-modules
Added virustotal tests
parent
4ba86d4fa3
commit
232014f221
|
@ -1,5 +1,6 @@
|
||||||
*.pyc
|
*.pyc
|
||||||
*.swp
|
*.swp
|
||||||
|
test/bodyvirustotal.json
|
||||||
__pycache__
|
__pycache__
|
||||||
build/
|
build/
|
||||||
dist/
|
dist/
|
||||||
|
|
|
@ -6,7 +6,7 @@ import base64
|
||||||
import os
|
import os
|
||||||
|
|
||||||
misperrors = {'error': 'Error'}
|
misperrors = {'error': 'Error'}
|
||||||
mispattributes = {'input': ['domain', "ip-src", "ip-dst"],
|
mispattributes = {'input': ['hostname', 'domain', "ip-src", "ip-dst"],
|
||||||
'output':['domain', "ip-src", "ip-dst", "text"]
|
'output':['domain', "ip-src", "ip-dst", "text"]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,16 +16,19 @@ moduleinfo = {'version': '1', 'author': 'Hannah Ward',
|
||||||
'module-type': ['expansion']}
|
'module-type': ['expansion']}
|
||||||
|
|
||||||
# config fields that your code expects from the site admin
|
# config fields that your code expects from the site admin
|
||||||
moduleconfig = ["apikey"]
|
moduleconfig = ["apikey", "event_limit"]
|
||||||
|
limit = 5 #Default
|
||||||
|
|
||||||
def handler(q=False):
|
def handler(q=False):
|
||||||
|
global limit
|
||||||
if q is False:
|
if q is False:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
q = json.loads(q)
|
q = json.loads(q)
|
||||||
|
|
||||||
key = q["config"]["apikey"]
|
key = q["config"]["apikey"]
|
||||||
|
limit = int(q["config"].get("event_limit", 5))
|
||||||
|
|
||||||
r = {"results": []}
|
r = {"results": []}
|
||||||
|
|
||||||
if "ip-src" in q:
|
if "ip-src" in q:
|
||||||
|
@ -34,6 +37,8 @@ def handler(q=False):
|
||||||
r["results"] += getIP(q["ip-dst"], key)
|
r["results"] += getIP(q["ip-dst"], key)
|
||||||
if "domain" in q:
|
if "domain" in q:
|
||||||
r["results"] += getDomain(q["domain"], key)
|
r["results"] += getDomain(q["domain"], key)
|
||||||
|
if 'hostname' in q:
|
||||||
|
r["results"] += getDomain(q['hostname'], key)
|
||||||
|
|
||||||
uniq = []
|
uniq = []
|
||||||
for res in r["results"]:
|
for res in r["results"]:
|
||||||
|
@ -43,6 +48,7 @@ def handler(q=False):
|
||||||
return r
|
return r
|
||||||
|
|
||||||
def getIP(ip, key, do_not_recurse = False):
|
def getIP(ip, key, do_not_recurse = False):
|
||||||
|
global limit
|
||||||
print("Getting info for {}".format(ip))
|
print("Getting info for {}".format(ip))
|
||||||
toReturn = []
|
toReturn = []
|
||||||
req = requests.get("https://www.virustotal.com/vtapi/v2/ip-address/report",
|
req = requests.get("https://www.virustotal.com/vtapi/v2/ip-address/report",
|
||||||
|
@ -53,7 +59,7 @@ def getIP(ip, key, do_not_recurse = False):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
if "resolutions" in req:
|
if "resolutions" in req:
|
||||||
for res in req["resolutions"]:
|
for res in req["resolutions"][:limit]:
|
||||||
toReturn.append( {"types":["domain"], "values":[res["hostname"]]})
|
toReturn.append( {"types":["domain"], "values":[res["hostname"]]})
|
||||||
#Pivot from here to find all domain info
|
#Pivot from here to find all domain info
|
||||||
if not do_not_recurse:
|
if not do_not_recurse:
|
||||||
|
@ -63,6 +69,8 @@ def getIP(ip, key, do_not_recurse = False):
|
||||||
return toReturn
|
return toReturn
|
||||||
|
|
||||||
def getDomain(domain, key, do_not_recurse=False):
|
def getDomain(domain, key, do_not_recurse=False):
|
||||||
|
global limit
|
||||||
|
|
||||||
print("Getting info for {}".format(domain))
|
print("Getting info for {}".format(domain))
|
||||||
toReturn = []
|
toReturn = []
|
||||||
req = requests.get("https://www.virustotal.com/vtapi/v2/domain/report",
|
req = requests.get("https://www.virustotal.com/vtapi/v2/domain/report",
|
||||||
|
@ -73,7 +81,7 @@ def getDomain(domain, key, do_not_recurse=False):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
if "resolutions" in req:
|
if "resolutions" in req:
|
||||||
for res in req["resolutions"]:
|
for res in req["resolutions"][:limit]:
|
||||||
toReturn.append( {"types":["ip-dst", "ip-src"], "values":[res["ip_address"]]})
|
toReturn.append( {"types":["ip-dst", "ip-src"], "values":[res["ip_address"]]})
|
||||||
#Pivot from here to find all info on IPs
|
#Pivot from here to find all info on IPs
|
||||||
if not do_not_recurse:
|
if not do_not_recurse:
|
||||||
|
@ -103,13 +111,14 @@ def isset(d, key):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def getMoreInfo(req, key):
|
def getMoreInfo(req, key):
|
||||||
|
global limit
|
||||||
print("Getting extra info for {}".format(req))
|
print("Getting extra info for {}".format(req))
|
||||||
r = []
|
r = []
|
||||||
#Get all hashes first
|
#Get all hashes first
|
||||||
hashes = []
|
hashes = []
|
||||||
hashes = findAll(req, ["md5", "sha1", "sha256", "sha512"])
|
hashes = findAll(req, ["md5", "sha1", "sha256", "sha512"])
|
||||||
r.append({"types":["md5", "sha1", "sha256", "sha512"], "values":hashes})
|
r.append({"types":["md5", "sha1", "sha256", "sha512"], "values":hashes})
|
||||||
for hsh in hashes[:5]:
|
for hsh in hashes[:limit]:
|
||||||
#Search VT for some juicy info
|
#Search VT for some juicy info
|
||||||
data = requests.get("http://www.virustotal.com/vtapi/v2/file/report",
|
data = requests.get("http://www.virustotal.com/vtapi/v2/file/report",
|
||||||
params={"allinfo":1, "apikey":key, "resource":hsh}
|
params={"allinfo":1, "apikey":key, "resource":hsh}
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
{"module": "virustotal", "ip-dst": "5.104.106.190", "config": {"api_key": "deadbeef"} }
|
|
@ -5,6 +5,7 @@ import unittest
|
||||||
import requests
|
import requests
|
||||||
import base64
|
import base64
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
class TestModules(unittest.TestCase):
|
class TestModules(unittest.TestCase):
|
||||||
|
|
||||||
|
@ -36,5 +37,17 @@ class TestModules(unittest.TestCase):
|
||||||
response = requests.post(self.url + "query", data=data)
|
response = requests.post(self.url + "query", data=data)
|
||||||
print(response.json())
|
print(response.json())
|
||||||
|
|
||||||
|
def test_virustotal(self):
|
||||||
|
# This can't actually be tested without disclosing a private
|
||||||
|
# API key. This will attempt to run with a .gitignored keyfile
|
||||||
|
# and pass if it can't find one
|
||||||
|
|
||||||
|
if not os.path.exists("tests/bodyvirustotal.json"):
|
||||||
|
return
|
||||||
|
|
||||||
|
with open("tests/bodyvirustotal.json", "r") as f:
|
||||||
|
response = requests.post(self.url + "query", data=f.read()).json()
|
||||||
|
assert(response)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue