From 410aaaeb2896bfb69d14b7ac2f293a22f914fadc Mon Sep 17 00:00:00 2001 From: chrisr3d Date: Fri, 23 Oct 2020 21:19:26 +0200 Subject: [PATCH] add: First shot of an expansio module to query cve-search with a cpe to get the related vulnerabilities --- misp_modules/modules/expansion/cpe.py | 122 ++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 misp_modules/modules/expansion/cpe.py diff --git a/misp_modules/modules/expansion/cpe.py b/misp_modules/modules/expansion/cpe.py new file mode 100644 index 0000000..74cb1b6 --- /dev/null +++ b/misp_modules/modules/expansion/cpe.py @@ -0,0 +1,122 @@ +import json +import requests +from . import check_input_attribute, standard_error_message + +misperrors = {'error': 'Error'} +mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'} +moduleinfo = { + 'version': '1', + 'author': 'Christian Studer', + 'description': 'An expansion module to enrich a CPE attribute with the related vulnerabilities.', + 'module-type': ['expansion', 'hover'] +} +moduleconfig = ["custom_API_URL"] + + +class VulnerabilityParser(): + def __init__(self, attribute, api_url): + self.attribute = attribute + self.api_url = api_url + self.misp_event = MISPEvent() + self.misp_event.add_attribute(**attribute) + self.vulnerability_mapping = { + 'id': { + 'type': 'vulnerability', + 'object_relation': 'id' + }, + 'summary': { + 'type': 'text', + 'object_relation': 'summary' + }, + 'vulnerable_configuration': { + 'type': 'cpe', + 'object_relation': 'vulnerable_configuration' + }, + 'vulnerable_configuration_cpe_2_2': { + 'type': 'cpe', + 'object_relation': 'vulnerable_configuration' + }, + 'Modified': { + 'type': 'datetime', + 'object_relation': 'modified' + }, + 'Published': { + 'type': 'datetime', + 'object_relation': 'published' + }, + 'references': { + 'type': 'link', + 'object_relation': 'references' + }, + 'cvss': { + 'type': 'float', + 'object_relation': 'cvss-score' + } + } + + def parse_vulnerabilities(self, vulnerabilities): + for vulnerability in vulnerabilities: + vulnerability_object = MISPObject('vulnerability') + for feature in ('id', 'summary', 'Modified', 'Published', 'cvss'): + if vulnerability.get(feature): + attribute = {'value': vulnerability[feature]} + atttribute.update(self.vulnerability_mapping[feature]) + vulnerability_object.add_attribute(**attribute) + if vulnerability.get('Published'): + vulnerability_object.add_attribute(**{ + 'type': 'text', + 'object_relation': 'state', + 'value': 'Published' + }) + for feature in ('references', 'vulnerable_configuration', 'vulnerable_configuration_cpe_2_2'): + if vulnerability.get(feature): + for value in vulnerability[feature]: + if isinstance(value, dict): + value = value['title'] + attribute = {'value': value} + attribute.update(self.vulnerability_mapping[feature]) + vulnerability_object.add_attribute(**attribute) + vulnerability_object.add_reference(self.attribute['uuid'], 'related-to') + self.misp_event.add_object(vulnerability_object) + + def get_result(self): + event = json.loads(self.misp_event.to_json()) + results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])} + return {'results': results} + + +def check_url(url): + return "{}/".format(url) if not url.endswith('/') else url + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + if not request.get('attribute') or not check_input_attribute(request['attribute']): + return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} + attribute = request['attribute'] + if attribute.get('type') != 'cpe': + return {'error': 'Wrong input attribute type.'} + if not request.get('config') or not request['config'].get('custom_API_URL'): + return {'error': 'Missing API URL'} + api_url = check_url(request['config']['custom_API_URL']) + response = requests.get("{}{}".format(api_url, attribute['value'])) + if response.status_code == 200: + vulnerabilities = response.json() + if not vulnerabilities: + return {'error': 'No related vulnerability for this CPE.'} + else: + return {'error': 'API not accessible.'} + parser = VulnerabilitiesParser(attribute, api_url) + parser.parse_vulnerabilities(vulnerabilities) + return parser.get_result() + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo['config'] = moduleconfig + return moduleinfo