PyMISP/pymisp/tools/vtreportobject.py

86 lines
3.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
2020-01-23 10:27:40 +01:00
from typing import Optional
import requests
try:
2020-01-23 10:27:40 +01:00
import validators # type: ignore
has_validators = True
except ImportError:
has_validators = False
from .abstractgenerator import AbstractMISPObjectGenerator
from .. import InvalidMISPObject
class VTReportObject(AbstractMISPObjectGenerator):
    '''
    VirusTotal Report

    Builds a 'virustotal-report' MISP object from the VirusTotal v2 report of
    a URL or a file hash. The report is fetched over HTTP while the object is
    being constructed.

    :apikey: VirusTotal API key (private works, but only public features are supported right now)
    :indicator: IOC to search VirusTotal for
    :vt_proxies: Optional proxies mapping handed to requests for the VirusTotal query
    '''

    def __init__(self, apikey: str, indicator: str, vt_proxies: Optional[dict] = None, **kwargs):
        '''
        Validate the indicator, query VirusTotal and populate the attributes.

        Any extra keyword arguments are forwarded to the parent constructor.
        Raises InvalidMISPObject when the indicator is neither a URL nor an
        MD5/SHA-1/SHA-256 hash, or when VirusTotal has no report for it.
        '''
        super().__init__('virustotal-report', **kwargs)
        # Surrounding whitespace must not make an otherwise valid IOC fail validation.
        indicator = indicator.strip()
        self._resource_type = self.__validate_resource(indicator)
        if not self._resource_type:
            error_msg = "A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{}' instead".format(indicator)
            raise InvalidMISPObject(error_msg)
        self._proxies = vt_proxies
        self._report = self.__query_virustotal(apikey, indicator)
        self.generate_attributes()

    def get_report(self):
        ''' Return the raw VirusTotal report (decoded JSON) fetched at construction time '''
        return self._report

    def generate_attributes(self):
        ''' Parse the VirusTotal report for relevant attributes '''
        self.add_attribute("last-submission", value=self._report["scan_date"])
        self.add_attribute("permalink", value=self._report["permalink"])
        ratio = "{}/{}".format(self._report["positives"], self._report["total"])
        self.add_attribute("detection-ratio", value=ratio)

    def __validate_resource(self, ioc: str):
        '''
        Validate the data type of an indicator.
        Domains and IP addresses aren't supported because
        they don't return the same type of data as the URLs/files do

        :ioc: Indicator to search VirusTotal for

        Returns "url" for a valid URL, "file" for a 32/40/64 hex-digit hash
        (MD5/SHA-1/SHA-256 lengths) and False for anything else.
        Raises Exception when the optional 'validators' package is missing.
        '''
        if not has_validators:
            raise Exception('You need to install validators: pip install validators')
        if validators.url(ioc):
            return "url"
        # fullmatch: the whole indicator must be exactly one hash. A prefix
        # match would also accept things like "<hash> junk" or "<hash>,<hash>".
        if re.fullmatch(r"[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64}", ioc):
            return "file"
        return False

    def __query_virustotal(self, apikey: str, resource: str):
        '''
        Query VirusTotal for information about an indicator

        :apikey: VirusTotal API key
        :resource: Indicator to search in VirusTotal

        Returns the decoded JSON report when its "response_code" is 1.
        Raises InvalidMISPObject (carrying VirusTotal's "verbose_msg") otherwise.
        '''
        url = "https://www.virustotal.com/vtapi/v2/{}/report".format(self._resource_type)
        params = {"apikey": apikey, "resource": resource}
        # for now assume we're using a public API key - we'll figure out private keys later
        # A falsy proxies value (None / {}) is equivalent to not passing the
        # argument, so a single call covers both the proxied and direct case.
        report = requests.get(url, params=params, proxies=self._proxies)
        report_json = report.json()
        if report_json["response_code"] != 1:
            error_msg = "{}: {}".format(resource, report_json["verbose_msg"])
            raise InvalidMISPObject(error_msg)
        return report_json