mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			283 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			283 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| '''
 | |
| Import VulnDB
 | |
|     https://vulndb.cyberriskanalytics.com/
 | |
|     https://www.riskbasedsecurity.com/
 | |
| '''
 | |
| 
 | |
| import oauth2 as oauth
 | |
| import json
 | |
| 
 | |
| import logging
 | |
| import sys
 | |
| 
 | |
| 
 | |
# Logging for this module: every record from DEBUG upwards is written to
# stdout with a timestamp, the logger name and the level.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
ch.setLevel(logging.DEBUG)

log = logging.getLogger('vulndb')
log.addHandler(ch)
log.setLevel(logging.DEBUG)
 | |
| 
 | |
| 
 | |
# Skeleton of an error reply.
misperrors = dict(error='Error')

# Attribute types the module takes as input and the types it produces.
mispattributes = {
    'input': ['vulnerability'],
    'output': ['text', 'link', 'cpe'],
}

# Descriptive metadata about the module.
moduleinfo = {
    'version': '0.1',
    'author': 'Koen Van Impe',
    'description': 'Query VulnDB - RiskBasedSecurity.com',
    'module-type': ['expansion', 'hover'],
}

# Names of the settings looked up in the request's "config" section.
moduleconfig = [
    'apikey',
    'apisecret',
    'discard_dates',
    'discard_external_references',
    'discard_cvss',
    'discard_productinformation',
    'discard_classification',
    'discard_cpe',
]
 | |
| 
 | |
| 
 | |
# External reference types whose value already is a complete URL.
_VULNDB_URL_REFERENCE_TYPES = frozenset([
    "Other Advisory URL",
    "News Article",
    "Generic Informational URL",
    "Vendor Specific Advisory URL",
    "Vendor URL",
    "Mail List Post",
    "Metasploit URL",
    "Packet Storm",
    "Generic Exploit URL",
])

# External reference types whose value is an identifier that has to be
# inserted into a URL template.
_VULNDB_REFERENCE_TEMPLATES = {
    "CERT VU": "http://www.kb.cert.org/vuls/id/%s",
    "CVE ID": "https://nvd.nist.gov/vuln/detail/%s",
    "Microsoft Knowledge Base Article": "https://support.microsoft.com/en-us/help/%s",
    "Exploit Database": "https://www.exploit-db.com/exploits/%s",
}

# (result key, text prefix) pairs, in the order they are reported.
_VULNDB_TEXT_FIELDS = (
    ("description", ""),
    ("t_description", ""),
    ("manual_notes", "Notes: "),
    ("keywords", "Keywords: "),
    ("solution", "Solution: "),
)
_VULNDB_DATE_FIELDS = (
    ("solution_date", "Solution date: "),
    ("disclosure_date", "Disclosure date: "),
    ("discovery_date", "Discovery date: "),
    ("exploit_publish_date", "Exploit published date: "),
    ("vendor_informed_date", "Vendor informed date: "),
    ("vendor_ack_date", "Vendor acknowledgement date: "),
    ("third_party_solution_date", "Third party solution date: "),
)


def _config_flag(config, name):
    """Return True when setting *name* holds the text "true" (any case).

    Going through str() means a missing setting (None) or a non-string
    value is handled without raising.
    """
    return str(config.get(name)).lower() == "true"


def _labelled_values(results, fields):
    """Return prefix + value for every non-empty field of *results*."""
    return [label + results[key] for key, label in fields if results.get(key)]


def _reference_link(reference):
    """Return the link for one external reference, or None if it has none.

    References without a value, and reference types that are not known,
    yield None.
    """
    value = reference.get("value")
    if not value:
        return None
    reference_type = reference.get("type")
    if reference_type in _VULNDB_URL_REFERENCE_TYPES:
        return value
    template = _VULNDB_REFERENCE_TEMPLATES.get(reference_type)
    return template % value if template else None


def _product_summary(product, vendors_name, add_cpe):
    """Return (text, cpes) describing one product entry.

    The text is the vendor names, the product name and, in parentheses,
    the names of the versions flagged as affected. The CPE list is only
    filled when *add_cpe* is true.
    """
    parts = [vendors_name]
    cpes = []
    name = product.get("name")
    if name:
        parts.append("%s " % name)
    versions = product.get("versions")
    if versions:
        parts.append("(")
        for version in versions:
            # NOTE(review): compares against the string "true"; confirm the
            # API does not send a JSON boolean here.
            if version.get("affected") == "true":
                parts.append(" %s " % version.get("name"))
            if add_cpe:
                version_cpe = version.get("cpe")
                if version_cpe:
                    # Only the first CPE entry of each version is reported.
                    cpe = version_cpe[0].get("cpe")
                    if cpe:
                        cpes.append(cpe)
        parts.append(")")
    return "".join(parts), cpes


def handler(q=False):
    """Look up a vulnerability in VulnDB and return the findings for MISP.

    q is a JSON string holding a 'vulnerability' value and a 'config'
    object with 'apikey' and 'apisecret' plus the optional discard_*
    settings; a setting equal to "true" suppresses that part of the output.

    Returns False when q is False, {'error': message} when the lookup
    cannot be performed or yields nothing, and otherwise
    {'results': [...]} with one entry for the text values, one for the
    links and, unless CPEs are discarded, one for the CPE values.

    Error replies are fresh dictionaries, so the message of one call is
    never visible through a dictionary shared with other calls.
    """
    from urllib.parse import quote

    # Base URL for VulnDB
    VULNDB_URL = "https://vulndb.cyberriskanalytics.com"

    if q is False:
        return False
    request = json.loads(q)

    # Only continue if we have a vulnerability attribute
    vulnerability = request.get('vulnerability')
    if not vulnerability:
        return {'error': 'Vulnerability ID missing for VulnDB.'}

    # A request without any "config" section is treated like missing keys.
    config = request.get("config") or {}
    apikey = config.get("apikey")
    apisecret = config.get("apisecret")
    if apikey is None or apisecret is None:
        return {'error': "Missing API key or secret value for VulnDB."}

    # This has to be done the 'inverse' way, MISP-server settings are set to False by default
    add_dates = not _config_flag(config, "discard_dates")
    add_ext_references = not _config_flag(config, "discard_external_references")
    add_cvss = not _config_flag(config, "discard_cvss")
    add_products = not _config_flag(config, "discard_productinformation")
    add_classifications = not _config_flag(config, "discard_classification")
    add_cpe = not _config_flag(config, "discard_cpe")

    cpe_query = "?show_cpe=true" if add_cpe else ""

    # The ID comes from the request, so escape it before it becomes a path segment.
    find_by_cve_url = "%s/api/v1/vulnerabilities/%s/find_by_cve_id%s" % (
        VULNDB_URL, quote(str(vulnerability), safe=""), cpe_query)
    log.debug(find_by_cve_url)

    try:
        consumer = oauth.Consumer(key=apikey, secret=apisecret)
        client = oauth.Client(consumer)
        _response, content = client.request(find_by_cve_url, "GET")
        content_json = json.loads(content.decode())

        if not content_json:
            return {'error': "No information retrieved from VulnDB."}
        if 'error' in content_json:
            return {'error': "No CVE information found."}
        matches = content_json.get("results")
        if not matches:
            # An empty result list is a "not found", not a fetch failure.
            return {'error': "No CVE information found."}
        results = matches[0]

        values_text = []
        values_links = []
        values_cpe = []

        # Include the VulnDB title and ID
        values_text.append(results["title"])
        values_links.append("%s/vulnerabilities/%s" % (VULNDB_URL, results["vulndb_id"]))

        # Descriptive part of the VulnDB item
        values_text.extend(_labelled_values(results, _VULNDB_TEXT_FIELDS))

        # VulnDB items contain a number of dates, do we include them?
        if add_dates:
            log.debug("Include dates")
            values_text.extend(_labelled_values(results, _VULNDB_DATE_FIELDS))

        # External references
        if add_ext_references:
            ext_references = results.get('ext_references')
            if ext_references:
                log.debug("Include external references")
                for reference in ext_references:
                    link = _reference_link(reference)
                    if link is not None:
                        values_links.append(link)

        # CVSS Scoring
        if add_cvss:
            cvss = results.get('cvss_metrics')
            if cvss:
                log.debug("Include CVSS")
                for cvss_metric in cvss:
                    score = cvss_metric.get("score")
                    if score:
                        values_text.append("CVSS %s (base: %s) (source: %s)" % (
                            score,
                            cvss_metric.get("calculated_cvss_base_score"),
                            cvss_metric.get("source")))

        # Add products
        if add_products:
            products = results.get('products')
            if products:
                log.debug("Include product information")
                # Every product line starts with the names of all vendors.
                vendor_names = []
                for vendor in results.get('vendors') or []:
                    vendor_detail = vendor.get("vendor")
                    if vendor_detail:
                        vendor_name = vendor_detail.get("name")
                        if vendor_name:
                            vendor_names.append(vendor_name + " ")
                vendors_name = "".join(vendor_names)

                for product in products:
                    summary, cpes = _product_summary(product, vendors_name, add_cpe)
                    values_text.append(summary)
                    values_cpe.extend(cpes)

        # Add vulnerability classifications
        if add_classifications:
            classifications = results.get("classifications")
            if classifications:
                log.debug("Include classifications")
                values_text.append("".join(
                    " \"%s\" " % classification.get("longname")
                    for classification in classifications))

        # Finished processing the VulnDB reply; set the result for MISP
        output = {'results': [
            {'types': 'text', 'values': values_text},
            {'types': 'link', 'values': values_links},
        ]}
        if add_cpe:
            output['results'].append({'types': 'cpe', 'values': values_cpe})
        return output
    except Exception:
        # Boundary handler: the caller always receives an error reply, and
        # the actual cause is kept in the log.
        log.exception("VulnDB lookup failed for %s", vulnerability)
        return {'error': "Error while fetching information from VulnDB, wrong API keys?"}
 | |
| 
 | |
| 
 | |
def introspection():
    """Return the input and output attribute types declared by this module."""
    return mispattributes
 | |
| 
 | |
| 
 | |
def version():
    """Return the module metadata with the setting names stored under 'config'."""
    moduleinfo.update(config=moduleconfig)
    return moduleinfo
 |