mirror of https://github.com/MISP/misp-modules
chg: [cpe] Support of the new CVE-Search API
parent
ab23547844
commit
b98562a75e
|
@ -6,19 +6,18 @@ from pymisp import MISPEvent, MISPObject
|
||||||
misperrors = {'error': 'Error'}
|
misperrors = {'error': 'Error'}
|
||||||
mispattributes = {'input': ['cpe'], 'format': 'misp_standard'}
|
mispattributes = {'input': ['cpe'], 'format': 'misp_standard'}
|
||||||
moduleinfo = {
|
moduleinfo = {
|
||||||
'version': '1',
|
'version': '2',
|
||||||
'author': 'Christian Studer',
|
'author': 'Christian Studer',
|
||||||
'description': 'An expansion module to enrich a CPE attribute with its related vulnerabilities.',
|
'description': 'An expansion module to enrich a CPE attribute with its related vulnerabilities.',
|
||||||
'module-type': ['expansion', 'hover']
|
'module-type': ['expansion', 'hover']
|
||||||
}
|
}
|
||||||
moduleconfig = ["custom_API_URL", "limit"]
|
moduleconfig = ["custom_API_URL", "limit"]
|
||||||
cveapi_url = 'https://cvepremium.circl.lu/api/cvefor/'
|
cveapi_url = 'https://cvepremium.circl.lu/api/query'
|
||||||
|
|
||||||
|
|
||||||
class VulnerabilitiesParser():
|
class VulnerabilitiesParser():
|
||||||
def __init__(self, attribute, api_url):
|
def __init__(self, attribute):
|
||||||
self.attribute = attribute
|
self.attribute = attribute
|
||||||
self.api_url = api_url
|
|
||||||
self.misp_event = MISPEvent()
|
self.misp_event = MISPEvent()
|
||||||
self.misp_event.add_attribute(**attribute)
|
self.misp_event.add_attribute(**attribute)
|
||||||
self.vulnerability_mapping = {
|
self.vulnerability_mapping = {
|
||||||
|
@ -100,18 +99,27 @@ def handler(q=False):
|
||||||
attribute = request['attribute']
|
attribute = request['attribute']
|
||||||
if attribute.get('type') != 'cpe':
|
if attribute.get('type') != 'cpe':
|
||||||
return {'error': 'Wrong input attribute type.'}
|
return {'error': 'Wrong input attribute type.'}
|
||||||
api_url = check_url(request['config']['custom_API_URL']) if request['config'].get('custom_API_URL') else cveapi_url
|
url = check_url(request['config']['custom_API_URL']) if request['config'].get('custom_API_URL') else cveapi_url
|
||||||
url = f"{api_url}{attribute['value']}"
|
params = {
|
||||||
|
"retrieve": "cves",
|
||||||
|
"dict_filter": {
|
||||||
|
"vulnerable_configuration": attribute['value']
|
||||||
|
}
|
||||||
|
}
|
||||||
if request['config'].get('limit'):
|
if request['config'].get('limit'):
|
||||||
url = f"{url}/{request['config']['limit']}"
|
params.update({
|
||||||
response = requests.get(url)
|
"limit": int(request['config']['limit']),
|
||||||
|
"sort": "cvss",
|
||||||
|
"sort_dir": "DESC"
|
||||||
|
})
|
||||||
|
response = requests.post(url, json=params)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
vulnerabilities = response.json()
|
vulnerabilities = response.json()['data']
|
||||||
if not vulnerabilities:
|
if not vulnerabilities:
|
||||||
return {'error': 'No related vulnerability for this CPE.'}
|
return {'error': 'No related vulnerability for this CPE.'}
|
||||||
else:
|
else:
|
||||||
return {'error': 'API not accessible.'}
|
return {'error': 'API not accessible.'}
|
||||||
parser = VulnerabilitiesParser(attribute, api_url)
|
parser = VulnerabilitiesParser(attribute)
|
||||||
parser.parse_vulnerabilities(vulnerabilities)
|
parser.parse_vulnerabilities(vulnerabilities)
|
||||||
return parser.get_result()
|
return parser.get_result()
|
||||||
|
|
||||||
|
|
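Review bot: `requests.post(url, json=params)` in the second hunk is sent without a `timeout`, so a slow or unresponsive CVE-Search instance (the default one or one set through `custom_API_URL`) can hold the module worker indefinitely. Passing a bound, for example `response = requests.post(url, json=params, timeout=10)` (sample value), and catching `requests.exceptions.RequestException` to return the existing `{'error': 'API not accessible.'}` would contain that. Two neighbouring new lines can also raise out of `handler` instead of returning an error dict: `int(request['config']['limit'])` on a non-numeric config value, and `response.json()['data']` when a 200 reply is not JSON or has no `data` key, which seems likely if a custom URL still serves the previous API. `handler` starts and ends outside the hunk, so an enclosing try/except may already cover this; if not, both cases should follow the `{'error': ...}` convention used by the other failure paths.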