misp-modules/misp_modules/modules/expansion/_vulnerability_parser/vulnerability_parser.py

119 lines
4.4 KiB
Python

import json
from pymisp import MISPAttribute, MISPEvent, MISPObject
class VulnerabilityMapping:
__variot_data_mapping = {
'credits': 'credit',
'description': 'description',
'title': 'summary'
}
__variot_flat_mapping = {
'cve': 'id', 'id': 'id'
}
@classmethod
def exploit_mapping(cls) -> dict:
return cls.__exploit_mapping
@classmethod
def exploit_multiple_mapping(cls) -> dict:
return cls.__exploit_multiple_mapping
@classmethod
def variot_data_mapping(cls) -> dict:
return cls.__variot_data_mapping
@classmethod
def variot_flat_mapping(cls) -> dict:
return cls.__variot_flat_mapping
class VulnerabilityParser:
def __init__(self, attribute: dict):
misp_attribute = MISPAttribute()
misp_attribute.from_dict(**attribute)
misp_event = MISPEvent()
misp_event.add_attribute(**misp_attribute)
self.__misp_attribute = misp_attribute
self.__misp_event = misp_event
@property
def misp_attribute(self):
return self.__misp_attribute
@property
def misp_event(self):
return self.__misp_event
def get_results(self) -> dict:
event = json.loads(self.misp_event.to_json())
return {
'results': {
key: value for key, value in event.items()
if key in ('Attribute', 'Object')
}
}
def _parse_variot_description(self, query_results):
vulnerability_object = MISPObject('vulnerability')
for field, relation in self.mapping.variot_flat_mapping().items():
if query_results.get(field):
vulnerability_object.add_attribute(
relation, query_results[field]
)
for field, relation in self.mapping.variot_data_mapping().items():
if query_results.get(field, {}).get('data'):
vulnerability_object.add_attribute(
relation, query_results[field]['data']
)
if query_results.get('configurations', {}).get('data'):
for configuration in query_results['configurations']['data']:
for node in configuration['nodes']:
for cpe_match in node['cpe_match']:
if cpe_match['vulnerable']:
vulnerability_object.add_attribute(
'vulnerable-configuration',
cpe_match['cpe23Uri']
)
if query_results.get('cvss', {}).get('data'):
cvss = {}
for cvss_data in query_results['cvss']['data']:
for cvss_v3 in cvss_data['cvssV3']:
cvss[float(cvss_v3['trust'])] = cvss_v3
if cvss:
cvss = cvss[max(cvss)]
vulnerability_object.add_attribute(
'cvss-score', cvss['baseScore']
)
vulnerability_object.add_attribute(
'cvss-string', cvss['vectorString']
)
if query_results.get('references', {}).get('data'):
for reference in query_results['references']['data']:
vulnerability_object.add_attribute(
'references', reference['url']
)
if query_results.get('sources_release_date', {}).get('data'):
for release_date in query_results['sources_release_date']['data']:
if release_date['db'] != 'NVD':
continue
if release_date['id'] == self.misp_attribute.value:
vulnerability_object.add_attribute(
'published', release_date['date']
)
break
if query_results.get('sources_update_date', {}).get('data'):
for update_date in query_results['sources_update_date']['data']:
if update_date['db'] != 'NVD':
continue
if update_date['id'] == self.misp_attribute.value:
vulnerability_object.add_attribute(
'modified', update_date['date']
)
break
vulnerability_object.add_reference(
self.misp_attribute.uuid, 'related-to'
)
self.misp_event.add_object(vulnerability_object)