From 74e660d61bd4167b8f2fe1b4b745faf42ea08140 Mon Sep 17 00:00:00 2001 From: Koen Van Impe Date: Mon, 6 Nov 2017 14:23:03 +0100 Subject: [PATCH] VulnDB Queries Search on CVE at https://vulndb.cyberriskanalytics.com/ https://www.riskbasedsecurity.com/ Get extended CVE info, links + CPE --- misp_modules/modules/expansion/__init__.py | 2 +- misp_modules/modules/expansion/vulndb.py | 282 +++++++++++++++++++++ 2 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 misp_modules/modules/expansion/vulndb.py diff --git a/misp_modules/modules/expansion/__init__.py b/misp_modules/modules/expansion/__init__.py index 50baa417..48117e1d 100644 --- a/misp_modules/modules/expansion/__init__.py +++ b/misp_modules/modules/expansion/__init__.py @@ -3,4 +3,4 @@ from . import _vmray __all__ = ['vmray_submit', 'asn_history', 'circl_passivedns', 'circl_passivessl', 'countrycode', 'cve', 'dns', 'domaintools', 'eupi', 'ipasn', 'passivetotal', 'sourcecache', 'virustotal', 'whois', 'shodan', 'reversedns', 'geoip_country', 'wiki', 'iprep', 'threatminer' ,'otx', - 'threatcrowd'] + 'threatcrowd','vulndb'] diff --git a/misp_modules/modules/expansion/vulndb.py b/misp_modules/modules/expansion/vulndb.py new file mode 100644 index 00000000..1b97b560 --- /dev/null +++ b/misp_modules/modules/expansion/vulndb.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 + +''' +Import VulnDB + https://vulndb.cyberriskanalytics.com/ + https://www.riskbasedsecurity.com/ +''' + +import oauth2 as oauth +import json + +import logging +import sys + + +log = logging.getLogger('vulndb') +log.setLevel(logging.DEBUG) +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +log.addHandler(ch) + + +misperrors = {'error': 'Error'} +mispattributes = { + 'input': ['vulnerability'], + 'output': ['text', 'link', 'cpe']} +moduleinfo = {'version': '0.1', 'author': 'Koen Van Impe', + 'description': 'Query VulnDB - RiskBasedSecurity.com', + 'module-type': ['expansion', 'hover']} + +moduleconfig = ['apikey', 'apisecret', 'discard_dates', 'discard_external_references', 'discard_cvss', 'discard_productinformation', 'discard_classification', 'discard_cpe'] + + +def handler(q=False): + # Base URL for VulnDB + VULNDB_URL = "https://vulndb.cyberriskanalytics.com" + + if q is False: + return False + request = json.loads(q) + + # Only continue if we have a vulnerability attribute + if not request.get('vulnerability'): + misperrors['error'] = 'Vulnerability id missing for VulnDB' + return misperrors + vulnerability = request.get('vulnerability') + + if request["config"].get("apikey") is None or request["config"].get("apisecret") is None: + misperrors["error"] = "Missing API key or secret value for VulnDB" + return misperrors + apikey = request["config"].get("apikey") + apisecret = request["config"].get("apisecret") + + # This has to be done the 'inverse' way, MISP-server settings are set to False by default + add_cvss = True + add_products = True + add_classifications = True + add_cpe = True + add_dates = True + add_ext_references = True + + if request["config"].get("discard_dates") is not None and request["config"].get("discard_dates").lower() == "true": + add_dates = False + if request["config"].get("discard_external_references") is not None and request["config"].get("discard_external_references").lower() == "true": + add_ext_references = False + if request["config"].get("discard_cvss") is not None and request["config"].get("discard_cvss").lower() == "true": + add_cvss = False + if request["config"].get("discard_productinformation") is not None and request["config"].get("discard_productinformation").lower() == "true": + add_products = False + if request["config"].get("discard_classification") is not None and request["config"].get("discard_classification").lower() == "true": + add_classifications = False + if request["config"].get("discard_cpe") is not None and request["config"].get("discard_cpe").lower() == "true": + add_cpe = False + + cpu_vulndb = "" + if add_cpe: + cpu_vulndb = "?show_cpe=true" + + find_by_cve_url = "%s/api/v1/vulnerabilities/%s/find_by_cve_id%s" % (VULNDB_URL, vulnerability, cpu_vulndb) + log.debug(find_by_cve_url) + + try: + + consumer = oauth.Consumer(key=apikey, secret=apisecret) + client = oauth.Client(consumer) + resp, content = client.request(find_by_cve_url, "GET") + content_json = json.loads(content.decode()) + + if content_json: + if 'error' in content_json: + misperrors["error"] = "No CVE information found" + return misperrors + else: + output = {'results': list()} + values_text = list() + values_links = list() + values_cpe = list() + + results = content_json["results"][0] + + # Include the VulnDB title and ID + values_text.append(results["title"]) + vulndb_id_link = "%s/vulnerabilities/%s" % (VULNDB_URL, results["vulndb_id"]) + values_links.append(vulndb_id_link) + + # Descriptive part of the VulnDB item + description = results.get('description', '') or '' + keywords = results.get('keywords', '') or '' + solution = results.get('solution', '') or '' + manual_notes = results.get('manual_notes', '') or '' + t_description = results.get('t_description', '') or '' + if description: + values_text.append(description) + if t_description: + values_text.append(t_description) + if manual_notes: + values_text.append("Notes: " + manual_notes) + if keywords: + values_text.append("Keywords: " + keywords) + if solution: + values_text.append("Solution: " + solution) + + # VulnDB items contain a number of dates, do we include them? + if add_dates: + log.debug("Include dates") + solution_date = results.get('solution_date', '') or '' + if solution_date: + values_text.append("Solution date: " + solution_date) + disclosure_date = results.get('disclosure_date', '') or '' + if disclosure_date: + values_text.append("Disclosure date: " + disclosure_date) + discovery_date = results.get('discovery_date', '') or '' + if discovery_date: + values_text.append("Discovery date: " + discovery_date) + exploit_publish_date = results.get('exploit_publish_date', '') or '' + if exploit_publish_date: + values_text.append("Exploit published date: " + exploit_publish_date) + vendor_informed_date = results.get('vendor_informed_date', '') or '' + if vendor_informed_date: + values_text.append("Vendor informed date: " + vendor_informed_date) + vendor_ack_date = results.get('vendor_ack_date', '') or '' + if vendor_ack_date: + values_text.append("Vendor acknowledgement date: " + vendor_ack_date) + third_party_solution_date = results.get('third_party_solution_date', '') or '' + if third_party_solution_date: + values_text.append("Third party solution date: " + third_party_solution_date) + + # External references + if add_ext_references: + ext_references = results.get('ext_references') + if ext_references: + log.debug("Include external references") + for reference in ext_references: + reference_type = reference["type"] + if reference_type == "Other Advisory URL": + values_links.append(reference["value"]) + elif reference_type == "News Article": + values_links.append(reference["value"]) + elif reference_type == "Generic Informational URL": + values_links.append(reference["value"]) + elif reference_type == "Vendor Specific Advisory URL": + values_links.append(reference["value"]) + elif reference_type == "Vendor URL": + values_links.append(reference["value"]) + elif reference_type == "Mail List Post": + values_links.append(reference["value"]) + elif reference_type == "Metasploit URL": + values_links.append(reference["value"]) + elif reference_type == "Packet Storm": + values_links.append(reference["value"]) + elif reference_type == "Generic Exploit URL": + values_links.append(reference["value"]) + elif reference_type == "CERT VU": + reference_link = "http://www.kb.cert.org/vuls/id/%s" % reference["value"] + values_links.append(reference_link) + elif reference_type == "CVE ID": + reference_link = "https://nvd.nist.gov/vuln/detail/%s" % reference["value"] + values_links.append(reference_link) + elif reference_type == "Microsoft Knowledge Base Article": + reference_link = "https://support.microsoft.com/en-us/help/%s" % reference["value"] + values_links.append(reference_link) + elif reference_type == "Exploit Database": + reference_link = "https://www.exploit-db.com/exploits/%s" % reference["value"] + values_links.append(reference_link) + elif reference_type == "Generic Informational URL": + values_links.append(reference["value"]) + elif reference_type == "Generic Informational URL": + values_links.append(reference["value"]) + elif reference_type == "Generic Informational URL": + values_links.append(reference["value"]) + elif reference_type == "Generic Informational URL": + values_links.append(reference["value"]) + elif reference_type == "Generic Informational URL": + values_links.append(reference["value"]) + + # CVSS Scoring + if add_cvss: + cvss = results.get('cvss_metrics') + if cvss: + log.debug("Include CVSS") + for cvss_metric in cvss: + score = cvss_metric.get("score") + if score: + values_text.append("CVSS %s (base: %s) (source: %s)" % (score, cvss_metric.get("calculated_cvss_base_score"), cvss_metric.get("source"))) + + # Add products + if add_products: + products = results.get('products') + if products and len(products) > 0: + + # Get the vendors + vendors = results.get('vendors') + vendors_name = "" + log.debug("Include product information") + if vendors: + for vendor in vendors: + vendor_detail = vendor.get("vendor") + if vendor_detail: + vendor_name = vendor_detail.get("name") + if vendor_name: + vendors_name += vendor_name + " " + + # Walk through all vendors + for product in products: + vulnerable_product = vendors_name + name = product.get("name") + if name: + vulnerable_product += "%s " % name + versions = product.get("versions") + if versions: + vulnerable_product += "(" + for version in versions: + affected = version.get("affected") + + if affected and affected == "true": + vulnerable_product += " %s " % version.get("name") + if add_cpe: + version_cpe = version.get("cpe") + if version_cpe: + cpe = version_cpe[0].get("cpe") + if cpe: + values_cpe.append(cpe) + + vulnerable_product += ")" + # Add vulnerable products + values_text.append(vulnerable_product) + + # Add vulnerability classifications + if add_classifications: + classifications = results.get("classifications") + if classifications and len(classifications) > 0: + vulnerability_classification = "" + log.debug("Include classifications") + for classification in classifications: + longname = classification.get("longname") + description = classification.get("description") + vulnerability_classification += " \"%s\" " % longname + values_text.append(vulnerability_classification) + + # Finished processing the VulnDB reply; set the result for MISP + output['results'] += [{'types': 'text', 'values': values_text }] + output['results'] += [{'types': 'link', 'values': values_links }] + if add_cpe: + output['results'] += [{'types': 'cpe', 'values': values_cpe }] + return output + else: + misperrors["error"] = "No information retrieved from VulnDB" + return misperrors + except: + misperrors["error"] = "Error while fetching information from VulnDB, wrong API keys?" + return misperrors + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo['config'] = moduleconfig + return moduleinfo