misp-modules/misp_modules/modules/expansion/vulndb.py

283 lines
14 KiB
Python

#!/usr/bin/env python3
'''
Import VulnDB
https://vulndb.cyberriskanalytics.com/
https://www.riskbasedsecurity.com/
'''
import oauth2 as oauth
import json
import logging
import sys
log = logging.getLogger('vulndb')
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
log.addHandler(ch)
misperrors = {'error': 'Error'}
mispattributes = {
'input': ['vulnerability'],
'output': ['text', 'link', 'cpe']}
moduleinfo = {'version': '0.1', 'author': 'Koen Van Impe',
'description': 'Query VulnDB - RiskBasedSecurity.com',
'module-type': ['expansion', 'hover']}
moduleconfig = ['apikey', 'apisecret', 'discard_dates', 'discard_external_references', 'discard_cvss', 'discard_productinformation', 'discard_classification', 'discard_cpe']
def handler(q=False):
# Base URL for VulnDB
VULNDB_URL = "https://vulndb.cyberriskanalytics.com"
if q is False:
return False
request = json.loads(q)
# Only continue if we have a vulnerability attribute
if not request.get('vulnerability'):
misperrors['error'] = 'Vulnerability ID missing for VulnDB.'
return misperrors
vulnerability = request.get('vulnerability')
if request["config"].get("apikey") is None or request["config"].get("apisecret") is None:
misperrors["error"] = "Missing API key or secret value for VulnDB."
return misperrors
apikey = request["config"].get("apikey")
apisecret = request["config"].get("apisecret")
# This has to be done the 'inverse' way, MISP-server settings are set to False by default
add_cvss = True
add_products = True
add_classifications = True
add_cpe = True
add_dates = True
add_ext_references = True
if request["config"].get("discard_dates") is not None and request["config"].get("discard_dates").lower() == "true":
add_dates = False
if request["config"].get("discard_external_references") is not None and request["config"].get("discard_external_references").lower() == "true":
add_ext_references = False
if request["config"].get("discard_cvss") is not None and request["config"].get("discard_cvss").lower() == "true":
add_cvss = False
if request["config"].get("discard_productinformation") is not None and request["config"].get("discard_productinformation").lower() == "true":
add_products = False
if request["config"].get("discard_classification") is not None and request["config"].get("discard_classification").lower() == "true":
add_classifications = False
if request["config"].get("discard_cpe") is not None and request["config"].get("discard_cpe").lower() == "true":
add_cpe = False
cpu_vulndb = ""
if add_cpe:
cpu_vulndb = "?show_cpe=true"
find_by_cve_url = "%s/api/v1/vulnerabilities/%s/find_by_cve_id%s" % (VULNDB_URL, vulnerability, cpu_vulndb)
log.debug(find_by_cve_url)
try:
consumer = oauth.Consumer(key=apikey, secret=apisecret)
client = oauth.Client(consumer)
resp, content = client.request(find_by_cve_url, "GET")
content_json = json.loads(content.decode())
if content_json:
if 'error' in content_json:
misperrors["error"] = "No CVE information found."
return misperrors
else:
output = {'results': list()}
values_text = list()
values_links = list()
values_cpe = list()
results = content_json["results"][0]
# Include the VulnDB title and ID
values_text.append(results["title"])
vulndb_id_link = "%s/vulnerabilities/%s" % (VULNDB_URL, results["vulndb_id"])
values_links.append(vulndb_id_link)
# Descriptive part of the VulnDB item
description = results.get('description', '') or ''
keywords = results.get('keywords', '') or ''
solution = results.get('solution', '') or ''
manual_notes = results.get('manual_notes', '') or ''
t_description = results.get('t_description', '') or ''
if description:
values_text.append(description)
if t_description:
values_text.append(t_description)
if manual_notes:
values_text.append("Notes: " + manual_notes)
if keywords:
values_text.append("Keywords: " + keywords)
if solution:
values_text.append("Solution: " + solution)
# VulnDB items contain a number of dates, do we include them?
if add_dates:
log.debug("Include dates")
solution_date = results.get('solution_date', '') or ''
if solution_date:
values_text.append("Solution date: " + solution_date)
disclosure_date = results.get('disclosure_date', '') or ''
if disclosure_date:
values_text.append("Disclosure date: " + disclosure_date)
discovery_date = results.get('discovery_date', '') or ''
if discovery_date:
values_text.append("Discovery date: " + discovery_date)
exploit_publish_date = results.get('exploit_publish_date', '') or ''
if exploit_publish_date:
values_text.append("Exploit published date: " + exploit_publish_date)
vendor_informed_date = results.get('vendor_informed_date', '') or ''
if vendor_informed_date:
values_text.append("Vendor informed date: " + vendor_informed_date)
vendor_ack_date = results.get('vendor_ack_date', '') or ''
if vendor_ack_date:
values_text.append("Vendor acknowledgement date: " + vendor_ack_date)
third_party_solution_date = results.get('third_party_solution_date', '') or ''
if third_party_solution_date:
values_text.append("Third party solution date: " + third_party_solution_date)
# External references
if add_ext_references:
ext_references = results.get('ext_references')
if ext_references:
log.debug("Include external references")
for reference in ext_references:
reference_type = reference["type"]
if reference_type == "Other Advisory URL":
values_links.append(reference["value"])
elif reference_type == "News Article":
values_links.append(reference["value"])
elif reference_type == "Generic Informational URL":
values_links.append(reference["value"])
elif reference_type == "Vendor Specific Advisory URL":
values_links.append(reference["value"])
elif reference_type == "Vendor URL":
values_links.append(reference["value"])
elif reference_type == "Mail List Post":
values_links.append(reference["value"])
elif reference_type == "Metasploit URL":
values_links.append(reference["value"])
elif reference_type == "Packet Storm":
values_links.append(reference["value"])
elif reference_type == "Generic Exploit URL":
values_links.append(reference["value"])
elif reference_type == "CERT VU":
reference_link = "http://www.kb.cert.org/vuls/id/%s" % reference["value"]
values_links.append(reference_link)
elif reference_type == "CVE ID":
reference_link = "https://nvd.nist.gov/vuln/detail/%s" % reference["value"]
values_links.append(reference_link)
elif reference_type == "Microsoft Knowledge Base Article":
reference_link = "https://support.microsoft.com/en-us/help/%s" % reference["value"]
values_links.append(reference_link)
elif reference_type == "Exploit Database":
reference_link = "https://www.exploit-db.com/exploits/%s" % reference["value"]
values_links.append(reference_link)
elif reference_type == "Generic Informational URL":
values_links.append(reference["value"])
elif reference_type == "Generic Informational URL":
values_links.append(reference["value"])
elif reference_type == "Generic Informational URL":
values_links.append(reference["value"])
elif reference_type == "Generic Informational URL":
values_links.append(reference["value"])
elif reference_type == "Generic Informational URL":
values_links.append(reference["value"])
# CVSS Scoring
if add_cvss:
cvss = results.get('cvss_metrics')
if cvss:
log.debug("Include CVSS")
for cvss_metric in cvss:
score = cvss_metric.get("score")
if score:
values_text.append("CVSS %s (base: %s) (source: %s)" % (score, cvss_metric.get("calculated_cvss_base_score"), cvss_metric.get("source")))
# Add products
if add_products:
products = results.get('products')
if products and len(products) > 0:
# Get the vendors
vendors = results.get('vendors')
vendors_name = ""
log.debug("Include product information")
if vendors:
for vendor in vendors:
vendor_detail = vendor.get("vendor")
if vendor_detail:
vendor_name = vendor_detail.get("name")
if vendor_name:
vendors_name += vendor_name + " "
# Walk through all vendors
for product in products:
vulnerable_product = vendors_name
name = product.get("name")
if name:
vulnerable_product += "%s " % name
versions = product.get("versions")
if versions:
vulnerable_product += "("
for version in versions:
affected = version.get("affected")
if affected and affected == "true":
vulnerable_product += " %s " % version.get("name")
if add_cpe:
version_cpe = version.get("cpe")
if version_cpe:
cpe = version_cpe[0].get("cpe")
if cpe:
values_cpe.append(cpe)
vulnerable_product += ")"
# Add vulnerable products
values_text.append(vulnerable_product)
# Add vulnerability classifications
if add_classifications:
classifications = results.get("classifications")
if classifications and len(classifications) > 0:
vulnerability_classification = ""
log.debug("Include classifications")
for classification in classifications:
longname = classification.get("longname")
description = classification.get("description")
vulnerability_classification += " \"%s\" " % longname
values_text.append(vulnerability_classification)
# Finished processing the VulnDB reply; set the result for MISP
output['results'] += [{'types': 'text', 'values': values_text }]
output['results'] += [{'types': 'link', 'values': values_links }]
if add_cpe:
output['results'] += [{'types': 'cpe', 'values': values_cpe }]
return output
else:
misperrors["error"] = "No information retrieved from VulnDB."
return misperrors
except:
misperrors["error"] = "Error while fetching information from VulnDB, wrong API keys?"
return misperrors
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo