diff --git a/misp_modules/modules/expansion/__init__.py b/misp_modules/modules/expansion/__init__.py index b76ed937..3492365f 100644 --- a/misp_modules/modules/expansion/__init__.py +++ b/misp_modules/modules/expansion/__init__.py @@ -21,7 +21,7 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c 'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring', 'clamav', 'jinja_template_rendering','hyasinsight', 'variotdbs', 'crowdsec', 'extract_url_components', 'ipinfo', 'whoisfreaks', 'ip2locationio', 'vysion', 'stairwell', - 'google_threat_intelligence'] + 'google_threat_intelligence', 'vulnerability_lookup'] minimum_required_fields = ('type', 'uuid', 'value') diff --git a/misp_modules/modules/expansion/_vulnerability_parser/__init__.py b/misp_modules/modules/expansion/_vulnerability_parser/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py b/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py new file mode 100644 index 00000000..5f4d69cf --- /dev/null +++ b/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py @@ -0,0 +1,118 @@ +import json +from pymisp import MISPAttribute, MISPEvent, MISPObject + + +class VulnerabilityMapping: + __variot_data_mapping = { + 'credits': 'credit', + 'description': 'description', + 'title': 'summary' + } + __variot_flat_mapping = { + 'cve': 'id', 'id': 'id' + } + + @classmethod + def exploit_mapping(cls) -> dict: + return cls.__exploit_mapping + + @classmethod + def exploit_multiple_mapping(cls) -> dict: + return cls.__exploit_multiple_mapping + + @classmethod + def variot_data_mapping(cls) -> dict: + return cls.__variot_data_mapping + + @classmethod + def variot_flat_mapping(cls) -> dict: + return cls.__variot_flat_mapping + + +class VulnerabilityParser: + def __init__(self, attribute: dict): + misp_attribute = MISPAttribute() + misp_attribute.from_dict(**attribute) + misp_event = MISPEvent() + misp_event.add_attribute(**misp_attribute) + self.__misp_attribute = misp_attribute + self.__misp_event = misp_event + + @property + def misp_attribute(self): + return self.__misp_attribute + + @property + def misp_event(self): + return self.__misp_event + + def get_results(self) -> dict: + event = json.loads(self.misp_event.to_json()) + return { + 'results': { + key: value for key, value in event.items() + if key in ('Attribute', 'Object') + } + } + + def _parse_variot_description(self, query_results): + vulnerability_object = MISPObject('vulnerability') + for field, relation in self.mapping.variot_flat_mapping().items(): + if query_results.get(field): + vulnerability_object.add_attribute( + relation, query_results[field] + ) + for field, relation in self.mapping.variot_data_mapping().items(): + if query_results.get(field, {}).get('data'): + vulnerability_object.add_attribute( + relation, query_results[field]['data'] + ) + if query_results.get('configurations', {}).get('data'): + for configuration in query_results['configurations']['data']: + for node in configuration['nodes']: + for cpe_match in node['cpe_match']: + if cpe_match['vulnerable']: + vulnerability_object.add_attribute( + 'vulnerable-configuration', + cpe_match['cpe23Uri'] + ) + if query_results.get('cvss', {}).get('data'): + cvss = {} + for cvss_data in query_results['cvss']['data']: + for cvss_v3 in cvss_data['cvssV3']: + cvss[float(cvss_v3['trust'])] = cvss_v3 + if cvss: + cvss = cvss[max(cvss)] + vulnerability_object.add_attribute( + 'cvss-score', cvss['baseScore'] + ) + vulnerability_object.add_attribute( + 'cvss-string', cvss['vectorString'] + ) + if query_results.get('references', {}).get('data'): + for reference in query_results['references']['data']: + vulnerability_object.add_attribute( + 'references', reference['url'] + ) + if query_results.get('sources_release_date', {}).get('data'): + for release_date in query_results['sources_release_date']['data']: + if release_date['db'] != 'NVD': + continue + if release_date['id'] == self.misp_attribute.value: + vulnerability_object.add_attribute( + 'published', release_date['date'] + ) + break + if query_results.get('sources_update_date', {}).get('data'): + for update_date in query_results['sources_update_date']['data']: + if update_date['db'] != 'NVD': + continue + if update_date['id'] == self.misp_attribute.value: + vulnerability_object.add_attribute( + 'modified', update_date['date'] + ) + break + vulnerability_object.add_reference( + self.misp_attribute.uuid, 'related-to' + ) + self.misp_event.add_object(vulnerability_object) diff --git a/misp_modules/modules/expansion/variotdbs.py b/misp_modules/modules/expansion/variotdbs.py index 6dc8880d..387dc24f 100644 --- a/misp_modules/modules/expansion/variotdbs.py +++ b/misp_modules/modules/expansion/variotdbs.py @@ -1,7 +1,9 @@ import json import requests from . import check_input_attribute, standard_error_message -from pymisp import MISPAttribute, MISPEvent, MISPObject +from ._vulnerability_parser.vulnerability_parser import ( + VulnerabilityMapping, VulnerabilityParser) +from pymisp import MISPObject misperrors = {'error': 'Error'} mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'} @@ -12,155 +14,60 @@ moduleconfig = ['API_key'] variotdbs_url = 'https://www.variotdbs.pl/api' -class VariotdbsParser: +class VariotMapping(VulnerabilityMapping): + __exploit_mapping = { + 'credits': 'credit', + 'description': 'description', + 'exploit': 'exploit', + 'title': 'title' + } + __exploit_multiple_mapping = { + 'cve': { + 'feature': 'cve_id', + 'relation': 'cve-id' + }, + 'references': { + 'feature': 'url', + 'relation': 'reference' + } + } + + @classmethod + def exploit_mapping(cls) -> dict: + return cls.__exploit_mapping + + @classmethod + def exploit_multiple_mapping(cls) -> dict: + return cls.__exploit_multiple_mapping + + +class VariotdbsParser(VulnerabilityParser): def __init__(self, attribute): - misp_attribute = MISPAttribute() - misp_attribute.from_dict(**attribute) - misp_event = MISPEvent() - misp_event.add_attribute(**misp_attribute) - self.__misp_attribute = misp_attribute - self.__misp_event = misp_event - self.__exploit_mapping = { - 'credits': 'credit', - 'description': 'description', - 'exploit': 'exploit', - 'title': 'title' - } - self.__exploit_multiple_mapping = { - 'cve': { - 'feature': 'cve_id', - 'relation': 'cve-id' - }, - 'references': { - 'feature': 'url', - 'relation': 'reference' - } - } - self.__vulnerability_data_mapping = { - 'credits': 'credit', - 'description': 'description', - 'title': 'summary' - } - self.__vulnerability_flat_mapping = { - 'cve': 'id', 'id': 'id' - } + super().__init__(attribute) + self.__mapping = VulnerabilityMapping @property - def exploit_mapping(self) -> dict: - return self.__exploit_mapping - - @property - def exploit_multiple_mapping(self) -> dict: - return self.__exploit_multiple_mapping - - @property - def misp_attribute(self) -> MISPAttribute: - return self.__misp_attribute - - @property - def misp_event(self) -> MISPEvent: - return self.__misp_event - - @property - def vulnerability_data_mapping(self) -> dict: - return self.__vulnerability_data_mapping - - @property - def vulnerability_flat_mapping(self) -> dict: - return self.__vulnerability_flat_mapping - - def get_results(self): - event = json.loads(self.misp_event.to_json()) - results = {key: event[key] for key in ('Attribute', 'Object') if event.get(key)} - return {'results': results} + def mapping(self) -> VulnerabilityMapping: + return self.__mapping def parse_exploit_information(self, query_results): for exploit in query_results: exploit_object = MISPObject('exploit') exploit_object.add_attribute('exploitdb-id', exploit['edb_id']) - for feature, relation in self.exploit_mapping.items(): - if exploit.get(feature): + for field, relation in self.mapping.exploit_mapping().items(): + if exploit.get(field): exploit_object.add_attribute( - relation, - exploit[feature]['data'] + relation, exploit[field]['data'] ) - for feature, relation in self.exploit_multiple_mapping.items(): - if exploit.get(feature): - for value in exploit[feature]['data']: + for field, relation in self.mapping.exploit_multiple_mapping().items(): + if exploit.get(field): + for value in exploit[field]['data']: exploit_object.add_attribute( - relation['relation'], - value[relation['feature']] + relation['relation'], value[relation['feature']] ) exploit_object.add_reference(self.misp_attribute.uuid, 'related-to') self.misp_event.add_object(exploit_object) - def parse_vulnerability_information(self, query_results): - vulnerability_object = MISPObject('vulnerability') - for feature, relation in self.vulnerability_flat_mapping.items(): - if query_results.get(feature): - vulnerability_object.add_attribute( - relation, - query_results[feature] - ) - for feature, relation in self.vulnerability_data_mapping.items(): - if query_results.get(feature, {}).get('data'): - vulnerability_object.add_attribute( - relation, - query_results[feature]['data'] - ) - if query_results.get('configurations', {}).get('data'): - for configuration in query_results['configurations']['data']: - for node in configuration['nodes']: - for cpe_match in node['cpe_match']: - if cpe_match['vulnerable']: - vulnerability_object.add_attribute( - 'vulnerable-configuration', - cpe_match['cpe23Uri'] - ) - if query_results.get('cvss', {}).get('data'): - cvss = {} - for cvss_data in query_results['cvss']['data']: - for cvss_v3 in cvss_data['cvssV3']: - cvss[float(cvss_v3['trust'])] = cvss_v3 - if cvss: - cvss = cvss[max(cvss)] - vulnerability_object.add_attribute( - 'cvss-score', - cvss['baseScore'] - ) - vulnerability_object.add_attribute( - 'cvss-string', - cvss['vectorString'] - ) - if query_results.get('references', {}).get('data'): - for reference in query_results['references']['data']: - vulnerability_object.add_attribute( - 'references', - reference['url'] - ) - if query_results.get('sources_release_date', {}).get('data'): - for release_date in query_results['sources_release_date']['data']: - if release_date['db'] != 'NVD': - continue - if release_date['id'] == self.misp_attribute.value: - vulnerability_object.add_attribute( - 'published', - release_date['date'] - ) - break - if query_results.get('sources_update_date', {}).get('data'): - for update_date in query_results['sources_update_date']['data']: - if update_date['db'] != 'NVD': - continue - if update_date['id'] == self.misp_attribute.value: - vulnerability_object.add_attribute( - 'modified', - update_date['date'] - ) - break - vulnerability_object.add_reference(self.misp_attribute.uuid, 'related-to') - self.misp_event.add_object(vulnerability_object) - def handler(q=False): if q is False: @@ -180,7 +87,7 @@ def handler(q=False): if r.status_code == 200: vulnerability_results = r.json() if vulnerability_results: - parser.parse_vulnerability_information(vulnerability_results) + parser._parse_variot_description(vulnerability_results) empty = False else: if r.reason != 'Not Found': diff --git a/misp_modules/modules/expansion/vulnerability_lookup.py b/misp_modules/modules/expansion/vulnerability_lookup.py new file mode 100644 index 00000000..81361ebc --- /dev/null +++ b/misp_modules/modules/expansion/vulnerability_lookup.py @@ -0,0 +1,317 @@ +import json +import requests +from . import check_input_attribute, standard_error_message +from ._vulnerability_parser.vulnerability_parser import ( + VulnerabilityMapping, VulnerabilityParser) +from collections.abc import Iterator +from pymisp import MISPObject + +misperrors = {'error': 'Error'} +mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'} +moduleinfo = { + 'version': '1', 'author': 'Christian Studer', + 'description': 'An expansion module to query Vulnerability Lookup', + 'module-type': ['expansion', 'hover'] +} +api_url = 'https://vulnerability.circl.lu' + + +class VulnerabilityLookupMapping(VulnerabilityMapping): + __csaf_mapping = { + 'id': 'id', + 'initial_release_date': 'published', + 'current_release_date': 'modified' + } + __cve_mapping = { + 'cveId': 'id', + 'datePublished': 'published', + 'dateUpdated': 'modified', + 'state': 'state' + } + __gsd_mapping = { + 'id': 'id', + 'details': 'description', + 'modified': 'modified' + } + __nvd_mapping = { + 'id': 'id', + 'published': 'published', + 'lastModified': 'modified' + } + __ossf_mapping = { + 'id': 'id', + 'summary': 'summary', + 'details': 'description', + 'published': 'published', + 'modified': 'modified' + } + __related_vuln_mapping = { + 'cve': 'id', + 'title': 'summary', + 'discovery_date': 'published' + } + __source_mapping = { + 'cve': '_parse_cve_description', + 'ghsa': '_parse_standard_description', + 'gsd': '_parse_gsd_description', + 'mal': '_parse_ossf_description', + 'pysec': '_parse_standard_description', + 'var': '_parse_variot_description' + } + __source_mapping.update( + dict.fromkeys( + ( + 'cisco', 'icsa', 'icsma', 'nn', 'oxas', + 'rhba', 'rhea', 'rhsa', 'sca', 'ssa', 'wid' + ), + '_parse_csaf_description' + ) + ) + __standard_mapping = { + 'id': 'id', + 'details': 'description', + 'published': 'published', + 'modified': 'modified' + } + + @classmethod + def csaf_mapping(cls) -> dict: + return cls.__csaf_mapping + + @classmethod + def cve_mapping(cls) -> dict: + return cls.__cve_mapping + + @classmethod + def gsd_mapping(cls) -> dict: + return cls.__gsd_mapping + + @classmethod + def nvd_mapping(cls) -> dict: + return cls.__nvd_mapping + + @classmethod + def ossf_mapping(cls) -> dict: + return cls.__ossf_mapping + + @classmethod + def related_vuln_mapping(cls) -> dict: + return cls.__related_vuln_mapping + + @classmethod + def source_mapping(cls, field: str) -> str: + return cls.__source_mapping.get(field) + + @classmethod + def standard_mapping(cls) -> dict: + return cls.__standard_mapping + + +class VulnerabilityLookupParser(VulnerabilityParser): + def __init__(self, attribute: dict): + super().__init__(attribute) + self.__mapping = VulnerabilityLookupMapping + self.__errors = [] + + @property + def errors(self) -> list: + return self.__errors + + @property + def mapping(self) -> VulnerabilityLookupMapping: + return self.__mapping + + def parse_lookup_result(self, lookup_result: dict): + feature = self.mapping.source_mapping( + self.misp_attribute.value.split('-')[0].lower() + ) + getattr(self, feature)(lookup_result) + + def _parse_aliases(self, aliases: list) -> Iterator[str]: + for alias in aliases: + query = requests.get(f"{api_url}/vulnerability/{alias}") + if query.status_code != 200: + self.errors.append( + f'Unable to query related vulnerability id {alias}' + ) + continue + vulnerability = query.json() + if not vulnerability: + self.errors.append( + f'No results for related vulnerability id{alias}' + ) + continue + feature = self.mapping.source_mapping(alias.split('-')[0].lower()) + yield getattr(self, feature)(vulnerability) + + def _parse_csaf_description(self, lookup_result: dict) -> str: + description = lookup_result['document'] + + tracking = description['tracking'] + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.csaf_mapping().items(): + misp_object.add_attribute(relation, tracking[field]) + misp_object.add_attribute('summary', description['title']) + for reference in description['references']: + misp_object.add_attribute('references', reference['url']) + misp_object.add_attribute('credit', description['publisher']['name']) + misp_object.add_reference(self.misp_attribute.uuid, 'describes') + vulnerability_object = self.misp_event.add_object(misp_object) + + for vulnerability in lookup_result['vulnerabilities']: + related = MISPObject('vulnerability') + for field, relation in self.mapping.related_vuln_mapping().items(): + if vulnerability.get(field): + related.add_attribute(relation, vulnerability[field]) + for score in vulnerability.get('scores', []): + cvss_v3 = score['cvss_v3'] + related.add_attribute('cvss-score', cvss_v3['baseScore']) + related.add_attribute('cvss-string', cvss_v3['vectorString']) + for reference in vulnerability.get('references', []): + related.add_attribute('references', reference['url']) + related.add_reference(vulnerability_object.uuid, 'related-to') + related_vulnerability = self.misp_event.add_object(related) + if vulnerability.get('cwe'): + cwe = vulnerability['cwe'] + weakness = MISPObject('weakness') + for field, value in cwe.items(): + weakness.add_attribute(field, value) + weakness.add_reference(related_vulnerability.uuid, 'leads-to') + self.misp_event.add_object(weakness) + + return vulnerability_object.uuid + + def _parse_cve_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + cveMetaData = lookup_result['cveMetaData'] + for field, relation in self.mapping.cve_mapping().items(): + misp_object.add_attribute(relation, cveMetaData[field]) + for reference in lookup_result['containers']['cna']['references']: + misp_object.add_attribute('references', reference['url']) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + return vulnerability_object.uuid + + def _parse_cve_related_description(self, cve_description: dict) -> str: + misp_object = MISPObject('vulnerability') + misp_object.add_attribute( + 'id', cve_description['CVE_data_meta']['ID'] + ) + misp_object.add_attribute( + 'description', + cve_description['description']['description_data'][0]['value'] + ) + for cvss in cve_description.get('impact', {}).get('cvss', []): + misp_object.add_attribute('cvss-score', cvss['baseScore']) + misp_object.add_attribute('cvss-string', cvss['vectorString']) + for reference in misp_object.get('references', {}).get('reference_data', []): + misp_object.add_attribute('references', reference['url']) + return self.misp_event.add_object(misp_object).uuid + + def _parse_gsd_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + gsd_details = lookup_result['gsd']['osvSchema'] + for field, relation in self.mapping.gsd_mapping().items(): + misp_object.add_attribute(relation, gsd_details[field]) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + + for field, values in lookup_result['namespaces'].items(): + if field == 'cve.org': + vulnerability_object.add_reference( + self._parse_cve_related_description(values), 'related-to' + ) + continue + if field == 'nvd.nist.gov' and values.get('cve'): + vulnerability_object.add_reference( + self._parse_nvd_related_description(values['cve']), + 'related-to' + ) + + return vulnerability_object.uuid + + def _parse_nvd_related_description(self, nvd_description: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.nvd_mapping().items(): + misp_object.add_attribute(relation, nvd_description[field]) + misp_object.add_attribute( + 'description', nvd_description['descriptions'][0]['value'] + ) + for cvss in nvd_description.get('metrics', {}).get('cvssMetricV31', []): + misp_object.add_attribute( + 'cvss-score', cvss['cvssData']['baseScore'] + ) + misp_object.add_attribute( + 'cvss-string', cvss['cvssData']['vectorString'] + ) + for reference in nvd_description.get('references', []): + misp_object.add_attribute('references', reference['url']) + return self.misp_event.add_object(misp_object).uuid + + def _parse_ossf_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.ossf_mapping().items(): + misp_object.add_attribute(relation, lookup_result[field]) + for reference in lookup_result['references']: + misp_object.add_attribute('references', reference['url']) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + + if lookup_result.get('aliases'): + for vuln_uuid in self._parse_aliases(lookup_result['aliases']): + vulnerability_object.add_reference(vuln_uuid, 'related-to') + + return vulnerability_object.uuid + + def _parse_standard_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.standard_mapping().items(): + misp_object.add_attribute(relation, lookup_result[field]) + misp_object.add_attribute( + 'cvss-string', lookup_result['severity']['score'] + ) + for reference in lookup_result['references']: + misp_object.add_attribute('references', reference['url']) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + + if lookup_result.get('aliases'): + for vuln_uuid in self._parse_aliases(lookup_result['aliases']): + vulnerability_object.add_reference(vuln_uuid, 'related-to') + + return vulnerability_object.uuid + + +def handler(q=False): + if q is False: + return q + request = json.loads(q) + if not check_input_attribute(request.get('attribute', {})): + return { + 'error': f'{standard_error_message}, which should contain ' + 'at least a type, a value and an UUID.' + } + attribute = request['attribute'] + if attribute.get('type') != 'vulnerability': + misperrors['error'] = 'Vulnerability ID missing' + return misperrors + lookup = requests.get(f"{api_url}/vulnerability/{attribute['value']}") + if lookup.status_code == 200: + vulnerability = lookup.json() + if not vulnerability: + misperrors['error'] = 'Non existing Vulnerability ID.' + return misperrors + else: + misperrors['error'] = 'Vulnerability Lookup API not accessible.' + return misperrors + parser = VulnerabilityLookupParser(attribute) + parser.parse_lookup_result(vulnerability) + return parser.get_results() + + +def introspection(): + return mispattributes + + +def version(): + return moduleinfo