mirror of https://github.com/MISP/misp-modules
119 lines
4.4 KiB
Python
119 lines
4.4 KiB
Python
import json
|
|
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
|
|
|
|
|
class VulnerabilityMapping:
    """Field-name mappings used when building MISP vulnerability objects.

    Each mapping associates a key of a query result with the object relation
    under which its value is stored.  The mappings are kept in name-mangled
    class attributes and exposed through the classmethod accessors below.
    """

    # No exploit mappings are defined at this level.  They are declared as
    # empty dicts so that the two exploit accessors return a usable value
    # instead of raising AttributeError: because of private name mangling the
    # accessors of this class can only ever see attributes declared in this
    # class body.  Subclasses that need real exploit mappings must override
    # the accessors themselves.
    __exploit_mapping = {}
    __exploit_multiple_mapping = {}

    # Result fields whose value is read from the nested 'data' key
    # (see VulnerabilityParser._parse_variot_description).
    __variot_data_mapping = {
        'credits': 'credit',
        'description': 'description',
        'title': 'summary'
    }

    # Result fields whose value is read directly from the top level.
    __variot_flat_mapping = {
        'cve': 'id', 'id': 'id'
    }

    @classmethod
    def exploit_mapping(cls) -> dict:
        """Return the exploit field mapping (empty in this base class)."""
        return cls.__exploit_mapping

    @classmethod
    def exploit_multiple_mapping(cls) -> dict:
        """Return the multi-valued exploit field mapping (empty here)."""
        return cls.__exploit_multiple_mapping

    @classmethod
    def variot_data_mapping(cls) -> dict:
        """Return the mapping of 'data'-wrapped fields to object relations."""
        return cls.__variot_data_mapping

    @classmethod
    def variot_flat_mapping(cls) -> dict:
        """Return the mapping of top-level fields to object relations."""
        return cls.__variot_flat_mapping
|
|
|
|
|
|
class VulnerabilityParser:
    """Base parser turning vulnerability query results into MISP data.

    The queried attribute is wrapped in a fresh ``MISPEvent``; objects built
    while parsing are appended to that event and returned by
    :meth:`get_results`.

    NOTE(review): :meth:`_parse_variot_description` reads ``self.mapping``,
    which this class does not define.  Presumably subclasses provide it (an
    object exposing ``variot_flat_mapping()`` and ``variot_data_mapping()``)
    -- confirm against the concrete parsers.
    """

    def __init__(self, attribute: dict):
        """Create the MISP attribute and the event collecting the results.

        Args:
            attribute: Dictionary describing the queried attribute; it is
                expanded as keyword arguments to ``MISPAttribute.from_dict``.
        """
        misp_attribute = MISPAttribute()
        misp_attribute.from_dict(**attribute)
        misp_event = MISPEvent()
        misp_event.add_attribute(**misp_attribute)
        self.__misp_attribute = misp_attribute
        self.__misp_event = misp_event

    @property
    def misp_attribute(self) -> 'MISPAttribute':
        """The attribute the parser was initialised with."""
        return self.__misp_attribute

    @property
    def misp_event(self) -> 'MISPEvent':
        """The event holding the attribute and every parsed object."""
        return self.__misp_event

    def get_results(self) -> dict:
        """Return the event's attributes and objects as plain JSON data.

        Returns:
            ``{'results': {...}}`` where the inner dict keeps only the
            ``'Attribute'`` and ``'Object'`` keys of the serialised event
            (each one only if present).
        """
        event = json.loads(self.misp_event.to_json())
        return {
            'results': {
                key: value for key, value in event.items()
                if key in ('Attribute', 'Object')
            }
        }

    def _parse_variot_description(self, query_results):
        """Build a ``vulnerability`` object from *query_results*.

        The object is filled, in order, with the mapped flat and
        'data'-wrapped fields, the vulnerable configurations, the CVSS score
        and vector, the references and the NVD published/modified dates.  It
        is then linked to the queried attribute with a ``related-to``
        reference and added to the event.

        Args:
            query_results: Mapping holding the query result.  Sections that
                are missing, ``None`` or have an empty ``'data'`` value are
                skipped; records inside a section are indexed strictly, so a
                record lacking an expected key raises ``KeyError``.
        """
        vulnerability_object = MISPObject('vulnerability')
        self._add_variot_fields(vulnerability_object, query_results)
        self._add_variot_configurations(vulnerability_object, query_results)
        self._add_variot_cvss(vulnerability_object, query_results)
        self._add_variot_references(vulnerability_object, query_results)
        self._add_variot_source_date(
            vulnerability_object, query_results,
            'sources_release_date', 'published'
        )
        self._add_variot_source_date(
            vulnerability_object, query_results,
            'sources_update_date', 'modified'
        )
        vulnerability_object.add_reference(
            self.misp_attribute.uuid, 'related-to'
        )
        self.misp_event.add_object(vulnerability_object)

    @staticmethod
    def _variot_data(query_results, field):
        """Return ``query_results[field]['data']`` or ``None`` if unavailable.

        A section that is absent or not a dict (e.g. an explicit ``None``)
        yields ``None`` instead of raising.
        """
        section = query_results.get(field)
        if isinstance(section, dict):
            return section.get('data')
        return None

    def _add_variot_fields(self, vulnerability_object, query_results):
        """Add the flat and 'data'-wrapped fields listed in the mapping."""
        for field, relation in self.mapping.variot_flat_mapping().items():
            if query_results.get(field):
                vulnerability_object.add_attribute(
                    relation, query_results[field]
                )
        for field, relation in self.mapping.variot_data_mapping().items():
            value = self._variot_data(query_results, field)
            if value:
                vulnerability_object.add_attribute(relation, value)

    def _add_variot_configurations(self, vulnerability_object, query_results):
        """Add the CPE 2.3 URI of every match flagged as vulnerable."""
        configurations = self._variot_data(query_results, 'configurations')
        for configuration in configurations or ():
            for node in configuration['nodes']:
                for cpe_match in node['cpe_match']:
                    if cpe_match['vulnerable']:
                        vulnerability_object.add_attribute(
                            'vulnerable-configuration',
                            cpe_match['cpe23Uri']
                        )

    def _add_variot_cvss(self, vulnerability_object, query_results):
        """Add score and vector of the CVSS v3 entry with the highest trust."""
        cvss_by_trust = {}
        for cvss_data in self._variot_data(query_results, 'cvss') or ():
            for cvss_v3 in cvss_data['cvssV3']:
                # Entries sharing a trust value overwrite each other, so the
                # last one seen wins.
                cvss_by_trust[float(cvss_v3['trust'])] = cvss_v3
        if not cvss_by_trust:
            return
        cvss = cvss_by_trust[max(cvss_by_trust)]
        vulnerability_object.add_attribute('cvss-score', cvss['baseScore'])
        vulnerability_object.add_attribute('cvss-string', cvss['vectorString'])

    def _add_variot_references(self, vulnerability_object, query_results):
        """Add the URL of every reference entry."""
        for reference in self._variot_data(query_results, 'references') or ():
            vulnerability_object.add_attribute('references', reference['url'])

    def _add_variot_source_date(self, vulnerability_object, query_results,
                                field, relation):
        """Add the date of the first NVD entry matching the queried value.

        Args:
            vulnerability_object: Object receiving the attribute.
            query_results: Mapping holding the query result.
            field: Section to read (``'sources_release_date'`` or
                ``'sources_update_date'``).
            relation: Object relation used to store the date.
        """
        queried_value = self.misp_attribute.value
        for entry in self._variot_data(query_results, field) or ():
            # Only entries from the 'NVD' database are considered, and only
            # the first one whose id equals the queried attribute value.
            if entry['db'] == 'NVD' and entry['id'] == queried_value:
                vulnerability_object.add_attribute(relation, entry['date'])
                break
|