2017-10-25 15:48:18 +02:00
|
|
|
#!/usr/bin/env python3
|
2024-01-17 13:13:14 +01:00
|
|
|
|
|
|
|
from __future__ import annotations
|
2017-10-25 15:48:18 +02:00
|
|
|
|
|
|
|
import re
|
2024-02-01 14:40:12 +01:00
|
|
|
from typing import Any
|
2017-10-25 15:48:18 +02:00
|
|
|
|
|
|
|
import requests
|
2017-10-25 21:00:00 +02:00
|
|
|
try:
|
2024-03-22 09:59:32 +01:00
|
|
|
import validators
|
2017-10-25 21:00:00 +02:00
|
|
|
has_validators = True
|
|
|
|
except ImportError:
|
|
|
|
has_validators = False
|
|
|
|
|
2017-10-25 15:48:18 +02:00
|
|
|
|
|
|
|
from .abstractgenerator import AbstractMISPObjectGenerator
|
|
|
|
from .. import InvalidMISPObject
|
|
|
|
|
|
|
|
|
|
|
|
class VTReportObject(AbstractMISPObjectGenerator):
|
|
|
|
'''
|
|
|
|
VirusTotal Report
|
|
|
|
|
|
|
|
:apikey: VirusTotal API key (private works, but only public features are supported right now)
|
|
|
|
|
|
|
|
:indicator: IOC to search VirusTotal for
|
|
|
|
'''
|
2024-02-01 14:40:12 +01:00
|
|
|
def __init__(self, apikey: str, indicator: str, vt_proxies: dict[str, str] | None = None, **kwargs) -> None: # type: ignore[no-untyped-def]
|
2021-10-26 02:37:12 +02:00
|
|
|
super().__init__('virustotal-report', **kwargs)
|
2017-10-25 15:48:18 +02:00
|
|
|
indicator = indicator.strip()
|
|
|
|
self._resource_type = self.__validate_resource(indicator)
|
|
|
|
if self._resource_type:
|
2017-12-04 18:43:44 +01:00
|
|
|
self._proxies = vt_proxies
|
2017-10-25 15:48:18 +02:00
|
|
|
self._report = self.__query_virustotal(apikey, indicator)
|
|
|
|
self.generate_attributes()
|
|
|
|
else:
|
2024-01-17 13:13:14 +01:00
|
|
|
error_msg = f"A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{indicator}' instead"
|
2017-10-25 15:48:18 +02:00
|
|
|
raise InvalidMISPObject(error_msg)
|
|
|
|
|
2024-02-01 14:40:12 +01:00
|
|
|
def get_report(self) -> dict[str, Any]:
|
2017-12-04 18:43:44 +01:00
|
|
|
return self._report
|
|
|
|
|
2024-02-01 14:40:12 +01:00
|
|
|
def generate_attributes(self) -> None:
|
2017-10-25 15:48:18 +02:00
|
|
|
''' Parse the VirusTotal report for relevant attributes '''
|
|
|
|
self.add_attribute("last-submission", value=self._report["scan_date"])
|
|
|
|
self.add_attribute("permalink", value=self._report["permalink"])
|
|
|
|
ratio = "{}/{}".format(self._report["positives"], self._report["total"])
|
|
|
|
self.add_attribute("detection-ratio", value=ratio)
|
|
|
|
|
2024-02-01 14:40:12 +01:00
|
|
|
def __validate_resource(self, ioc: str) -> str | bool:
|
2017-10-25 15:48:18 +02:00
|
|
|
'''
|
|
|
|
Validate the data type of an indicator.
|
|
|
|
Domains and IP addresses aren't supported because
|
|
|
|
they don't return the same type of data as the URLs/files do
|
|
|
|
|
|
|
|
:ioc: Indicator to search VirusTotal for
|
|
|
|
'''
|
2017-10-25 21:00:00 +02:00
|
|
|
if not has_validators:
|
|
|
|
raise Exception('You need to install validators: pip install validators')
|
2017-10-25 15:48:18 +02:00
|
|
|
if validators.url(ioc):
|
|
|
|
return "url"
|
|
|
|
elif re.match(r"\b([a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})\b", ioc):
|
|
|
|
return "file"
|
|
|
|
return False
|
|
|
|
|
2024-02-01 14:40:12 +01:00
|
|
|
def __query_virustotal(self, apikey: str, resource: str) -> dict[str, Any]:
|
2017-10-25 15:48:18 +02:00
|
|
|
'''
|
|
|
|
Query VirusTotal for information about an indicator
|
|
|
|
|
|
|
|
:apikey: VirusTotal API key
|
|
|
|
|
|
|
|
:resource: Indicator to search in VirusTotal
|
|
|
|
'''
|
2024-01-17 13:13:14 +01:00
|
|
|
url = f"https://www.virustotal.com/vtapi/v2/{self._resource_type}/report"
|
2017-10-25 15:48:18 +02:00
|
|
|
params = {"apikey": apikey, "resource": resource}
|
|
|
|
# for now assume we're using a public API key - we'll figure out private keys later
|
2017-12-04 18:43:44 +01:00
|
|
|
if self._proxies:
|
|
|
|
report = requests.get(url, params=params, proxies=self._proxies)
|
|
|
|
else:
|
|
|
|
report = requests.get(url, params=params)
|
2020-01-23 10:27:40 +01:00
|
|
|
report_json = report.json()
|
|
|
|
if report_json["response_code"] == 1:
|
2020-04-06 11:46:15 +02:00
|
|
|
return report_json
|
2017-10-25 15:48:18 +02:00
|
|
|
else:
|
2020-01-23 10:27:40 +01:00
|
|
|
error_msg = "{}: {}".format(resource, report_json["verbose_msg"])
|
2017-10-25 15:48:18 +02:00
|
|
|
raise InvalidMISPObject(error_msg)
|