mirror of https://github.com/MISP/misp-modules
Merge branch 'MISP:main' into main
commit
5daf7c6628
|
@ -34,7 +34,7 @@ MISP_ATTRIBUTES = {
|
|||
}
|
||||
|
||||
MODULE_INFO = {
|
||||
'version': '1',
|
||||
'version': '2',
|
||||
'author': 'Google Threat Intelligence team',
|
||||
'description': ('An expansion module to have the observable\'s threat'
|
||||
' score assessed by Google Threat Intelligence.'),
|
||||
|
@ -68,10 +68,23 @@ class GoogleThreatIntelligenceParser:
|
|||
'md5': self.parse_hash,
|
||||
'sha1': self.parse_hash,
|
||||
'sha256': self.parse_hash,
|
||||
'url': self.parse_url
|
||||
'url': self.parse_url,
|
||||
'ip-src|port': self.parse_ip_port,
|
||||
'ip-dst|port': self.parse_ip_port,
|
||||
}
|
||||
self.proxies = None
|
||||
|
||||
@staticmethod
|
||||
def get_total_analysis(analysis: dict,
|
||||
known_distributors: dict = None) -> int:
|
||||
"""Get total """
|
||||
if not analysis:
|
||||
return 0
|
||||
count = sum([analysis['undetected'],
|
||||
analysis['suspicious'],
|
||||
analysis['harmless']])
|
||||
return count if known_distributors else count + analysis['malicious']
|
||||
|
||||
def query_api(self, attribute: dict) -> None:
|
||||
"""Get data from the API and parse it."""
|
||||
self.attribute.from_dict(**attribute)
|
||||
|
@ -86,86 +99,205 @@ class GoogleThreatIntelligenceParser:
|
|||
}
|
||||
return {'results': results}
|
||||
|
||||
def create_gti_report_object(self, report):
|
||||
"""Create GTI report object."""
|
||||
report = report.to_dict()
|
||||
permalink = ('https://www.virustotal.com/gui/'
|
||||
f"{report['type']}/{report['id']}")
|
||||
report_object = pymisp.MISPObject('Google-Threat-Intel-report')
|
||||
report_object.add_attribute('permalink', type='link', value=permalink)
|
||||
report_object.add_attribute(
|
||||
'Threat Score', type='text',
|
||||
value=get_key(
|
||||
report, 'attributes.gti_assessment.threat_score.value'))
|
||||
report_object.add_attribute(
|
||||
'Verdict', type='text',
|
||||
value=get_key(
|
||||
report, 'attributes.gti_assessment.verdict.value').replace(
|
||||
|
||||
def add_gti_report(self, report: vt.Object) -> str:
|
||||
analysis = report.get('last_analysis_stats')
|
||||
total = self.get_total_analysis(analysis,
|
||||
report.get('known_distributors'))
|
||||
if report.type == 'ip_address':
|
||||
rtype = 'ip-address'
|
||||
else:
|
||||
rtype = report.type
|
||||
permalink = f'https://www.virustotal.com/gui/{rtype}/{report.id}'
|
||||
|
||||
gti_object = pymisp.MISPObject('google-threat-intelligence-report')
|
||||
gti_object.add_attribute('permalink', type='link', value=permalink)
|
||||
ratio = f"{analysis['malicious']}/{total}" if analysis else '-/-'
|
||||
gti_object.add_attribute('detection-ratio',
|
||||
type='text',
|
||||
value=ratio,
|
||||
disable_correlation=True)
|
||||
report_dict = report.to_dict()
|
||||
gti_object.add_attribute(
|
||||
'threat-score', type='text',
|
||||
value=get_key(report_dict,
|
||||
'attributes.gti_assessment.threat_score.value'))
|
||||
gti_object.add_attribute(
|
||||
'verdict', type='text',
|
||||
value=get_key(report_dict,
|
||||
'attributes.gti_assessment.verdict.value').replace(
|
||||
'VERDICT_', ''))
|
||||
report_object.add_attribute(
|
||||
'Severity', type='text',
|
||||
value=get_key(
|
||||
report, 'attributes.gti_assessment.severity.value').replace(
|
||||
gti_object.add_attribute(
|
||||
'severity', type='text',
|
||||
value=get_key(report_dict,
|
||||
'attributes.gti_assessment.severity.value').replace(
|
||||
'SEVERITY_', ''))
|
||||
report_object.add_attribute(
|
||||
'Threat Label', type='text',
|
||||
value=get_key(
|
||||
report, ('attributes.popular_threat_classification'
|
||||
'.suggested_threat_label')))
|
||||
self.misp_event.add_object(**report_object)
|
||||
return report_object.uuid
|
||||
self.misp_event.add_object(**gti_object)
|
||||
return gti_object.uuid
|
||||
|
||||
def create_misp_object(self, report: vt.Object) -> pymisp.MISPObject:
|
||||
misp_object = None
|
||||
gti_uuid = self.add_gti_report(report)
|
||||
|
||||
if report.type == 'file':
|
||||
misp_object = pymisp.MISPObject('file')
|
||||
for hash_type in ('md5', 'sha1', 'sha256', 'tlsh',
|
||||
'vhash', 'ssdeep', 'imphash'):
|
||||
misp_object.add_attribute(hash_type,
|
||||
**{'type': hash_type,
|
||||
'value': report.get(hash_type)})
|
||||
elif report.type == 'domain':
|
||||
misp_object = pymisp.MISPObject('domain-ip')
|
||||
misp_object.add_attribute('domain', type='domain', value=report.id)
|
||||
elif report.type == 'ip_address':
|
||||
misp_object = pymisp.MISPObject('domain-ip')
|
||||
misp_object.add_attribute('ip', type='ip-dst', value=report.id)
|
||||
elif report.type == 'url':
|
||||
misp_object = pymisp.MISPObject('url')
|
||||
misp_object.add_attribute('url', type='url', value=report.id)
|
||||
misp_object.add_reference(gti_uuid, 'analyzed-with')
|
||||
return misp_object
|
||||
|
||||
def parse_domain(self, domain: str) -> str:
|
||||
"""Create domain MISP object."""
|
||||
domain_report = self.client.get_object(f'/domains/{domain}')
|
||||
|
||||
# DOMAIN
|
||||
domain_object = pymisp.MISPObject('domain-ip')
|
||||
domain_object.add_attribute(
|
||||
'domain', type='domain', value=domain_report.id)
|
||||
domain_object = self.create_misp_object(domain_report)
|
||||
|
||||
# WHOIS
|
||||
if domain_report.whois:
|
||||
whois_object = pymisp.MISPObject('whois')
|
||||
whois_object.add_attribute('text', type='text',
|
||||
value=domain_report.whois)
|
||||
self.misp_event.add_object(**whois_object)
|
||||
|
||||
# SIBLINGS AND SUBDOMAINS
|
||||
for relationship_name, misp_name in [
|
||||
('siblings', 'sibling-of'), ('subdomains', 'subdomain')]:
|
||||
rel_iterator = self.client.iterator(
|
||||
f'/domains/{domain_report.id}/{relationship_name}',
|
||||
limit=self.limit)
|
||||
for item in rel_iterator:
|
||||
attr = pymisp.MISPAttribute()
|
||||
attr.from_dict(**dict(type='domain', value=item.id))
|
||||
self.misp_event.add_attribute(**attr)
|
||||
domain_object.add_reference(attr.uuid, misp_name)
|
||||
|
||||
# RESOLUTIONS
|
||||
resolutions_iterator = self.client.iterator(
|
||||
f'/domains/{domain_report.id}/resolutions', limit=self.limit)
|
||||
for resolution in resolutions_iterator:
|
||||
domain_object.add_attribute('ip', type='ip-dst',
|
||||
value=resolution.ip_address)
|
||||
|
||||
# COMMUNICATING, DOWNLOADED AND REFERRER FILES
|
||||
for relationship_name, misp_name in [
|
||||
('communicating_files', 'communicates-with'),
|
||||
('downloaded_files', 'downloaded-from'),
|
||||
('referrer_files', 'referring')
|
||||
]:
|
||||
files_iterator = self.client.iterator(
|
||||
f'/domains/{domain_report.id}/{relationship_name}',
|
||||
limit=self.limit)
|
||||
for file in files_iterator:
|
||||
file_object = self.create_misp_object(file)
|
||||
file_object.add_reference(domain_object.uuid, misp_name)
|
||||
self.misp_event.add_object(**file_object)
|
||||
|
||||
# URLS
|
||||
urls_iterator = self.client.iterator(
|
||||
f'/domains/{domain_report.id}/urls', limit=self.limit)
|
||||
for url in urls_iterator:
|
||||
url_object = self.create_misp_object(url)
|
||||
url_object.add_reference(domain_object.uuid, 'hosted-in')
|
||||
self.misp_event.add_object(**url_object)
|
||||
|
||||
report_uuid = self.create_gti_report_object(domain_report)
|
||||
domain_object.add_reference(report_uuid, 'analyzed-with')
|
||||
self.misp_event.add_object(**domain_object)
|
||||
return domain_object.uuid
|
||||
|
||||
def parse_hash(self, file_hash: str) -> str:
|
||||
"""Create hash MISP object."""
|
||||
file_report = self.client.get_object(f'/files/{file_hash}')
|
||||
file_object = pymisp.MISPObject('file')
|
||||
for hash_type in ('md5', 'sha1', 'sha256'):
|
||||
file_object.add_attribute(
|
||||
hash_type,
|
||||
**{'type': hash_type, 'value': file_report.get(hash_type)})
|
||||
file_object = self.create_misp_object(file_report)
|
||||
|
||||
# ITW URLS
|
||||
urls_iterator = self.client.iterator(
|
||||
f'/files/{file_report.id}/itw_urls', limit=self.limit)
|
||||
for url in urls_iterator:
|
||||
url_object = self.create_misp_object(url)
|
||||
url_object.add_reference(file_object.uuid, 'downloaded')
|
||||
self.misp_event.add_object(**url_object)
|
||||
|
||||
# COMMUNICATING, DOWNLOADED AND REFERRER FILES
|
||||
for relationship_name, misp_name in [
|
||||
('contacted_urls', 'communicates-with'),
|
||||
('contacted_domains', 'communicates-with'),
|
||||
('contacted_ips', 'communicates-with')
|
||||
]:
|
||||
related_files_iterator = self.client.iterator(
|
||||
f'/files/{file_report.id}/{relationship_name}', limit=self.limit)
|
||||
for related_file in related_files_iterator:
|
||||
related_file_object = self.create_misp_object(related_file)
|
||||
related_file_object.add_reference(file_object.uuid, misp_name)
|
||||
self.misp_event.add_object(**related_file_object)
|
||||
|
||||
report_uuid = self.create_gti_report_object(file_report)
|
||||
file_object.add_reference(report_uuid, 'analyzed-with')
|
||||
self.misp_event.add_object(**file_object)
|
||||
return file_object.uuid
|
||||
|
||||
def parse_ip_port(self, ipport: str) -> str:
|
||||
ip = ipport.split('|')[0]
|
||||
self.parse_ip(ip)
|
||||
|
||||
def parse_ip(self, ip: str) -> str:
|
||||
"""Create ip MISP object."""
|
||||
ip_report = self.client.get_object(f'/ip_addresses/{ip}')
|
||||
|
||||
# IP
|
||||
ip_object = pymisp.MISPObject('domain-ip')
|
||||
ip_object.add_attribute('ip', type='ip-dst', value=ip_report.id)
|
||||
ip_object = self.create_misp_object(ip_report)
|
||||
|
||||
# ASN
|
||||
asn_object = pymisp.MISPObject('asn')
|
||||
asn_object.add_attribute('asn', type='AS', value=ip_report.asn)
|
||||
asn_object.add_attribute('subnet-announced', type='ip-src',
|
||||
value=ip_report.network)
|
||||
asn_object.add_attribute('country', type='text',
|
||||
value=ip_report.country)
|
||||
self.misp_event.add_object(**asn_object)
|
||||
|
||||
# RESOLUTIONS
|
||||
resolutions_iterator = self.client.iterator(
|
||||
f'/ip_addresses/{ip_report.id}/resolutions', limit=self.limit)
|
||||
for resolution in resolutions_iterator:
|
||||
ip_object.add_attribute('domain', type='domain',
|
||||
value=resolution.host_name)
|
||||
|
||||
# URLS
|
||||
urls_iterator = self.client.iterator(
|
||||
f'/ip_addresses/{ip_report.id}/urls', limit=self.limit)
|
||||
for url in urls_iterator:
|
||||
url_object = self.create_misp_object(url)
|
||||
url_object.add_reference(ip_object.uuid, 'hosted-in')
|
||||
self.misp_event.add_object(**url_object)
|
||||
|
||||
report_uuid = self.create_gti_report_object(ip_report)
|
||||
ip_object.add_reference(report_uuid, 'analyzed-with')
|
||||
self.misp_event.add_object(**ip_object)
|
||||
return ip_object.uuid
|
||||
|
||||
def parse_url(self, url: str) -> str:
|
||||
"""Create URL MISP object."""
|
||||
url_id = vt.url_id(url)
|
||||
url_report = self.client.get_object(f'/urls/{url_id}')
|
||||
url_object = self.create_misp_object(url_report)
|
||||
|
||||
url_object = pymisp.MISPObject('url')
|
||||
url_object.add_attribute('url', type='url', value=url_report.url)
|
||||
# COMMUNICATING, DOWNLOADED AND REFERRER FILES
|
||||
for relationship_name, misp_name in [
|
||||
('communicating_files', 'communicates-with'),
|
||||
('downloaded_files', 'downloaded-from'),
|
||||
('referrer_files', 'referring')
|
||||
]:
|
||||
files_iterator = self.client.iterator(
|
||||
f'/urls/{url_report.id}/{relationship_name}', limit=self.limit)
|
||||
for file in files_iterator:
|
||||
file_object = self.create_misp_object(file)
|
||||
file_object.add_reference(url_object.uuid, misp_name)
|
||||
self.misp_event.add_object(**file_object)
|
||||
|
||||
report_uuid = self.create_gti_report_object(url_report)
|
||||
url_object.add_reference(report_uuid, 'analyzed-with')
|
||||
self.misp_event.add_object(**url_object)
|
||||
return url_object.uuid
|
||||
|
||||
|
|
Loading…
Reference in New Issue