import json import requests from . import check_input_attribute, standard_error_message from ._vulnerability_parser.vulnerability_parser import ( VulnerabilityMapping, VulnerabilityParser) from pymisp import MISPObject from typing import Iterator misperrors = {'error': 'Error'} mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'} moduleinfo = { 'version': '1', 'author': 'Christian Studer', 'description': 'An expansion module to query Vulnerability Lookup', 'module-type': ['expansion', 'hover'], 'name': 'Vulnerability Lookup', 'logo': '', 'requirements': [], 'features': '', 'references': [], 'input': '', 'output': '', } api_url = 'https://vulnerability.circl.lu' class VulnerabilityLookupMapping(VulnerabilityMapping): __csaf_mapping = { 'id': 'id', 'initial_release_date': 'published', 'current_release_date': 'modified' } __cve_mapping = { 'cveId': 'id', 'datePublished': 'published', 'dateUpdated': 'modified', 'state': 'state' } __gsd_mapping = { 'id': 'id', 'details': 'description', 'modified': 'modified' } __nvd_mapping = { 'id': 'id', 'published': 'published', 'lastModified': 'modified' } __ossf_mapping = { 'id': 'id', 'summary': 'summary', 'details': 'description', 'published': 'published', 'modified': 'modified' } __related_vuln_mapping = { 'cve': 'id', 'title': 'summary', 'discovery_date': 'published' } __source_mapping = { 'cve': '_parse_cve_description', 'ghsa': '_parse_standard_description', 'gsd': '_parse_gsd_description', 'mal': '_parse_ossf_description', 'pysec': '_parse_standard_description', 'var': '_parse_variot_description' } __source_mapping.update( dict.fromkeys( ( 'cisco', 'icsa', 'icsma', 'nn', 'oxas', 'rhba', 'rhea', 'rhsa', 'sca', 'ssa', 'wid' ), '_parse_csaf_description' ) ) __standard_mapping = { 'id': 'id', 'details': 'description', 'published': 'published', 'modified': 'modified' } @classmethod def csaf_mapping(cls) -> dict: return cls.__csaf_mapping @classmethod def cve_mapping(cls) -> dict: return cls.__cve_mapping @classmethod def gsd_mapping(cls) -> dict: return cls.__gsd_mapping @classmethod def nvd_mapping(cls) -> dict: return cls.__nvd_mapping @classmethod def ossf_mapping(cls) -> dict: return cls.__ossf_mapping @classmethod def related_vuln_mapping(cls) -> dict: return cls.__related_vuln_mapping @classmethod def source_mapping(cls, field: str) -> str: return cls.__source_mapping.get(field) @classmethod def standard_mapping(cls) -> dict: return cls.__standard_mapping class VulnerabilityLookupParser(VulnerabilityParser): def __init__(self, attribute: dict): super().__init__(attribute) self.__mapping = VulnerabilityLookupMapping self.__errors = [] @property def errors(self) -> list: return self.__errors @property def mapping(self) -> VulnerabilityLookupMapping: return self.__mapping def parse_lookup_result(self, lookup_result: dict): feature = self.mapping.source_mapping( self.misp_attribute.value.split('-')[0].lower() ) getattr(self, feature)(lookup_result) def _parse_aliases(self, aliases: list) -> Iterator[str]: for alias in aliases: query = requests.get(f"{api_url}/vulnerability/{alias}") if query.status_code != 200: self.errors.append( f'Unable to query related vulnerability id {alias}' ) continue vulnerability = query.json() if not vulnerability: self.errors.append( f'No results for related vulnerability id{alias}' ) continue feature = self.mapping.source_mapping(alias.split('-')[0].lower()) yield getattr(self, feature)(vulnerability) def _parse_csaf_description(self, lookup_result: dict) -> str: description = lookup_result['document'] tracking = description['tracking'] misp_object = MISPObject('vulnerability') for field, relation in self.mapping.csaf_mapping().items(): misp_object.add_attribute(relation, tracking[field]) misp_object.add_attribute('summary', description['title']) for reference in description.get('references', []): misp_object.add_attribute('references', reference['url']) misp_object.add_attribute('credit', description['publisher']['name']) misp_object.add_reference(self.misp_attribute.uuid, 'describes') vulnerability_object = self.misp_event.add_object(misp_object) for vulnerability in lookup_result['vulnerabilities']: related = MISPObject('vulnerability') for field, relation in self.mapping.related_vuln_mapping().items(): if vulnerability.get(field): related.add_attribute(relation, vulnerability[field]) for score in vulnerability.get('scores', []): cvss_v3 = score['cvss_v3'] related.add_attribute('cvss-score', cvss_v3['baseScore']) related.add_attribute('cvss-string', cvss_v3['vectorString']) for reference in vulnerability.get('references', []): related.add_attribute('references', reference['url']) related.add_reference(vulnerability_object.uuid, 'related-to') related_vulnerability = self.misp_event.add_object(related) if vulnerability.get('cwe'): cwe = vulnerability['cwe'] weakness = MISPObject('weakness') for field, value in cwe.items(): weakness.add_attribute(field, value) weakness.add_reference(related_vulnerability.uuid, 'leads-to') self.misp_event.add_object(weakness) return vulnerability_object.uuid def _parse_cve_description(self, lookup_result: dict) -> str: misp_object = MISPObject('vulnerability') cveMetaData = lookup_result['cveMetadata'] for field, relation in self.mapping.cve_mapping().items(): misp_object.add_attribute(relation, cveMetaData[field]) for reference in lookup_result['containers']['cna']['references']: misp_object.add_attribute('references', reference['url']) misp_object.add_reference(self.misp_attribute.uuid, 'related-to') vulnerability_object = self.misp_event.add_object(misp_object) return vulnerability_object.uuid def _parse_cve_related_description(self, cve_description: dict) -> str: misp_object = MISPObject('vulnerability') misp_object.add_attribute( 'id', cve_description['CVE_data_meta']['ID'] ) misp_object.add_attribute( 'description', cve_description['description']['description_data'][0]['value'] ) for cvss in cve_description.get('impact', {}).get('cvss', []): misp_object.add_attribute('cvss-score', cvss['baseScore']) misp_object.add_attribute('cvss-string', cvss['vectorString']) for reference in misp_object.get('references', {}).get('reference_data', []): misp_object.add_attribute('references', reference['url']) return self.misp_event.add_object(misp_object).uuid def _parse_gsd_description(self, lookup_result: dict) -> str: misp_object = MISPObject('vulnerability') gsd_details = lookup_result['gsd']['osvSchema'] for field, relation in self.mapping.gsd_mapping().items(): misp_object.add_attribute(relation, gsd_details[field]) misp_object.add_reference(self.misp_attribute.uuid, 'related-to') vulnerability_object = self.misp_event.add_object(misp_object) for field, values in lookup_result['namespaces'].items(): if field == 'cve.org': vulnerability_object.add_reference( self._parse_cve_related_description(values), 'related-to' ) continue if field == 'nvd.nist.gov' and values.get('cve'): vulnerability_object.add_reference( self._parse_nvd_related_description(values['cve']), 'related-to' ) return vulnerability_object.uuid def _parse_nvd_related_description(self, nvd_description: dict) -> str: misp_object = MISPObject('vulnerability') for field, relation in self.mapping.nvd_mapping().items(): misp_object.add_attribute(relation, nvd_description[field]) misp_object.add_attribute( 'description', nvd_description['descriptions'][0]['value'] ) for cvss in nvd_description.get('metrics', {}).get('cvssMetricV31', []): misp_object.add_attribute( 'cvss-score', cvss['cvssData']['baseScore'] ) misp_object.add_attribute( 'cvss-string', cvss['cvssData']['vectorString'] ) for reference in nvd_description.get('references', []): misp_object.add_attribute('references', reference['url']) return self.misp_event.add_object(misp_object).uuid def _parse_ossf_description(self, lookup_result: dict) -> str: misp_object = MISPObject('vulnerability') for field, relation in self.mapping.ossf_mapping().items(): misp_object.add_attribute(relation, lookup_result[field]) for reference in lookup_result['references']: misp_object.add_attribute('references', reference['url']) misp_object.add_reference(self.misp_attribute.uuid, 'related-to') vulnerability_object = self.misp_event.add_object(misp_object) if lookup_result.get('aliases'): for vuln_uuid in self._parse_aliases(lookup_result['aliases']): vulnerability_object.add_reference(vuln_uuid, 'related-to') return vulnerability_object.uuid def _parse_standard_description(self, lookup_result: dict) -> str: misp_object = MISPObject('vulnerability') for field, relation in self.mapping.standard_mapping().items(): misp_object.add_attribute(relation, lookup_result[field]) for cvss in lookup_result.get('severity', []): misp_object.add_attribute('cvss-string', cvss['score']) for reference in lookup_result['references']: misp_object.add_attribute('references', reference['url']) misp_object.add_reference(self.misp_attribute.uuid, 'related-to') vulnerability_object = self.misp_event.add_object(misp_object) if lookup_result.get('aliases'): for vuln_uuid in self._parse_aliases(lookup_result['aliases']): vulnerability_object.add_reference(vuln_uuid, 'related-to') return vulnerability_object.uuid def handler(q=False): if q is False: return q request = json.loads(q) if not check_input_attribute(request.get('attribute', {})): return { 'error': f'{standard_error_message}, which should contain ' 'at least a type, a value and an UUID.' } attribute = request['attribute'] if attribute.get('type') != 'vulnerability': misperrors['error'] = 'Vulnerability ID missing' return misperrors lookup = requests.get(f"{api_url}/vulnerability/{attribute['value']}") if lookup.status_code == 200: vulnerability = lookup.json() if not vulnerability: misperrors['error'] = 'Non existing Vulnerability ID.' return misperrors else: misperrors['error'] = 'Vulnerability Lookup API not accessible.' return misperrors parser = VulnerabilityLookupParser(attribute) parser.parse_lookup_result(vulnerability) return parser.get_results() def introspection(): return mispattributes def version(): return moduleinfo