diff --git a/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py b/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py index e4adabc9..cbb0841b 100644 --- a/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py +++ b/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py @@ -1,5 +1,7 @@ import json +import requests from pymisp import MISPAttribute, MISPEvent, MISPObject +from typing import Iterator class VulnerabilityMapping: @@ -118,3 +120,385 @@ class VulnerabilityParser: self.misp_attribute.uuid, 'related-to' ) self.misp_event.add_object(vulnerability_object) + + +class VulnerabilityLookupMapping(VulnerabilityMapping): + __csaf_mapping = { + 'id': 'id', + 'initial_release_date': 'published', + 'current_release_date': 'modified' + } + __cve_mapping = { + 'cveId': 'id', + 'datePublished': 'published', + 'dateUpdated': 'modified', + 'state': 'state' + } + __cwe_mapping = { + 'cweId': 'id', + 'description': 'description', + 'name': 'name' + } + __gsd_mapping = { + 'id': 'id', + 'details': 'description', + 'modified': 'modified' + } + __jvn_mapping = { + 'sec:identifier': 'id', + 'description': 'description', + 'title': 'summary', + 'link': 'references', + 'dcterms:issued': 'published', + 'dcterms:modified': 'modified' + } + __nvd_mapping = { + 'id': 'id', + 'published': 'published', + 'lastModified': 'modified' + } + __ossf_mapping = { + 'id': 'id', + 'summary': 'summary', + 'details': 'description', + 'published': 'published', + 'modified': 'modified' + } + __related_vuln_mapping = { + 'cve': 'id', + 'title': 'summary', + 'discovery_date': 'published' + } + __source_mapping = { + 'cve': '_parse_cve_description', + 'ghsa': '_parse_standard_description', + 'gsd': '_parse_gsd_description', + 'jvndb': '_parse_jvn_description', + 'mal': '_parse_ossf_description', + 'pysec': '_parse_standard_description', + 'ts': '_parse_tailscale_description', + 'var': '_parse_variot_description' + } + __source_mapping.update( + dict.fromkeys( + ( + 'cisco', 'icsa', 'icsma', 'ncsc', 'nn', 'oxas', + 'rhba', 'rhea', 'rhsa', 'sca', 'ssa', 'va', 'wid' + ), + '_parse_csaf_description' + ) + ) + __standard_mapping = { + 'id': 'id', + 'details': 'description', + 'published': 'published', + 'modified': 'modified' + } + __tailscale_mapping = { + 'title': 'id', + 'link': 'references', + 'summary': 'summary', + 'published': 'published' + } + + @classmethod + def csaf_mapping(cls) -> dict: + return cls.__csaf_mapping + + @classmethod + def cve_mapping(cls) -> dict: + return cls.__cve_mapping + + @classmethod + def cwe_mapping(cls) -> dict: + return cls.__cwe_mapping + + @classmethod + def gsd_mapping(cls) -> dict: + return cls.__gsd_mapping + + @classmethod + def jvn_mapping(cls) -> dict: + return cls.__jvn_mapping + + @classmethod + def nvd_mapping(cls) -> dict: + return cls.__nvd_mapping + + @classmethod + def ossf_mapping(cls) -> dict: + return cls.__ossf_mapping + + @classmethod + def related_vuln_mapping(cls) -> dict: + return cls.__related_vuln_mapping + + @classmethod + def source_mapping(cls, field: str) -> str: + return cls.__source_mapping.get(field) + + @classmethod + def standard_mapping(cls) -> dict: + return cls.__standard_mapping + + @classmethod + def tailscale_mapping(cls) -> dict: + return cls.__tailscale_mapping + + +class VulnerabilityLookupParser(VulnerabilityParser): + def __init__(self, attribute: dict, api_url: str): + super().__init__(attribute) + self.__api_url = api_url + self.__mapping = VulnerabilityLookupMapping + self.__errors = [] + + @property + def api_url(self) -> str: + return self.__api_url + + @property + def errors(self) -> list: + return self.__errors + + @property + def mapping(self) -> VulnerabilityLookupMapping: + return self.__mapping + + def parse_lookup_result(self, lookup_result: dict): + feature = self.mapping.source_mapping( + self.misp_attribute.value.split('-')[0].lower() + ) + getattr(self, feature)(lookup_result) + + def _parse_aliases(self, *aliases: tuple) -> Iterator[str]: + for alias in aliases: + query = requests.get(f"{self.api_url}/api/vulnerability/{alias}") + if query.status_code != 200: + self.errors.append( + f'Unable to query related vulnerability id {alias}' + ) + continue + vulnerability = query.json() + if not vulnerability: + self.errors.append( + f'No results for related vulnerability id{alias}' + ) + continue + feature = self.mapping.source_mapping(alias.split('-')[0].lower()) + yield getattr(self, feature)(vulnerability) + + def _parse_csaf_branch(self, branch: list) -> Iterator[str]: + for sub_branch in branch: + if sub_branch.get('branches'): + yield from self._parse_csaf_branch(sub_branch['branches']) + else: + cpe = sub_branch.get('product', {}).get('product_identification_helper', {}).get('cpe') + if cpe is not None: + yield cpe + + def _parse_csaf_description(self, lookup_result: dict) -> str: + description = lookup_result['document'] + + tracking = description['tracking'] + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.csaf_mapping().items(): + misp_object.add_attribute(relation, tracking[field]) + misp_object.add_attribute('summary', description['title']) + for reference in description.get('references', []): + misp_object.add_attribute('references', reference['url']) + misp_object.add_attribute('credit', description['publisher']['name']) + branches = lookup_result.get('product_tree', {}).get('branches', []) + if branches: + for cpe in set(self._parse_csaf_branch(branches)): + misp_object.add_attribute('vulnerable-configuration', cpe) + misp_object.add_reference(self.misp_attribute.uuid, 'describes') + vulnerability_object = self.misp_event.add_object(misp_object) + + for vulnerability in lookup_result['vulnerabilities']: + related = MISPObject('vulnerability') + for field, relation in self.mapping.related_vuln_mapping().items(): + if vulnerability.get(field): + related.add_attribute(relation, vulnerability[field]) + for score in vulnerability.get('scores', []): + cvss_v3 = score['cvss_v3'] + related.add_attribute('cvss-score', cvss_v3['baseScore']) + related.add_attribute('cvss-string', cvss_v3['vectorString']) + for reference in vulnerability.get('references', []): + related.add_attribute('references', reference['url']) + related.add_reference(vulnerability_object.uuid, 'related-to') + related_vulnerability = self.misp_event.add_object(related) + if vulnerability.get('cwe'): + cwe = vulnerability['cwe'] + weakness = MISPObject('weakness') + for field, value in cwe.items(): + weakness.add_attribute(field, value) + self.misp_event.add_object(weakness) + related_vulnerability.add_reference( + weakness.uuid, 'weakened-by' + ) + + return vulnerability_object.uuid + + def _parse_cve_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + cveMetaData = lookup_result['cveMetadata'] + for field, relation in self.mapping.cve_mapping().items(): + misp_object.add_attribute(relation, cveMetaData[field]) + containers = lookup_result['containers'] + for reference in containers.get('cna', {}).get('references', []): + misp_object.add_attribute('references', reference['url']) + for adp in containers.get('adp', []): + for affected in adp.get('affected', []): + for cpe in affected.get('cpes', []): + misp_object.add_attribute('vulnerable-configuration', cpe) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + return vulnerability_object.uuid + + def _parse_cve_related_description(self, cve_description: dict) -> str: + misp_object = MISPObject('vulnerability') + misp_object.add_attribute( + 'id', cve_description['CVE_data_meta']['ID'] + ) + misp_object.add_attribute( + 'description', + cve_description['description']['description_data'][0]['value'] + ) + for cvss in cve_description.get('impact', {}).get('cvss', []): + misp_object.add_attribute('cvss-score', cvss['baseScore']) + misp_object.add_attribute('cvss-string', cvss['vectorString']) + for reference in misp_object.get('references', {}).get('reference_data', []): + misp_object.add_attribute('references', reference['url']) + return self.misp_event.add_object(misp_object).uuid + + def _parse_gsd_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + gsd_details = lookup_result['gsd']['osvSchema'] + for field, relation in self.mapping.gsd_mapping().items(): + if gsd_details.get(field): + misp_object.add_attribute(relation, gsd_details[field]) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + + for field, values in lookup_result['namespaces'].items(): + if field == 'cve.org': + vulnerability_object.add_reference( + self._parse_cve_related_description(values), 'related-to' + ) + continue + if field == 'nvd.nist.gov' and values.get('cve'): + vulnerability_object.add_reference( + self._parse_nvd_related_description(values['cve']), + 'related-to' + ) + + return vulnerability_object.uuid + + def _parse_jvn_description(self, lookup_result: dict) -> str: + vulnerability = MISPObject('vulnerability') + for field, relation in self.mapping.jvn_mapping().items(): + vulnerability.add_attribute(relation, lookup_result[field]) + for cpe in lookup_result.get('sec:cpe', []): + cpe_value = cpe.get('#text') + if cpe_value is not None: + vulnerability.add_attribute('vulnerable-configuration', cpe_value) + misp_object = self.misp_event.add_object(vulnerability) + for reference in lookup_result.get('sec:references', []): + source = reference.get('@source') + if source is None and reference.get('@id', '').startswith('CWE-'): + title = reference.get('@title') + if title is not None: + weakness = MISPObject('weakness') + weakness.add_attribute('id', reference['@id']) + weakness.add_attribute('description', title) + misp_object.add_reference( + self.misp_event.add_object(weakness).uuid, 'weakened-by' + ) + else: + misp_object.add_reference( + self.misp_event.add_attribute( + type='weakness', value=reference['@id']).uuid, + 'weakened-by' + ) + continue + if source == 'JVN': + misp_object.add_attribute('references', reference['#text']) + elif source == 'CVE': + for referenced_uuid in self._parse_aliases(reference['@id']): + misp_object.add_reference(referenced_uuid, 'related-to') + return misp_object.uuid + + def _parse_nvd_related_description(self, nvd_description: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.nvd_mapping().items(): + misp_object.add_attribute(relation, nvd_description[field]) + misp_object.add_attribute( + 'description', nvd_description['descriptions'][0]['value'] + ) + for cvss in nvd_description.get('metrics', {}).get('cvssMetricV31', []): + misp_object.add_attribute( + 'cvss-score', cvss['cvssData']['baseScore'] + ) + misp_object.add_attribute( + 'cvss-string', cvss['cvssData']['vectorString'] + ) + for reference in nvd_description.get('references', []): + misp_object.add_attribute('references', reference['url']) + return self.misp_event.add_object(misp_object).uuid + + def _parse_ossf_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.ossf_mapping().items(): + misp_object.add_attribute(relation, lookup_result[field]) + for reference in lookup_result['references']: + misp_object.add_attribute('references', reference['url']) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + for affected in lookup_result.get('affected', []): + for cwe in affected.get('database_specific', {}).get('cwes', []): + cwe_id = cwe.get('cweId') + if cwe_id is not None: + weakness = MISPObject('weakness') + for field, relation in self.mapping.cwe_mapping().items(): + if cwe.get(field): + weakness.add_attribute(relation, cwe[field]) + self.misp_event.add_object(weakness) + vulnerability_object.add_reference( + weakness.uuid, 'weakened-by' + ) + + if lookup_result.get('aliases'): + for vuln_uuid in self._parse_aliases(*lookup_result['aliases']): + vulnerability_object.add_reference(vuln_uuid, 'related-to') + + return vulnerability_object.uuid + + def _parse_standard_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.standard_mapping().items(): + misp_object.add_attribute(relation, lookup_result[field]) + for cvss in lookup_result.get('severity', []): + misp_object.add_attribute('cvss-string', cvss['score']) + for reference in lookup_result['references']: + misp_object.add_attribute('references', reference['url']) + for cwe_id in lookup_result.get('database_specific', {}).get('cwe_ids', []): + attribute = self.misp_event.add_attribute( + type='weakness', value=cwe_id + ) + misp_object.add_reference(attribute.uuid, 'weakened-by') + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + vulnerability_object = self.misp_event.add_object(misp_object) + + if lookup_result.get('aliases'): + for vuln_uuid in self._parse_aliases(*lookup_result['aliases']): + vulnerability_object.add_reference(vuln_uuid, 'related-to') + + return vulnerability_object.uuid + + def _parse_tailscale_description(self, lookup_result: dict) -> str: + misp_object = MISPObject('vulnerability') + for field, relation in self.mapping.tailscale_mapping().items(): + misp_object.add_attribute(relation, lookup_result[field]) + misp_object.add_reference(self.misp_attribute.uuid, 'related-to') + self.misp_event.add_object(misp_object) diff --git a/misp_modules/modules/expansion/vulnerability_lookup.py b/misp_modules/modules/expansion/vulnerability_lookup.py index 0527b08e..f457883d 100644 --- a/misp_modules/modules/expansion/vulnerability_lookup.py +++ b/misp_modules/modules/expansion/vulnerability_lookup.py @@ -1,15 +1,12 @@ import json import requests from . import check_input_attribute, standard_error_message -from ._vulnerability_parser.vulnerability_parser import ( - VulnerabilityMapping, VulnerabilityParser) -from pymisp import MISPObject -from typing import Iterator +from ._vulnerability_parser.vulnerability_parser import VulnerabilityLookupParser misperrors = {'error': 'Error'} mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'} moduleinfo = { - 'version': '1', + 'version': '2', 'author': 'Christian Studer', 'description': 'An expansion module to query Vulnerability Lookup', 'module-type': ['expansion', 'hover'], @@ -21,384 +18,7 @@ moduleinfo = { 'input': 'Vulnerability Attribute', 'output': 'Additional information on the vulnerability, gathered from the Vulnerability Lookup API.', } -api_url = 'https://vulnerability.circl.lu/api' - - -class VulnerabilityLookupMapping(VulnerabilityMapping): - __csaf_mapping = { - 'id': 'id', - 'initial_release_date': 'published', - 'current_release_date': 'modified' - } - __cve_mapping = { - 'cveId': 'id', - 'datePublished': 'published', - 'dateUpdated': 'modified', - 'state': 'state' - } - __cwe_mapping = { - 'cweId': 'id', - 'description': 'description', - 'name': 'name' - } - __gsd_mapping = { - 'id': 'id', - 'details': 'description', - 'modified': 'modified' - } - __jvn_mapping = { - 'sec:identifier': 'id', - 'description': 'description', - 'title': 'summary', - 'link': 'references', - 'dcterms:issued': 'published', - 'dcterms:modified': 'modified' - } - __nvd_mapping = { - 'id': 'id', - 'published': 'published', - 'lastModified': 'modified' - } - __ossf_mapping = { - 'id': 'id', - 'summary': 'summary', - 'details': 'description', - 'published': 'published', - 'modified': 'modified' - } - __related_vuln_mapping = { - 'cve': 'id', - 'title': 'summary', - 'discovery_date': 'published' - } - __source_mapping = { - 'cve': '_parse_cve_description', - 'ghsa': '_parse_standard_description', - 'gsd': '_parse_gsd_description', - 'jvndb': '_parse_jvn_description', - 'mal': '_parse_ossf_description', - 'pysec': '_parse_standard_description', - 'ts': '_parse_tailscale_description', - 'var': '_parse_variot_description' - } - __source_mapping.update( - dict.fromkeys( - ( - 'cisco', 'icsa', 'icsma', 'ncsc', 'nn', 'oxas', - 'rhba', 'rhea', 'rhsa', 'sca', 'ssa', 'va', 'wid' - ), - '_parse_csaf_description' - ) - ) - __standard_mapping = { - 'id': 'id', - 'details': 'description', - 'published': 'published', - 'modified': 'modified' - } - __tailscale_mapping = { - 'title': 'id', - 'link': 'references', - 'summary': 'summary', - 'published': 'published' - } - - @classmethod - def csaf_mapping(cls) -> dict: - return cls.__csaf_mapping - - @classmethod - def cve_mapping(cls) -> dict: - return cls.__cve_mapping - - @classmethod - def cwe_mapping(cls) -> dict: - return cls.__cwe_mapping - - @classmethod - def gsd_mapping(cls) -> dict: - return cls.__gsd_mapping - - @classmethod - def jvn_mapping(cls) -> dict: - return cls.__jvn_mapping - - @classmethod - def nvd_mapping(cls) -> dict: - return cls.__nvd_mapping - - @classmethod - def ossf_mapping(cls) -> dict: - return cls.__ossf_mapping - - @classmethod - def related_vuln_mapping(cls) -> dict: - return cls.__related_vuln_mapping - - @classmethod - def source_mapping(cls, field: str) -> str: - return cls.__source_mapping.get(field) - - @classmethod - def standard_mapping(cls) -> dict: - return cls.__standard_mapping - - @classmethod - def tailscale_mapping(cls) -> dict: - return cls.__tailscale_mapping - - -class VulnerabilityLookupParser(VulnerabilityParser): - def __init__(self, attribute: dict): - super().__init__(attribute) - self.__mapping = VulnerabilityLookupMapping - self.__errors = [] - - @property - def errors(self) -> list: - return self.__errors - - @property - def mapping(self) -> VulnerabilityLookupMapping: - return self.__mapping - - def parse_lookup_result(self, lookup_result: dict): - feature = self.mapping.source_mapping( - self.misp_attribute.value.split('-')[0].lower() - ) - getattr(self, feature)(lookup_result) - - def _parse_aliases(self, *aliases: tuple) -> Iterator[str]: - for alias in aliases: - query = requests.get(f"{api_url}/vulnerability/{alias}") - if query.status_code != 200: - self.errors.append( - f'Unable to query related vulnerability id {alias}' - ) - continue - vulnerability = query.json() - if not vulnerability: - self.errors.append( - f'No results for related vulnerability id{alias}' - ) - continue - feature = self.mapping.source_mapping(alias.split('-')[0].lower()) - yield getattr(self, feature)(vulnerability) - - def _parse_csaf_branch(self, branch: list) -> Iterator[str]: - for sub_branch in branch: - if sub_branch.get('branches'): - yield from self._parse_csaf_branch(sub_branch['branches']) - else: - cpe = sub_branch.get('product', {}).get('product_identification_helper', {}).get('cpe') - if cpe is not None: - yield cpe - - def _parse_csaf_description(self, lookup_result: dict) -> str: - description = lookup_result['document'] - - tracking = description['tracking'] - misp_object = MISPObject('vulnerability') - for field, relation in self.mapping.csaf_mapping().items(): - misp_object.add_attribute(relation, tracking[field]) - misp_object.add_attribute('summary', description['title']) - for reference in description.get('references', []): - misp_object.add_attribute('references', reference['url']) - misp_object.add_attribute('credit', description['publisher']['name']) - branches = lookup_result.get('product_tree', {}).get('branches', []) - if branches: - for cpe in set(self._parse_csaf_branch(branches)): - misp_object.add_attribute('vulnerable-configuration', cpe) - misp_object.add_reference(self.misp_attribute.uuid, 'describes') - vulnerability_object = self.misp_event.add_object(misp_object) - - for vulnerability in lookup_result['vulnerabilities']: - related = MISPObject('vulnerability') - for field, relation in self.mapping.related_vuln_mapping().items(): - if vulnerability.get(field): - related.add_attribute(relation, vulnerability[field]) - for score in vulnerability.get('scores', []): - cvss_v3 = score['cvss_v3'] - related.add_attribute('cvss-score', cvss_v3['baseScore']) - related.add_attribute('cvss-string', cvss_v3['vectorString']) - for reference in vulnerability.get('references', []): - related.add_attribute('references', reference['url']) - related.add_reference(vulnerability_object.uuid, 'related-to') - related_vulnerability = self.misp_event.add_object(related) - if vulnerability.get('cwe'): - cwe = vulnerability['cwe'] - weakness = MISPObject('weakness') - for field, value in cwe.items(): - weakness.add_attribute(field, value) - self.misp_event.add_object(weakness) - related_vulnerability.add_reference( - weakness.uuid, 'weakened-by' - ) - - return vulnerability_object.uuid - - def _parse_cve_description(self, lookup_result: dict) -> str: - misp_object = MISPObject('vulnerability') - cveMetaData = lookup_result['cveMetadata'] - for field, relation in self.mapping.cve_mapping().items(): - misp_object.add_attribute(relation, cveMetaData[field]) - containers = lookup_result['containers'] - for reference in containers.get('cna', {}).get('references', []): - misp_object.add_attribute('references', reference['url']) - for adp in containers.get('adp', []): - for affected in adp.get('affected', []): - for cpe in affected.get('cpes', []): - misp_object.add_attribute('vulnerable-configuration', cpe) - misp_object.add_reference(self.misp_attribute.uuid, 'related-to') - vulnerability_object = self.misp_event.add_object(misp_object) - return vulnerability_object.uuid - - def _parse_cve_related_description(self, cve_description: dict) -> str: - misp_object = MISPObject('vulnerability') - misp_object.add_attribute( - 'id', cve_description['CVE_data_meta']['ID'] - ) - misp_object.add_attribute( - 'description', - cve_description['description']['description_data'][0]['value'] - ) - for cvss in cve_description.get('impact', {}).get('cvss', []): - misp_object.add_attribute('cvss-score', cvss['baseScore']) - misp_object.add_attribute('cvss-string', cvss['vectorString']) - for reference in misp_object.get('references', {}).get('reference_data', []): - misp_object.add_attribute('references', reference['url']) - return self.misp_event.add_object(misp_object).uuid - - def _parse_gsd_description(self, lookup_result: dict) -> str: - misp_object = MISPObject('vulnerability') - gsd_details = lookup_result['gsd']['osvSchema'] - for field, relation in self.mapping.gsd_mapping().items(): - if gsd_details.get(field): - misp_object.add_attribute(relation, gsd_details[field]) - misp_object.add_reference(self.misp_attribute.uuid, 'related-to') - vulnerability_object = self.misp_event.add_object(misp_object) - - for field, values in lookup_result['namespaces'].items(): - if field == 'cve.org': - vulnerability_object.add_reference( - self._parse_cve_related_description(values), 'related-to' - ) - continue - if field == 'nvd.nist.gov' and values.get('cve'): - vulnerability_object.add_reference( - self._parse_nvd_related_description(values['cve']), - 'related-to' - ) - - return vulnerability_object.uuid - - def _parse_jvn_description(self, lookup_result: dict) -> str: - vulnerability = MISPObject('vulnerability') - for field, relation in self.mapping.jvn_mapping().items(): - vulnerability.add_attribute(relation, lookup_result[field]) - for cpe in lookup_result.get('sec:cpe', []): - cpe_value = cpe.get('#text') - if cpe_value is not None: - vulnerability.add_attribute('vulnerable-configuration', cpe_value) - misp_object = self.misp_event.add_object(vulnerability) - for reference in lookup_result.get('sec:references', []): - source = reference.get('@source') - if source is None and reference.get('@id', '').startswith('CWE-'): - title = reference.get('@title') - if title is not None: - weakness = MISPObject('weakness') - weakness.add_attribute('id', reference['@id']) - weakness.add_attribute('description', title) - misp_object.add_reference( - self.misp_event.add_object(weakness).uuid, 'weakened-by' - ) - else: - misp_object.add_reference( - self.misp_event.add_attribute( - type='weakness', value=reference['@id']).uuid, - 'weakened-by' - ) - continue - if source == 'JVN': - misp_object.add_attribute('references', reference['#text']) - elif source == 'CVE': - for referenced_uuid in self._parse_aliases(reference['@id']): - misp_object.add_reference(referenced_uuid, 'related-to') - return misp_object.uuid - - def _parse_nvd_related_description(self, nvd_description: dict) -> str: - misp_object = MISPObject('vulnerability') - for field, relation in self.mapping.nvd_mapping().items(): - misp_object.add_attribute(relation, nvd_description[field]) - misp_object.add_attribute( - 'description', nvd_description['descriptions'][0]['value'] - ) - for cvss in nvd_description.get('metrics', {}).get('cvssMetricV31', []): - misp_object.add_attribute( - 'cvss-score', cvss['cvssData']['baseScore'] - ) - misp_object.add_attribute( - 'cvss-string', cvss['cvssData']['vectorString'] - ) - for reference in nvd_description.get('references', []): - misp_object.add_attribute('references', reference['url']) - return self.misp_event.add_object(misp_object).uuid - - def _parse_ossf_description(self, lookup_result: dict) -> str: - misp_object = MISPObject('vulnerability') - for field, relation in self.mapping.ossf_mapping().items(): - misp_object.add_attribute(relation, lookup_result[field]) - for reference in lookup_result['references']: - misp_object.add_attribute('references', reference['url']) - misp_object.add_reference(self.misp_attribute.uuid, 'related-to') - vulnerability_object = self.misp_event.add_object(misp_object) - for affected in lookup_result.get('affected', []): - for cwe in affected.get('database_specific', {}).get('cwes', []): - cwe_id = cwe.get('cweId') - if cwe_id is not None: - weakness = MISPObject('weakness') - for field, relation in self.mapping.cwe_mapping().items(): - if cwe.get(field): - weakness.add_attribute(relation, cwe[field]) - self.misp_event.add_object(weakness) - vulnerability_object.add_reference( - weakness.uuid, 'weakened-by' - ) - - if lookup_result.get('aliases'): - for vuln_uuid in self._parse_aliases(*lookup_result['aliases']): - vulnerability_object.add_reference(vuln_uuid, 'related-to') - - return vulnerability_object.uuid - - def _parse_standard_description(self, lookup_result: dict) -> str: - misp_object = MISPObject('vulnerability') - for field, relation in self.mapping.standard_mapping().items(): - misp_object.add_attribute(relation, lookup_result[field]) - for cvss in lookup_result.get('severity', []): - misp_object.add_attribute('cvss-string', cvss['score']) - for reference in lookup_result['references']: - misp_object.add_attribute('references', reference['url']) - for cwe_id in lookup_result.get('database_specific', {}).get('cwe_ids', []): - attribute = self.misp_event.add_attribute( - type='weakness', value=cwe_id - ) - misp_object.add_reference(attribute.uuid, 'weakened-by') - misp_object.add_reference(self.misp_attribute.uuid, 'related-to') - vulnerability_object = self.misp_event.add_object(misp_object) - - if lookup_result.get('aliases'): - for vuln_uuid in self._parse_aliases(*lookup_result['aliases']): - vulnerability_object.add_reference(vuln_uuid, 'related-to') - - return vulnerability_object.uuid - - def _parse_tailscale_description(self, lookup_result: dict) -> str: - misp_object = MISPObject('vulnerability') - for field, relation in self.mapping.tailscale_mapping().items(): - misp_object.add_attribute(relation, lookup_result[field]) - misp_object.add_reference(self.misp_attribute.uuid, 'related-to') - self.misp_event.add_object(misp_object) +api_url = 'https://vulnerability.circl.lu' def handler(q=False): @@ -414,7 +34,7 @@ def handler(q=False): if attribute.get('type') != 'vulnerability': misperrors['error'] = 'Vulnerability ID missing' return misperrors - lookup = requests.get(f"{api_url}/vulnerability/{attribute['value']}") + lookup = requests.get(f"{api_url}/api/vulnerability/{attribute['value']}") if lookup.status_code == 200: vulnerability = lookup.json() if not vulnerability: @@ -424,7 +44,7 @@ def handler(q=False): misperrors['error'] = 'Vulnerability Lookup API not accessible.' return misperrors parser = VulnerabilityLookupParser(attribute) - parser.parse_lookup_result(vulnerability) + parser.parse_lookup_result(vulnerability, api_url) return parser.get_results()