PyMISP/pymisp/tools/vtreportobject.py

87 lines
3.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
from __future__ import annotations
import re
from typing import Any
import requests
try:
2024-03-22 09:59:32 +01:00
import validators
has_validators = True
except ImportError:
has_validators = False
from .abstractgenerator import AbstractMISPObjectGenerator
from .. import InvalidMISPObject
class VTReportObject(AbstractMISPObjectGenerator):
'''
VirusTotal Report
:apikey: VirusTotal API key (private works, but only public features are supported right now)
:indicator: IOC to search VirusTotal for
'''
def __init__(self, apikey: str, indicator: str, vt_proxies: dict[str, str] | None = None, **kwargs) -> None: # type: ignore[no-untyped-def]
2021-10-26 02:37:12 +02:00
super().__init__('virustotal-report', **kwargs)
indicator = indicator.strip()
self._resource_type = self.__validate_resource(indicator)
if self._resource_type:
2017-12-04 18:43:44 +01:00
self._proxies = vt_proxies
self._report = self.__query_virustotal(apikey, indicator)
self.generate_attributes()
else:
error_msg = f"A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{indicator}' instead"
raise InvalidMISPObject(error_msg)
def get_report(self) -> dict[str, Any]:
2017-12-04 18:43:44 +01:00
return self._report
def generate_attributes(self) -> None:
''' Parse the VirusTotal report for relevant attributes '''
self.add_attribute("last-submission", value=self._report["scan_date"])
self.add_attribute("permalink", value=self._report["permalink"])
ratio = "{}/{}".format(self._report["positives"], self._report["total"])
self.add_attribute("detection-ratio", value=ratio)
def __validate_resource(self, ioc: str) -> str | bool:
'''
Validate the data type of an indicator.
Domains and IP addresses aren't supported because
they don't return the same type of data as the URLs/files do
:ioc: Indicator to search VirusTotal for
'''
if not has_validators:
raise Exception('You need to install validators: pip install validators')
if validators.url(ioc):
return "url"
elif re.match(r"\b([a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})\b", ioc):
return "file"
return False
def __query_virustotal(self, apikey: str, resource: str) -> dict[str, Any]:
'''
Query VirusTotal for information about an indicator
:apikey: VirusTotal API key
:resource: Indicator to search in VirusTotal
'''
url = f"https://www.virustotal.com/vtapi/v2/{self._resource_type}/report"
params = {"apikey": apikey, "resource": resource}
# for now assume we're using a public API key - we'll figure out private keys later
2017-12-04 18:43:44 +01:00
if self._proxies:
report = requests.get(url, params=params, proxies=self._proxies)
else:
report = requests.get(url, params=params)
2020-01-23 10:27:40 +01:00
report_json = report.json()
if report_json["response_code"] == 1:
return report_json
else:
2020-01-23 10:27:40 +01:00
error_msg = "{}: {}".format(resource, report_json["verbose_msg"])
raise InvalidMISPObject(error_msg)