mirror of https://github.com/MISP/misp-modules
add: First shot of an expansio module to query cve-search with a cpe to get the related vulnerabilities
parent
c00349e198
commit
410aaaeb28
|
@ -0,0 +1,122 @@
|
|||
import json
|
||||
import requests
|
||||
from . import check_input_attribute, standard_error_message
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'}
|
||||
moduleinfo = {
|
||||
'version': '1',
|
||||
'author': 'Christian Studer',
|
||||
'description': 'An expansion module to enrich a CPE attribute with the related vulnerabilities.',
|
||||
'module-type': ['expansion', 'hover']
|
||||
}
|
||||
moduleconfig = ["custom_API_URL"]
|
||||
|
||||
|
||||
class VulnerabilityParser():
|
||||
def __init__(self, attribute, api_url):
|
||||
self.attribute = attribute
|
||||
self.api_url = api_url
|
||||
self.misp_event = MISPEvent()
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
self.vulnerability_mapping = {
|
||||
'id': {
|
||||
'type': 'vulnerability',
|
||||
'object_relation': 'id'
|
||||
},
|
||||
'summary': {
|
||||
'type': 'text',
|
||||
'object_relation': 'summary'
|
||||
},
|
||||
'vulnerable_configuration': {
|
||||
'type': 'cpe',
|
||||
'object_relation': 'vulnerable_configuration'
|
||||
},
|
||||
'vulnerable_configuration_cpe_2_2': {
|
||||
'type': 'cpe',
|
||||
'object_relation': 'vulnerable_configuration'
|
||||
},
|
||||
'Modified': {
|
||||
'type': 'datetime',
|
||||
'object_relation': 'modified'
|
||||
},
|
||||
'Published': {
|
||||
'type': 'datetime',
|
||||
'object_relation': 'published'
|
||||
},
|
||||
'references': {
|
||||
'type': 'link',
|
||||
'object_relation': 'references'
|
||||
},
|
||||
'cvss': {
|
||||
'type': 'float',
|
||||
'object_relation': 'cvss-score'
|
||||
}
|
||||
}
|
||||
|
||||
def parse_vulnerabilities(self, vulnerabilities):
|
||||
for vulnerability in vulnerabilities:
|
||||
vulnerability_object = MISPObject('vulnerability')
|
||||
for feature in ('id', 'summary', 'Modified', 'Published', 'cvss'):
|
||||
if vulnerability.get(feature):
|
||||
attribute = {'value': vulnerability[feature]}
|
||||
atttribute.update(self.vulnerability_mapping[feature])
|
||||
vulnerability_object.add_attribute(**attribute)
|
||||
if vulnerability.get('Published'):
|
||||
vulnerability_object.add_attribute(**{
|
||||
'type': 'text',
|
||||
'object_relation': 'state',
|
||||
'value': 'Published'
|
||||
})
|
||||
for feature in ('references', 'vulnerable_configuration', 'vulnerable_configuration_cpe_2_2'):
|
||||
if vulnerability.get(feature):
|
||||
for value in vulnerability[feature]:
|
||||
if isinstance(value, dict):
|
||||
value = value['title']
|
||||
attribute = {'value': value}
|
||||
attribute.update(self.vulnerability_mapping[feature])
|
||||
vulnerability_object.add_attribute(**attribute)
|
||||
vulnerability_object.add_reference(self.attribute['uuid'], 'related-to')
|
||||
self.misp_event.add_object(vulnerability_object)
|
||||
|
||||
def get_result(self):
|
||||
event = json.loads(self.misp_event.to_json())
|
||||
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
|
||||
return {'results': results}
|
||||
|
||||
|
||||
def check_url(url):
|
||||
return "{}/".format(url) if not url.endswith('/') else url
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
if q is False:
|
||||
return False
|
||||
request = json.loads(q)
|
||||
if not request.get('attribute') or not check_input_attribute(request['attribute']):
|
||||
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
|
||||
attribute = request['attribute']
|
||||
if attribute.get('type') != 'cpe':
|
||||
return {'error': 'Wrong input attribute type.'}
|
||||
if not request.get('config') or not request['config'].get('custom_API_URL'):
|
||||
return {'error': 'Missing API URL'}
|
||||
api_url = check_url(request['config']['custom_API_URL'])
|
||||
response = requests.get("{}{}".format(api_url, attribute['value']))
|
||||
if response.status_code == 200:
|
||||
vulnerabilities = response.json()
|
||||
if not vulnerabilities:
|
||||
return {'error': 'No related vulnerability for this CPE.'}
|
||||
else:
|
||||
return {'error': 'API not accessible.'}
|
||||
parser = VulnerabilitiesParser(attribute, api_url)
|
||||
parser.parse_vulnerabilities(vulnerabilities)
|
||||
return parser.get_result()
|
||||
|
||||
|
||||
def introspection():
|
||||
return mispattributes
|
||||
|
||||
|
||||
def version():
|
||||
moduleinfo['config'] = moduleconfig
|
||||
return moduleinfo
|
Loading…
Reference in New Issue