2020-10-23 21:19:26 +02:00
|
|
|
import json
|
|
|
|
import requests
|
|
|
|
from . import check_input_attribute, standard_error_message
|
2020-10-24 02:40:31 +02:00
|
|
|
from pymisp import MISPEvent, MISPObject
|
2020-10-23 21:19:26 +02:00
|
|
|
|
|
|
|
# Generic error payload; not referenced by the code visible in this module.
misperrors = {"error": "Error"}

# Returned by introspection(): accepted input attribute types and result format.
mispattributes = {"input": ["cpe"], "format": "misp_standard"}

# Module metadata returned by version(), which also attaches moduleconfig to it.
moduleinfo = {
    "version": "2",
    "author": "Christian Studer",
    "description": "An expansion module to enrich a CPE attribute with its related vulnerabilities.",
    "module-type": ["expansion", "hover"],
}

# Names of the configuration options read by handler().
moduleconfig = ["custom_API_URL", "limit"]

# Endpoint queried by handler() when no custom API URL is configured.
cveapi_url = "https://cvepremium.circl.lu/api/query"

# Number of vulnerabilities requested when no limit is configured.
DEFAULT_LIMIT = 10
|
2020-10-23 21:19:26 +02:00
|
|
|
|
|
|
|
|
2020-10-24 02:40:31 +02:00
|
|
|
class VulnerabilitiesParser:
    """Turn vulnerability records returned by the CVE API into MISP objects.

    The queried attribute is stored in a MISP event; every parsed
    vulnerability becomes a ``vulnerability`` object added to that event and
    referencing the queried attribute.
    """

    def __init__(self, attribute):
        """Store the queried attribute and initialise the resulting event.

        :param attribute: dict describing the queried attribute. It is passed
            as keyword arguments to ``MISPEvent.add_attribute`` and its
            ``uuid`` entry is used as the target of object references.
        """
        self.attribute = attribute
        self.misp_event = MISPEvent()
        self.misp_event.add_attribute(**attribute)
        # For each field of a vulnerability record: the type and the object
        # relation of the attribute created from it.
        self.vulnerability_mapping = {
            'id': {
                'type': 'vulnerability',
                'object_relation': 'id'
            },
            'summary': {
                'type': 'text',
                'object_relation': 'summary'
            },
            'vulnerable_configuration': {
                'type': 'cpe',
                'object_relation': 'vulnerable-configuration'
            },
            'vulnerable_configuration_cpe_2_2': {
                'type': 'cpe',
                'object_relation': 'vulnerable-configuration'
            },
            'Modified': {
                'type': 'datetime',
                'object_relation': 'modified'
            },
            'Published': {
                'type': 'datetime',
                'object_relation': 'published'
            },
            'references': {
                'type': 'link',
                'object_relation': 'references'
            },
            'cvss': {
                'type': 'float',
                'object_relation': 'cvss-score'
            }
        }

    def _add_mapped_attribute(self, misp_object, feature, value):
        """Add ``value`` to ``misp_object`` using the mapping of ``feature``."""
        attribute = {'value': value}
        attribute.update(self.vulnerability_mapping[feature])
        misp_object.add_attribute(**attribute)

    def parse_vulnerabilities(self, vulnerabilities):
        """Add one ``vulnerability`` object per record to the event.

        :param vulnerabilities: iterable of dict records; only the fields
            listed in ``self.vulnerability_mapping`` are read, and empty or
            missing fields are skipped.
        """
        for vulnerability in vulnerabilities:
            vulnerability_object = MISPObject('vulnerability')
            # Single-valued fields.
            for feature in ('id', 'summary', 'Modified', 'Published', 'cvss'):
                if vulnerability.get(feature):
                    self._add_mapped_attribute(vulnerability_object, feature, vulnerability[feature])
            # A record with a publication date is flagged as published.
            if vulnerability.get('Published'):
                vulnerability_object.add_attribute(**{
                    'type': 'text',
                    'object_relation': 'state',
                    'value': 'Published'
                })
            # Multi-valued fields: one attribute per listed value.
            for feature in ('references', 'vulnerable_configuration', 'vulnerable_configuration_cpe_2_2'):
                if vulnerability.get(feature):
                    for value in vulnerability[feature]:
                        # Entries given as dicts carry the value under 'title'.
                        if isinstance(value, dict):
                            value = value['title']
                        self._add_mapped_attribute(vulnerability_object, feature, value)
            vulnerability_object.add_reference(self.attribute['uuid'], 'related-to')
            self.misp_event.add_object(vulnerability_object)

    def get_result(self):
        """Return the event content in the module result format.

        :return: ``{'results': {...}}`` holding the ``Attribute`` and
            ``Object`` sections of the serialised event. A section that is
            absent or empty is left out instead of raising ``KeyError``.
        """
        event = json.loads(self.misp_event.to_json())
        results = {
            key: event[key]
            for key in ('Attribute', 'Object')
            if event.get(key)
        }
        return {'results': results}
|
|
|
|
|
|
|
|
|
|
|
|
def check_url(url):
    """Return ``url`` with a trailing slash, appending one when it is missing."""
    if url.endswith('/'):
        return url
    return f'{url}/'
|
2020-10-23 21:19:26 +02:00
|
|
|
|
|
|
|
|
|
|
|
def handler(q=False):
    """Enrich a CPE attribute with the vulnerabilities related to it.

    :param q: JSON-encoded request holding the ``attribute`` to enrich and an
        optional ``config`` with ``custom_API_URL`` and ``limit``, or
        ``False`` when there is no request.
    :return: ``False`` when ``q`` is ``False``; a dict with an ``error`` key
        when the input is invalid or the API query fails or returns nothing;
        otherwise the ``{'results': ...}`` dict built by the parser.
    """
    if q is False:
        return False
    request = json.loads(q)
    if not request.get('attribute') or not check_input_attribute(request['attribute']):
        return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
    attribute = request['attribute']
    if attribute.get('type') != 'cpe':
        return {'error': 'Wrong input attribute type.'}
    # The configuration is optional: both options have a default value.
    config = request.get('config') or {}
    url = check_url(config['custom_API_URL']) if config.get('custom_API_URL') else cveapi_url
    try:
        limit = int(config['limit']) if config.get('limit') else DEFAULT_LIMIT
    except (TypeError, ValueError):
        return {'error': 'The limit parameter must be an integer.'}
    params = {
        "retrieve": "cves",
        "dict_filter": {
            "vulnerable_configuration": attribute['value']
        },
        "limit": limit,
        "sort": "cvss",
        "sort_dir": "DESC"
    }
    # Connection problems are reported like any other unreachable API.
    try:
        response = requests.post(url, json=params)
    except requests.exceptions.RequestException:
        return {'error': 'API not accessible.'}
    if response.status_code != 200:
        return {'error': 'API not accessible.'}
    # Invalid JSON raises a ValueError subclass; an unexpected payload shape
    # raises KeyError or TypeError.
    try:
        vulnerabilities = response.json()['data']
    except (KeyError, TypeError, ValueError):
        return {'error': 'Unexpected response from the API.'}
    if not vulnerabilities:
        return {'error': 'No related vulnerability for this CPE.'}
    parser = VulnerabilitiesParser(attribute)
    parser.parse_vulnerabilities(vulnerabilities)
    return parser.get_result()
|
|
|
|
|
|
|
|
|
|
|
|
def introspection():
    """Return the module-level ``mispattributes`` description of inputs and format."""
    return mispattributes
|
|
|
|
|
|
|
|
|
|
|
|
def version():
    """Return the module metadata with the configuration option names attached.

    The module-level ``moduleinfo`` dict is updated in place with a
    ``config`` entry pointing at ``moduleconfig`` and then returned.
    """
    moduleinfo.update(config=moduleconfig)
    return moduleinfo
|