mirror of https://github.com/MISP/misp-modules
VulnDB Queries
Search on CVE at https://vulndb.cyberriskanalytics.com/ https://www.riskbasedsecurity.com/ Get extended CVE info, links + CPEpull/140/head
parent
b28d116787
commit
74e660d61b
|
@ -3,4 +3,4 @@ from . import _vmray
|
|||
__all__ = ['vmray_submit', 'asn_history', 'circl_passivedns', 'circl_passivessl',
|
||||
'countrycode', 'cve', 'dns', 'domaintools', 'eupi', 'ipasn', 'passivetotal', 'sourcecache',
|
||||
'virustotal', 'whois', 'shodan', 'reversedns', 'geoip_country', 'wiki', 'iprep', 'threatminer' ,'otx',
|
||||
'threatcrowd']
|
||||
'threatcrowd','vulndb']
|
||||
|
|
|
@ -0,0 +1,282 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
'''
|
||||
Import VulnDB
|
||||
https://vulndb.cyberriskanalytics.com/
|
||||
https://www.riskbasedsecurity.com/
|
||||
'''
|
||||
|
||||
import oauth2 as oauth
|
||||
import json
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
|
||||
log = logging.getLogger('vulndb')
|
||||
log.setLevel(logging.DEBUG)
|
||||
ch = logging.StreamHandler(sys.stdout)
|
||||
ch.setLevel(logging.DEBUG)
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
ch.setFormatter(formatter)
|
||||
log.addHandler(ch)
|
||||
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
mispattributes = {
|
||||
'input': ['vulnerability'],
|
||||
'output': ['text', 'link', 'cpe']}
|
||||
moduleinfo = {'version': '0.1', 'author': 'Koen Van Impe',
|
||||
'description': 'Query VulnDB - RiskBasedSecurity.com',
|
||||
'module-type': ['expansion', 'hover']}
|
||||
|
||||
moduleconfig = ['apikey', 'apisecret', 'discard_dates', 'discard_external_references', 'discard_cvss', 'discard_productinformation', 'discard_classification', 'discard_cpe']
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
# Base URL for VulnDB
|
||||
VULNDB_URL = "https://vulndb.cyberriskanalytics.com"
|
||||
|
||||
if q is False:
|
||||
return False
|
||||
request = json.loads(q)
|
||||
|
||||
# Only continue if we have a vulnerability attribute
|
||||
if not request.get('vulnerability'):
|
||||
misperrors['error'] = 'Vulnerability id missing for VulnDB'
|
||||
return misperrors
|
||||
vulnerability = request.get('vulnerability')
|
||||
|
||||
if request["config"].get("apikey") is None or request["config"].get("apisecret") is None:
|
||||
misperrors["error"] = "Missing API key or secret value for VulnDB"
|
||||
return misperrors
|
||||
apikey = request["config"].get("apikey")
|
||||
apisecret = request["config"].get("apisecret")
|
||||
|
||||
# This has to be done the 'inverse' way, MISP-server settings are set to False by default
|
||||
add_cvss = True
|
||||
add_products = True
|
||||
add_classifications = True
|
||||
add_cpe = True
|
||||
add_dates = True
|
||||
add_ext_references = True
|
||||
|
||||
if request["config"].get("discard_dates") is not None and request["config"].get("discard_dates").lower() == "true":
|
||||
add_dates = False
|
||||
if request["config"].get("discard_external_references") is not None and request["config"].get("discard_external_references").lower() == "true":
|
||||
add_ext_references = False
|
||||
if request["config"].get("discard_cvss") is not None and request["config"].get("discard_cvss").lower() == "true":
|
||||
add_cvss = False
|
||||
if request["config"].get("discard_productinformation") is not None and request["config"].get("discard_productinformation").lower() == "true":
|
||||
add_products = False
|
||||
if request["config"].get("discard_classification") is not None and request["config"].get("discard_classification").lower() == "true":
|
||||
add_classifications = False
|
||||
if request["config"].get("discard_cpe") is not None and request["config"].get("discard_cpe").lower() == "true":
|
||||
add_cpe = False
|
||||
|
||||
cpu_vulndb = ""
|
||||
if add_cpe:
|
||||
cpu_vulndb = "?show_cpe=true"
|
||||
|
||||
find_by_cve_url = "%s/api/v1/vulnerabilities/%s/find_by_cve_id%s" % (VULNDB_URL, vulnerability, cpu_vulndb)
|
||||
log.debug(find_by_cve_url)
|
||||
|
||||
try:
|
||||
|
||||
consumer = oauth.Consumer(key=apikey, secret=apisecret)
|
||||
client = oauth.Client(consumer)
|
||||
resp, content = client.request(find_by_cve_url, "GET")
|
||||
content_json = json.loads(content.decode())
|
||||
|
||||
if content_json:
|
||||
if 'error' in content_json:
|
||||
misperrors["error"] = "No CVE information found"
|
||||
return misperrors
|
||||
else:
|
||||
output = {'results': list()}
|
||||
values_text = list()
|
||||
values_links = list()
|
||||
values_cpe = list()
|
||||
|
||||
results = content_json["results"][0]
|
||||
|
||||
# Include the VulnDB title and ID
|
||||
values_text.append(results["title"])
|
||||
vulndb_id_link = "%s/vulnerabilities/%s" % (VULNDB_URL, results["vulndb_id"])
|
||||
values_links.append(vulndb_id_link)
|
||||
|
||||
# Descriptive part of the VulnDB item
|
||||
description = results.get('description', '') or ''
|
||||
keywords = results.get('keywords', '') or ''
|
||||
solution = results.get('solution', '') or ''
|
||||
manual_notes = results.get('manual_notes', '') or ''
|
||||
t_description = results.get('t_description', '') or ''
|
||||
if description:
|
||||
values_text.append(description)
|
||||
if t_description:
|
||||
values_text.append(t_description)
|
||||
if manual_notes:
|
||||
values_text.append("Notes: " + manual_notes)
|
||||
if keywords:
|
||||
values_text.append("Keywords: " + keywords)
|
||||
if solution:
|
||||
values_text.append("Solution: " + solution)
|
||||
|
||||
# VulnDB items contain a number of dates, do we include them?
|
||||
if add_dates:
|
||||
log.debug("Include dates")
|
||||
solution_date = results.get('solution_date', '') or ''
|
||||
if solution_date:
|
||||
values_text.append("Solution date: " + solution_date)
|
||||
disclosure_date = results.get('disclosure_date', '') or ''
|
||||
if disclosure_date:
|
||||
values_text.append("Disclosure date: " + disclosure_date)
|
||||
discovery_date = results.get('discovery_date', '') or ''
|
||||
if discovery_date:
|
||||
values_text.append("Discovery date: " + discovery_date)
|
||||
exploit_publish_date = results.get('exploit_publish_date', '') or ''
|
||||
if exploit_publish_date:
|
||||
values_text.append("Exploit published date: " + exploit_publish_date)
|
||||
vendor_informed_date = results.get('vendor_informed_date', '') or ''
|
||||
if vendor_informed_date:
|
||||
values_text.append("Vendor informed date: " + vendor_informed_date)
|
||||
vendor_ack_date = results.get('vendor_ack_date', '') or ''
|
||||
if vendor_ack_date:
|
||||
values_text.append("Vendor acknowledgement date: " + vendor_ack_date)
|
||||
third_party_solution_date = results.get('third_party_solution_date', '') or ''
|
||||
if third_party_solution_date:
|
||||
values_text.append("Third party solution date: " + third_party_solution_date)
|
||||
|
||||
# External references
|
||||
if add_ext_references:
|
||||
ext_references = results.get('ext_references')
|
||||
if ext_references:
|
||||
log.debug("Include external references")
|
||||
for reference in ext_references:
|
||||
reference_type = reference["type"]
|
||||
if reference_type == "Other Advisory URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "News Article":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Generic Informational URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Vendor Specific Advisory URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Vendor URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Mail List Post":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Metasploit URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Packet Storm":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Generic Exploit URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "CERT VU":
|
||||
reference_link = "http://www.kb.cert.org/vuls/id/%s" % reference["value"]
|
||||
values_links.append(reference_link)
|
||||
elif reference_type == "CVE ID":
|
||||
reference_link = "https://nvd.nist.gov/vuln/detail/%s" % reference["value"]
|
||||
values_links.append(reference_link)
|
||||
elif reference_type == "Microsoft Knowledge Base Article":
|
||||
reference_link = "https://support.microsoft.com/en-us/help/%s" % reference["value"]
|
||||
values_links.append(reference_link)
|
||||
elif reference_type == "Exploit Database":
|
||||
reference_link = "https://www.exploit-db.com/exploits/%s" % reference["value"]
|
||||
values_links.append(reference_link)
|
||||
elif reference_type == "Generic Informational URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Generic Informational URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Generic Informational URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Generic Informational URL":
|
||||
values_links.append(reference["value"])
|
||||
elif reference_type == "Generic Informational URL":
|
||||
values_links.append(reference["value"])
|
||||
|
||||
# CVSS Scoring
|
||||
if add_cvss:
|
||||
cvss = results.get('cvss_metrics')
|
||||
if cvss:
|
||||
log.debug("Include CVSS")
|
||||
for cvss_metric in cvss:
|
||||
score = cvss_metric.get("score")
|
||||
if score:
|
||||
values_text.append("CVSS %s (base: %s) (source: %s)" % (score, cvss_metric.get("calculated_cvss_base_score"), cvss_metric.get("source")))
|
||||
|
||||
# Add products
|
||||
if add_products:
|
||||
products = results.get('products')
|
||||
if products and len(products) > 0:
|
||||
|
||||
# Get the vendors
|
||||
vendors = results.get('vendors')
|
||||
vendors_name = ""
|
||||
log.debug("Include product information")
|
||||
if vendors:
|
||||
for vendor in vendors:
|
||||
vendor_detail = vendor.get("vendor")
|
||||
if vendor_detail:
|
||||
vendor_name = vendor_detail.get("name")
|
||||
if vendor_name:
|
||||
vendors_name += vendor_name + " "
|
||||
|
||||
# Walk through all vendors
|
||||
for product in products:
|
||||
vulnerable_product = vendors_name
|
||||
name = product.get("name")
|
||||
if name:
|
||||
vulnerable_product += "%s " % name
|
||||
versions = product.get("versions")
|
||||
if versions:
|
||||
vulnerable_product += "("
|
||||
for version in versions:
|
||||
affected = version.get("affected")
|
||||
|
||||
if affected and affected == "true":
|
||||
vulnerable_product += " %s " % version.get("name")
|
||||
if add_cpe:
|
||||
version_cpe = version.get("cpe")
|
||||
if version_cpe:
|
||||
cpe = version_cpe[0].get("cpe")
|
||||
if cpe:
|
||||
values_cpe.append(cpe)
|
||||
|
||||
vulnerable_product += ")"
|
||||
# Add vulnerable products
|
||||
values_text.append(vulnerable_product)
|
||||
|
||||
# Add vulnerability classifications
|
||||
if add_classifications:
|
||||
classifications = results.get("classifications")
|
||||
if classifications and len(classifications) > 0:
|
||||
vulnerability_classification = ""
|
||||
log.debug("Include classifications")
|
||||
for classification in classifications:
|
||||
longname = classification.get("longname")
|
||||
description = classification.get("description")
|
||||
vulnerability_classification += " \"%s\" " % longname
|
||||
values_text.append(vulnerability_classification)
|
||||
|
||||
# Finished processing the VulnDB reply; set the result for MISP
|
||||
output['results'] += [{'types': 'text', 'values': values_text }]
|
||||
output['results'] += [{'types': 'link', 'values': values_links }]
|
||||
if add_cpe:
|
||||
output['results'] += [{'types': 'cpe', 'values': values_cpe }]
|
||||
return output
|
||||
else:
|
||||
misperrors["error"] = "No information retrieved from VulnDB"
|
||||
return misperrors
|
||||
except:
|
||||
misperrors["error"] = "Error while fetching information from VulnDB, wrong API keys?"
|
||||
return misperrors
|
||||
|
||||
|
||||
def introspection():
|
||||
return mispattributes
|
||||
|
||||
|
||||
def version():
|
||||
moduleinfo['config'] = moduleconfig
|
||||
return moduleinfo
|
Loading…
Reference in New Issue