mirror of https://github.com/MISP/misp-modules
new: [vulnerability_lookup] New module to query Vulnerability Lookup
- Reusing the `variotdbs` code to parse the vulnerability description from VariotDBpull/670/head
parent
ea22f7bd9d
commit
42fb1bcf14
|
@ -21,7 +21,7 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c
|
|||
'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring',
|
||||
'clamav', 'jinja_template_rendering','hyasinsight', 'variotdbs', 'crowdsec',
|
||||
'extract_url_components', 'ipinfo', 'whoisfreaks', 'ip2locationio', 'vysion', 'stairwell',
|
||||
'google_threat_intelligence']
|
||||
'google_threat_intelligence', 'vulnerability_lookup']
|
||||
|
||||
|
||||
minimum_required_fields = ('type', 'uuid', 'value')
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
import json
|
||||
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
||||
|
||||
|
||||
class VulnerabilityMapping:
|
||||
__variot_data_mapping = {
|
||||
'credits': 'credit',
|
||||
'description': 'description',
|
||||
'title': 'summary'
|
||||
}
|
||||
__variot_flat_mapping = {
|
||||
'cve': 'id', 'id': 'id'
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def exploit_mapping(cls) -> dict:
|
||||
return cls.__exploit_mapping
|
||||
|
||||
@classmethod
|
||||
def exploit_multiple_mapping(cls) -> dict:
|
||||
return cls.__exploit_multiple_mapping
|
||||
|
||||
@classmethod
|
||||
def variot_data_mapping(cls) -> dict:
|
||||
return cls.__variot_data_mapping
|
||||
|
||||
@classmethod
|
||||
def variot_flat_mapping(cls) -> dict:
|
||||
return cls.__variot_flat_mapping
|
||||
|
||||
|
||||
class VulnerabilityParser:
|
||||
def __init__(self, attribute: dict):
|
||||
misp_attribute = MISPAttribute()
|
||||
misp_attribute.from_dict(**attribute)
|
||||
misp_event = MISPEvent()
|
||||
misp_event.add_attribute(**misp_attribute)
|
||||
self.__misp_attribute = misp_attribute
|
||||
self.__misp_event = misp_event
|
||||
|
||||
@property
|
||||
def misp_attribute(self):
|
||||
return self.__misp_attribute
|
||||
|
||||
@property
|
||||
def misp_event(self):
|
||||
return self.__misp_event
|
||||
|
||||
def get_results(self) -> dict:
|
||||
event = json.loads(self.misp_event.to_json())
|
||||
return {
|
||||
'results': {
|
||||
key: value for key, value in event.items()
|
||||
if key in ('Attribute', 'Object')
|
||||
}
|
||||
}
|
||||
|
||||
def _parse_variot_description(self, query_results):
|
||||
vulnerability_object = MISPObject('vulnerability')
|
||||
for field, relation in self.mapping.variot_flat_mapping().items():
|
||||
if query_results.get(field):
|
||||
vulnerability_object.add_attribute(
|
||||
relation, query_results[field]
|
||||
)
|
||||
for field, relation in self.mapping.variot_data_mapping().items():
|
||||
if query_results.get(field, {}).get('data'):
|
||||
vulnerability_object.add_attribute(
|
||||
relation, query_results[field]['data']
|
||||
)
|
||||
if query_results.get('configurations', {}).get('data'):
|
||||
for configuration in query_results['configurations']['data']:
|
||||
for node in configuration['nodes']:
|
||||
for cpe_match in node['cpe_match']:
|
||||
if cpe_match['vulnerable']:
|
||||
vulnerability_object.add_attribute(
|
||||
'vulnerable-configuration',
|
||||
cpe_match['cpe23Uri']
|
||||
)
|
||||
if query_results.get('cvss', {}).get('data'):
|
||||
cvss = {}
|
||||
for cvss_data in query_results['cvss']['data']:
|
||||
for cvss_v3 in cvss_data['cvssV3']:
|
||||
cvss[float(cvss_v3['trust'])] = cvss_v3
|
||||
if cvss:
|
||||
cvss = cvss[max(cvss)]
|
||||
vulnerability_object.add_attribute(
|
||||
'cvss-score', cvss['baseScore']
|
||||
)
|
||||
vulnerability_object.add_attribute(
|
||||
'cvss-string', cvss['vectorString']
|
||||
)
|
||||
if query_results.get('references', {}).get('data'):
|
||||
for reference in query_results['references']['data']:
|
||||
vulnerability_object.add_attribute(
|
||||
'references', reference['url']
|
||||
)
|
||||
if query_results.get('sources_release_date', {}).get('data'):
|
||||
for release_date in query_results['sources_release_date']['data']:
|
||||
if release_date['db'] != 'NVD':
|
||||
continue
|
||||
if release_date['id'] == self.misp_attribute.value:
|
||||
vulnerability_object.add_attribute(
|
||||
'published', release_date['date']
|
||||
)
|
||||
break
|
||||
if query_results.get('sources_update_date', {}).get('data'):
|
||||
for update_date in query_results['sources_update_date']['data']:
|
||||
if update_date['db'] != 'NVD':
|
||||
continue
|
||||
if update_date['id'] == self.misp_attribute.value:
|
||||
vulnerability_object.add_attribute(
|
||||
'modified', update_date['date']
|
||||
)
|
||||
break
|
||||
vulnerability_object.add_reference(
|
||||
self.misp_attribute.uuid, 'related-to'
|
||||
)
|
||||
self.misp_event.add_object(vulnerability_object)
|
|
@ -1,7 +1,9 @@
|
|||
import json
|
||||
import requests
|
||||
from . import check_input_attribute, standard_error_message
|
||||
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
||||
from ._vulnerability_parser.vulnerability_parser import (
|
||||
VulnerabilityMapping, VulnerabilityParser)
|
||||
from pymisp import MISPObject
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'}
|
||||
|
@ -12,21 +14,14 @@ moduleconfig = ['API_key']
|
|||
variotdbs_url = 'https://www.variotdbs.pl/api'
|
||||
|
||||
|
||||
class VariotdbsParser:
|
||||
def __init__(self, attribute):
|
||||
misp_attribute = MISPAttribute()
|
||||
misp_attribute.from_dict(**attribute)
|
||||
misp_event = MISPEvent()
|
||||
misp_event.add_attribute(**misp_attribute)
|
||||
self.__misp_attribute = misp_attribute
|
||||
self.__misp_event = misp_event
|
||||
self.__exploit_mapping = {
|
||||
class VariotMapping(VulnerabilityMapping):
|
||||
__exploit_mapping = {
|
||||
'credits': 'credit',
|
||||
'description': 'description',
|
||||
'exploit': 'exploit',
|
||||
'title': 'title'
|
||||
}
|
||||
self.__exploit_multiple_mapping = {
|
||||
__exploit_multiple_mapping = {
|
||||
'cve': {
|
||||
'feature': 'cve_id',
|
||||
'relation': 'cve-id'
|
||||
|
@ -36,131 +31,43 @@ class VariotdbsParser:
|
|||
'relation': 'reference'
|
||||
}
|
||||
}
|
||||
self.__vulnerability_data_mapping = {
|
||||
'credits': 'credit',
|
||||
'description': 'description',
|
||||
'title': 'summary'
|
||||
}
|
||||
self.__vulnerability_flat_mapping = {
|
||||
'cve': 'id', 'id': 'id'
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def exploit_mapping(cls) -> dict:
|
||||
return cls.__exploit_mapping
|
||||
|
||||
@classmethod
|
||||
def exploit_multiple_mapping(cls) -> dict:
|
||||
return cls.__exploit_multiple_mapping
|
||||
|
||||
|
||||
class VariotdbsParser(VulnerabilityParser):
|
||||
def __init__(self, attribute):
|
||||
super().__init__(attribute)
|
||||
self.__mapping = VulnerabilityMapping
|
||||
|
||||
@property
|
||||
def exploit_mapping(self) -> dict:
|
||||
return self.__exploit_mapping
|
||||
|
||||
@property
|
||||
def exploit_multiple_mapping(self) -> dict:
|
||||
return self.__exploit_multiple_mapping
|
||||
|
||||
@property
|
||||
def misp_attribute(self) -> MISPAttribute:
|
||||
return self.__misp_attribute
|
||||
|
||||
@property
|
||||
def misp_event(self) -> MISPEvent:
|
||||
return self.__misp_event
|
||||
|
||||
@property
|
||||
def vulnerability_data_mapping(self) -> dict:
|
||||
return self.__vulnerability_data_mapping
|
||||
|
||||
@property
|
||||
def vulnerability_flat_mapping(self) -> dict:
|
||||
return self.__vulnerability_flat_mapping
|
||||
|
||||
def get_results(self):
|
||||
event = json.loads(self.misp_event.to_json())
|
||||
results = {key: event[key] for key in ('Attribute', 'Object') if event.get(key)}
|
||||
return {'results': results}
|
||||
def mapping(self) -> VulnerabilityMapping:
|
||||
return self.__mapping
|
||||
|
||||
def parse_exploit_information(self, query_results):
|
||||
for exploit in query_results:
|
||||
exploit_object = MISPObject('exploit')
|
||||
exploit_object.add_attribute('exploitdb-id', exploit['edb_id'])
|
||||
for feature, relation in self.exploit_mapping.items():
|
||||
if exploit.get(feature):
|
||||
for field, relation in self.mapping.exploit_mapping().items():
|
||||
if exploit.get(field):
|
||||
exploit_object.add_attribute(
|
||||
relation,
|
||||
exploit[feature]['data']
|
||||
relation, exploit[field]['data']
|
||||
)
|
||||
for feature, relation in self.exploit_multiple_mapping.items():
|
||||
if exploit.get(feature):
|
||||
for value in exploit[feature]['data']:
|
||||
for field, relation in self.mapping.exploit_multiple_mapping().items():
|
||||
if exploit.get(field):
|
||||
for value in exploit[field]['data']:
|
||||
exploit_object.add_attribute(
|
||||
relation['relation'],
|
||||
value[relation['feature']]
|
||||
relation['relation'], value[relation['feature']]
|
||||
)
|
||||
exploit_object.add_reference(self.misp_attribute.uuid, 'related-to')
|
||||
self.misp_event.add_object(exploit_object)
|
||||
|
||||
def parse_vulnerability_information(self, query_results):
|
||||
vulnerability_object = MISPObject('vulnerability')
|
||||
for feature, relation in self.vulnerability_flat_mapping.items():
|
||||
if query_results.get(feature):
|
||||
vulnerability_object.add_attribute(
|
||||
relation,
|
||||
query_results[feature]
|
||||
)
|
||||
for feature, relation in self.vulnerability_data_mapping.items():
|
||||
if query_results.get(feature, {}).get('data'):
|
||||
vulnerability_object.add_attribute(
|
||||
relation,
|
||||
query_results[feature]['data']
|
||||
)
|
||||
if query_results.get('configurations', {}).get('data'):
|
||||
for configuration in query_results['configurations']['data']:
|
||||
for node in configuration['nodes']:
|
||||
for cpe_match in node['cpe_match']:
|
||||
if cpe_match['vulnerable']:
|
||||
vulnerability_object.add_attribute(
|
||||
'vulnerable-configuration',
|
||||
cpe_match['cpe23Uri']
|
||||
)
|
||||
if query_results.get('cvss', {}).get('data'):
|
||||
cvss = {}
|
||||
for cvss_data in query_results['cvss']['data']:
|
||||
for cvss_v3 in cvss_data['cvssV3']:
|
||||
cvss[float(cvss_v3['trust'])] = cvss_v3
|
||||
if cvss:
|
||||
cvss = cvss[max(cvss)]
|
||||
vulnerability_object.add_attribute(
|
||||
'cvss-score',
|
||||
cvss['baseScore']
|
||||
)
|
||||
vulnerability_object.add_attribute(
|
||||
'cvss-string',
|
||||
cvss['vectorString']
|
||||
)
|
||||
if query_results.get('references', {}).get('data'):
|
||||
for reference in query_results['references']['data']:
|
||||
vulnerability_object.add_attribute(
|
||||
'references',
|
||||
reference['url']
|
||||
)
|
||||
if query_results.get('sources_release_date', {}).get('data'):
|
||||
for release_date in query_results['sources_release_date']['data']:
|
||||
if release_date['db'] != 'NVD':
|
||||
continue
|
||||
if release_date['id'] == self.misp_attribute.value:
|
||||
vulnerability_object.add_attribute(
|
||||
'published',
|
||||
release_date['date']
|
||||
)
|
||||
break
|
||||
if query_results.get('sources_update_date', {}).get('data'):
|
||||
for update_date in query_results['sources_update_date']['data']:
|
||||
if update_date['db'] != 'NVD':
|
||||
continue
|
||||
if update_date['id'] == self.misp_attribute.value:
|
||||
vulnerability_object.add_attribute(
|
||||
'modified',
|
||||
update_date['date']
|
||||
)
|
||||
break
|
||||
vulnerability_object.add_reference(self.misp_attribute.uuid, 'related-to')
|
||||
self.misp_event.add_object(vulnerability_object)
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
if q is False:
|
||||
|
@ -180,7 +87,7 @@ def handler(q=False):
|
|||
if r.status_code == 200:
|
||||
vulnerability_results = r.json()
|
||||
if vulnerability_results:
|
||||
parser.parse_vulnerability_information(vulnerability_results)
|
||||
parser._parse_variot_description(vulnerability_results)
|
||||
empty = False
|
||||
else:
|
||||
if r.reason != 'Not Found':
|
||||
|
|
|
@ -0,0 +1,317 @@
|
|||
import json
|
||||
import requests
|
||||
from . import check_input_attribute, standard_error_message
|
||||
from ._vulnerability_parser.vulnerability_parser import (
|
||||
VulnerabilityMapping, VulnerabilityParser)
|
||||
from collections.abc import Iterator
|
||||
from pymisp import MISPObject
|
||||
|
||||
misperrors = {'error': 'Error'}
|
||||
mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'}
|
||||
moduleinfo = {
|
||||
'version': '1', 'author': 'Christian Studer',
|
||||
'description': 'An expansion module to query Vulnerability Lookup',
|
||||
'module-type': ['expansion', 'hover']
|
||||
}
|
||||
api_url = 'https://vulnerability.circl.lu'
|
||||
|
||||
|
||||
class VulnerabilityLookupMapping(VulnerabilityMapping):
|
||||
__csaf_mapping = {
|
||||
'id': 'id',
|
||||
'initial_release_date': 'published',
|
||||
'current_release_date': 'modified'
|
||||
}
|
||||
__cve_mapping = {
|
||||
'cveId': 'id',
|
||||
'datePublished': 'published',
|
||||
'dateUpdated': 'modified',
|
||||
'state': 'state'
|
||||
}
|
||||
__gsd_mapping = {
|
||||
'id': 'id',
|
||||
'details': 'description',
|
||||
'modified': 'modified'
|
||||
}
|
||||
__nvd_mapping = {
|
||||
'id': 'id',
|
||||
'published': 'published',
|
||||
'lastModified': 'modified'
|
||||
}
|
||||
__ossf_mapping = {
|
||||
'id': 'id',
|
||||
'summary': 'summary',
|
||||
'details': 'description',
|
||||
'published': 'published',
|
||||
'modified': 'modified'
|
||||
}
|
||||
__related_vuln_mapping = {
|
||||
'cve': 'id',
|
||||
'title': 'summary',
|
||||
'discovery_date': 'published'
|
||||
}
|
||||
__source_mapping = {
|
||||
'cve': '_parse_cve_description',
|
||||
'ghsa': '_parse_standard_description',
|
||||
'gsd': '_parse_gsd_description',
|
||||
'mal': '_parse_ossf_description',
|
||||
'pysec': '_parse_standard_description',
|
||||
'var': '_parse_variot_description'
|
||||
}
|
||||
__source_mapping.update(
|
||||
dict.fromkeys(
|
||||
(
|
||||
'cisco', 'icsa', 'icsma', 'nn', 'oxas',
|
||||
'rhba', 'rhea', 'rhsa', 'sca', 'ssa', 'wid'
|
||||
),
|
||||
'_parse_csaf_description'
|
||||
)
|
||||
)
|
||||
__standard_mapping = {
|
||||
'id': 'id',
|
||||
'details': 'description',
|
||||
'published': 'published',
|
||||
'modified': 'modified'
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def csaf_mapping(cls) -> dict:
|
||||
return cls.__csaf_mapping
|
||||
|
||||
@classmethod
|
||||
def cve_mapping(cls) -> dict:
|
||||
return cls.__cve_mapping
|
||||
|
||||
@classmethod
|
||||
def gsd_mapping(cls) -> dict:
|
||||
return cls.__gsd_mapping
|
||||
|
||||
@classmethod
|
||||
def nvd_mapping(cls) -> dict:
|
||||
return cls.__nvd_mapping
|
||||
|
||||
@classmethod
|
||||
def ossf_mapping(cls) -> dict:
|
||||
return cls.__ossf_mapping
|
||||
|
||||
@classmethod
|
||||
def related_vuln_mapping(cls) -> dict:
|
||||
return cls.__related_vuln_mapping
|
||||
|
||||
@classmethod
|
||||
def source_mapping(cls, field: str) -> str:
|
||||
return cls.__source_mapping.get(field)
|
||||
|
||||
@classmethod
|
||||
def standard_mapping(cls) -> dict:
|
||||
return cls.__standard_mapping
|
||||
|
||||
|
||||
class VulnerabilityLookupParser(VulnerabilityParser):
|
||||
def __init__(self, attribute: dict):
|
||||
super().__init__(attribute)
|
||||
self.__mapping = VulnerabilityLookupMapping
|
||||
self.__errors = []
|
||||
|
||||
@property
|
||||
def errors(self) -> list:
|
||||
return self.__errors
|
||||
|
||||
@property
|
||||
def mapping(self) -> VulnerabilityLookupMapping:
|
||||
return self.__mapping
|
||||
|
||||
def parse_lookup_result(self, lookup_result: dict):
|
||||
feature = self.mapping.source_mapping(
|
||||
self.misp_attribute.value.split('-')[0].lower()
|
||||
)
|
||||
getattr(self, feature)(lookup_result)
|
||||
|
||||
def _parse_aliases(self, aliases: list) -> Iterator[str]:
|
||||
for alias in aliases:
|
||||
query = requests.get(f"{api_url}/vulnerability/{alias}")
|
||||
if query.status_code != 200:
|
||||
self.errors.append(
|
||||
f'Unable to query related vulnerability id {alias}'
|
||||
)
|
||||
continue
|
||||
vulnerability = query.json()
|
||||
if not vulnerability:
|
||||
self.errors.append(
|
||||
f'No results for related vulnerability id{alias}'
|
||||
)
|
||||
continue
|
||||
feature = self.mapping.source_mapping(alias.split('-')[0].lower())
|
||||
yield getattr(self, feature)(vulnerability)
|
||||
|
||||
def _parse_csaf_description(self, lookup_result: dict) -> str:
|
||||
description = lookup_result['document']
|
||||
|
||||
tracking = description['tracking']
|
||||
misp_object = MISPObject('vulnerability')
|
||||
for field, relation in self.mapping.csaf_mapping().items():
|
||||
misp_object.add_attribute(relation, tracking[field])
|
||||
misp_object.add_attribute('summary', description['title'])
|
||||
for reference in description['references']:
|
||||
misp_object.add_attribute('references', reference['url'])
|
||||
misp_object.add_attribute('credit', description['publisher']['name'])
|
||||
misp_object.add_reference(self.misp_attribute.uuid, 'describes')
|
||||
vulnerability_object = self.misp_event.add_object(misp_object)
|
||||
|
||||
for vulnerability in lookup_result['vulnerabilities']:
|
||||
related = MISPObject('vulnerability')
|
||||
for field, relation in self.mapping.related_vuln_mapping().items():
|
||||
if vulnerability.get(field):
|
||||
related.add_attribute(relation, vulnerability[field])
|
||||
for score in vulnerability.get('scores', []):
|
||||
cvss_v3 = score['cvss_v3']
|
||||
related.add_attribute('cvss-score', cvss_v3['baseScore'])
|
||||
related.add_attribute('cvss-string', cvss_v3['vectorString'])
|
||||
for reference in vulnerability.get('references', []):
|
||||
related.add_attribute('references', reference['url'])
|
||||
related.add_reference(vulnerability_object.uuid, 'related-to')
|
||||
related_vulnerability = self.misp_event.add_object(related)
|
||||
if vulnerability.get('cwe'):
|
||||
cwe = vulnerability['cwe']
|
||||
weakness = MISPObject('weakness')
|
||||
for field, value in cwe.items():
|
||||
weakness.add_attribute(field, value)
|
||||
weakness.add_reference(related_vulnerability.uuid, 'leads-to')
|
||||
self.misp_event.add_object(weakness)
|
||||
|
||||
return vulnerability_object.uuid
|
||||
|
||||
def _parse_cve_description(self, lookup_result: dict) -> str:
|
||||
misp_object = MISPObject('vulnerability')
|
||||
cveMetaData = lookup_result['cveMetaData']
|
||||
for field, relation in self.mapping.cve_mapping().items():
|
||||
misp_object.add_attribute(relation, cveMetaData[field])
|
||||
for reference in lookup_result['containers']['cna']['references']:
|
||||
misp_object.add_attribute('references', reference['url'])
|
||||
misp_object.add_reference(self.misp_attribute.uuid, 'related-to')
|
||||
vulnerability_object = self.misp_event.add_object(misp_object)
|
||||
return vulnerability_object.uuid
|
||||
|
||||
def _parse_cve_related_description(self, cve_description: dict) -> str:
|
||||
misp_object = MISPObject('vulnerability')
|
||||
misp_object.add_attribute(
|
||||
'id', cve_description['CVE_data_meta']['ID']
|
||||
)
|
||||
misp_object.add_attribute(
|
||||
'description',
|
||||
cve_description['description']['description_data'][0]['value']
|
||||
)
|
||||
for cvss in cve_description.get('impact', {}).get('cvss', []):
|
||||
misp_object.add_attribute('cvss-score', cvss['baseScore'])
|
||||
misp_object.add_attribute('cvss-string', cvss['vectorString'])
|
||||
for reference in misp_object.get('references', {}).get('reference_data', []):
|
||||
misp_object.add_attribute('references', reference['url'])
|
||||
return self.misp_event.add_object(misp_object).uuid
|
||||
|
||||
def _parse_gsd_description(self, lookup_result: dict) -> str:
|
||||
misp_object = MISPObject('vulnerability')
|
||||
gsd_details = lookup_result['gsd']['osvSchema']
|
||||
for field, relation in self.mapping.gsd_mapping().items():
|
||||
misp_object.add_attribute(relation, gsd_details[field])
|
||||
misp_object.add_reference(self.misp_attribute.uuid, 'related-to')
|
||||
vulnerability_object = self.misp_event.add_object(misp_object)
|
||||
|
||||
for field, values in lookup_result['namespaces'].items():
|
||||
if field == 'cve.org':
|
||||
vulnerability_object.add_reference(
|
||||
self._parse_cve_related_description(values), 'related-to'
|
||||
)
|
||||
continue
|
||||
if field == 'nvd.nist.gov' and values.get('cve'):
|
||||
vulnerability_object.add_reference(
|
||||
self._parse_nvd_related_description(values['cve']),
|
||||
'related-to'
|
||||
)
|
||||
|
||||
return vulnerability_object.uuid
|
||||
|
||||
def _parse_nvd_related_description(self, nvd_description: dict) -> str:
|
||||
misp_object = MISPObject('vulnerability')
|
||||
for field, relation in self.mapping.nvd_mapping().items():
|
||||
misp_object.add_attribute(relation, nvd_description[field])
|
||||
misp_object.add_attribute(
|
||||
'description', nvd_description['descriptions'][0]['value']
|
||||
)
|
||||
for cvss in nvd_description.get('metrics', {}).get('cvssMetricV31', []):
|
||||
misp_object.add_attribute(
|
||||
'cvss-score', cvss['cvssData']['baseScore']
|
||||
)
|
||||
misp_object.add_attribute(
|
||||
'cvss-string', cvss['cvssData']['vectorString']
|
||||
)
|
||||
for reference in nvd_description.get('references', []):
|
||||
misp_object.add_attribute('references', reference['url'])
|
||||
return self.misp_event.add_object(misp_object).uuid
|
||||
|
||||
def _parse_ossf_description(self, lookup_result: dict) -> str:
|
||||
misp_object = MISPObject('vulnerability')
|
||||
for field, relation in self.mapping.ossf_mapping().items():
|
||||
misp_object.add_attribute(relation, lookup_result[field])
|
||||
for reference in lookup_result['references']:
|
||||
misp_object.add_attribute('references', reference['url'])
|
||||
misp_object.add_reference(self.misp_attribute.uuid, 'related-to')
|
||||
vulnerability_object = self.misp_event.add_object(misp_object)
|
||||
|
||||
if lookup_result.get('aliases'):
|
||||
for vuln_uuid in self._parse_aliases(lookup_result['aliases']):
|
||||
vulnerability_object.add_reference(vuln_uuid, 'related-to')
|
||||
|
||||
return vulnerability_object.uuid
|
||||
|
||||
def _parse_standard_description(self, lookup_result: dict) -> str:
|
||||
misp_object = MISPObject('vulnerability')
|
||||
for field, relation in self.mapping.standard_mapping().items():
|
||||
misp_object.add_attribute(relation, lookup_result[field])
|
||||
misp_object.add_attribute(
|
||||
'cvss-string', lookup_result['severity']['score']
|
||||
)
|
||||
for reference in lookup_result['references']:
|
||||
misp_object.add_attribute('references', reference['url'])
|
||||
misp_object.add_reference(self.misp_attribute.uuid, 'related-to')
|
||||
vulnerability_object = self.misp_event.add_object(misp_object)
|
||||
|
||||
if lookup_result.get('aliases'):
|
||||
for vuln_uuid in self._parse_aliases(lookup_result['aliases']):
|
||||
vulnerability_object.add_reference(vuln_uuid, 'related-to')
|
||||
|
||||
return vulnerability_object.uuid
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
if q is False:
|
||||
return q
|
||||
request = json.loads(q)
|
||||
if not check_input_attribute(request.get('attribute', {})):
|
||||
return {
|
||||
'error': f'{standard_error_message}, which should contain '
|
||||
'at least a type, a value and an UUID.'
|
||||
}
|
||||
attribute = request['attribute']
|
||||
if attribute.get('type') != 'vulnerability':
|
||||
misperrors['error'] = 'Vulnerability ID missing'
|
||||
return misperrors
|
||||
lookup = requests.get(f"{api_url}/vulnerability/{attribute['value']}")
|
||||
if lookup.status_code == 200:
|
||||
vulnerability = lookup.json()
|
||||
if not vulnerability:
|
||||
misperrors['error'] = 'Non existing Vulnerability ID.'
|
||||
return misperrors
|
||||
else:
|
||||
misperrors['error'] = 'Vulnerability Lookup API not accessible.'
|
||||
return misperrors
|
||||
parser = VulnerabilityLookupParser(attribute)
|
||||
parser.parse_lookup_result(vulnerability)
|
||||
return parser.get_results()
|
||||
|
||||
|
||||
def introspection():
|
||||
return mispattributes
|
||||
|
||||
|
||||
def version():
|
||||
return moduleinfo
|
Loading…
Reference in New Issue