diff --git a/examples/vt_to_misp.py b/examples/vt_to_misp.py new file mode 100644 index 0000000..5554aba --- /dev/null +++ b/examples/vt_to_misp.py @@ -0,0 +1,182 @@ +''' Convert a VirusTotal report into MISP objects ''' +import argparse +import json +import logging +from datetime import datetime +from urllib.parse import urlsplit + +import pymisp +from pymisp.tools import VTReportObject + +logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(module)s.%(funcName)s.%(lineno)d | %(message)s") + + +def build_cli(): + ''' + Build the command-line arguments + ''' + desc = "Take an indicator or list of indicators to search VT for and import the results into MISP" + post_desc = """ +config.json: Should be a JSON file containing MISP and VirusTotal credentials with the following format: +{"misp": {"url": "", "key": ""}, "virustotal": {"key": ""}} +Please note: Only public API features work in the VTReportObject for now. I don't have a quarter million to spare ;) + +Example: + python vt_to_misp.py -i 719c97a8cd8db282586c1416894dcaf8 -c ./config.json + """ + parser = argparse.ArgumentParser(description=desc, epilog=post_desc, formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument("-e", "--event", help="MISP event id to add to") + parser.add_argument("-c", "--config", default="config.json", help="Path to JSON configuration file to read") + indicators = parser.add_mutually_exclusive_group(required=True) + indicators.add_argument("-i", "--indicator", help="Single indicator to look up") + indicators.add_argument("-f", "--file", help="File of indicators to look up - one on each line") + indicators.add_argument("-l", "--link", help="Link to a VirusTotal report") + return parser.parse_args() + + +def build_config(path=None): + ''' + Read a configuration file path. File is expected to be + + :path: Path to a configuration file + ''' + try: + with open(path, "r") as ifile: + return json.load(ifile) + except OSError: + raise OSError("Couldn't find path to configuration file: %s", path) + except json.JSONDecodeError: + raise IOError("Couldn't parse configuration file. Please make sure it is a proper JSON document") + + +def generate_report(indicator, apikey): + ''' + Build our VirusTotal report object, File object, and AV signature objects + and link them appropriately + + :indicator: Indicator hash to search in VT for + ''' + report_objects = [] + vt_report = VTReportObject(apikey, indicator) + report_objects.append(vt_report) + raw_report = vt_report._report + if vt_report._resource_type == "file": + file_object = pymisp.MISPObject(name="file") + file_object.add_attribute("md5", value=raw_report["md5"]) + file_object.add_attribute("sha1", value=raw_report["sha1"]) + file_object.add_attribute("sha256", value=raw_report["sha256"]) + vt_report.add_reference(referenced_uuid=file_object.uuid, relationship_type="report of") + report_objects.append(file_object) + elif vt_report._resource_type == "url": + parsed = urlsplit(indicator) + url_object = pymisp.MISPObject(name="url") + url_object.add_attribute("url", value=parsed.geturl()) + url_object.add_attribute("host", value=parsed.hostname) + url_object.add_attribute("scheme", value=parsed.scheme) + url_object.add_attribute("port", value=parsed.port) + vt_report.add_reference(referenced_uuid=url_object.uuid, relationship_type="report of") + report_objects.append(url_object) + for antivirus in raw_report["scans"]: + if raw_report["scans"][antivirus]["detected"]: + av_object = pymisp.MISPObject(name="av-signature") + av_object.add_attribute("software", value=antivirus) + signature_name = raw_report["scans"][antivirus]["result"] + av_object.add_attribute("signature", value=signature_name, disable_correlation=True) + vt_report.add_reference(referenced_uuid=av_object.uuid, relationship_type="included-in") + report_objects.append(av_object) + return report_objects + + +def get_misp_event(event_id=None, info=None): + ''' + Smaller helper function for generating a new MISP event or using a preexisting one + + :event_id: The event id of the MISP event to upload objects to + + :info: The event's title/info + ''' + if event_id: + event = misp.get_event(event_id) + elif info: + event = misp.new_event(info=info) + else: + event = misp.new_event(info="VirusTotal Report") + misp_event = pymisp.MISPEvent() + misp_event.load(event) + return misp_event + + +def main(misp, config, args): + ''' + Main program logic + + :misp: PyMISP API object for interfacing with MISP + + :config: Configuration dictionary + + :args: Argparse CLI object + ''' + if args.indicator: + misp_objects = generate_report(args.indicator, config["virustotal"]["key"]) + if misp_objects: + misp_event = get_misp_event(args.event, "VirusTotal Report for {}".format(args.indicator)) + submit_to_misp(misp, misp_event, misp_objects) + elif args.file: + try: + reports = [] + with open(args.file, "r") as ifile: + for indicator in ifile: + try: + misp_objects = generate_report(indicator, config["virustotal"]["key"]) + if misp_objects: + reports.append(misp_objects) + except pymisp.exceptions.InvalidMISPObject as err: + logging.error(err) + if reports: + current_time = datetime.now().strftime("%x %X") + misp_event = get_misp_event(args.event, "VirusTotal Reports: {}".format(current_time)) + for report in reports: + submit_to_misp(misp, misp_event, report) + except OSError: + logging.error("Couldn't open indicators file at '%s'. Check path", args.file) + elif args.link: + # https://www.virustotal.com/#/file//detection + indicator = args.link.split("/")[5] + misp_objects = generate_report(indicator, config["virustotal"]["key"]) + if misp_objects: + misp_event = get_misp_event(args.event, "VirusTotal Report for {}".format(indicator)) + submit_to_misp(misp, misp_event, misp_objects) + + +def submit_to_misp(misp, misp_event, misp_objects): + ''' + Submit a list of MISP objects to a MISP event + + :misp: PyMISP API object for interfacing with MISP + + :misp_event: MISPEvent object + + :misp_objects: List of MISPObject objects. Must be a list + ''' +# go through round one and only add MISP objects + for misp_object in misp_objects: + template_id = misp.get_object_template_id(misp_object.template_uuid) + misp.add_object(misp_event.id, template_id, misp_object) + # go through round two and add all the object references for each object + for misp_object in misp_objects: + for reference in misp_object.ObjectReference: + misp.add_object_reference(reference) + + +if __name__ == "__main__": + try: + args = build_cli() + config = build_config(args.config) + # change the 'ssl' value if you want to verify your MISP's SSL instance + misp = pymisp.PyMISP(url=config["misp"]["url"], key=config["misp"]["key"], ssl=False) + # finally, let's start checking VT and converting the reports + main(misp, config, args) + except KeyboardInterrupt: + print("Bye Felicia") + except pymisp.exceptions.InvalidMISPObject as err: + logging.error(err) diff --git a/pymisp/tools/__init__.py b/pymisp/tools/__init__.py index a0c667a..0412fde 100644 --- a/pymisp/tools/__init__.py +++ b/pymisp/tools/__init__.py @@ -1,3 +1,4 @@ +from .vtreportobject import VTReportObject # noqa from .neo4j import Neo4j # noqa from .fileobject import FileObject # noqa from .peobject import PEObject, PESectionObject # noqa diff --git a/pymisp/tools/vtreportobject.py b/pymisp/tools/vtreportobject.py new file mode 100644 index 0000000..5a4858b --- /dev/null +++ b/pymisp/tools/vtreportobject.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import re + +import requests +import validators + +from .abstractgenerator import AbstractMISPObjectGenerator +from .. import InvalidMISPObject + + +class VTReportObject(AbstractMISPObjectGenerator): + ''' + VirusTotal Report + + :apikey: VirusTotal API key (private works, but only public features are supported right now) + + :indicator: IOC to search VirusTotal for + ''' + def __init__(self, apikey, indicator): + # PY3 way: + # super().__init__("virustotal-report") + super(VTReportObject, self).__init__("virustotal-report") + indicator = indicator.strip() + self._resource_type = self.__validate_resource(indicator) + if self._resource_type: + self._report = self.__query_virustotal(apikey, indicator) + self.generate_attributes() + else: + error_msg = "A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{}' instead".format(indicator) + raise InvalidMISPObject(error_msg) + # Mark as non_jsonable because we need to add the references manually after the object(s) have been created + self.update_not_jsonable('ObjectReference') + + def generate_attributes(self): + ''' Parse the VirusTotal report for relevant attributes ''' + self.add_attribute("last-submission", value=self._report["scan_date"]) + self.add_attribute("permalink", value=self._report["permalink"]) + ratio = "{}/{}".format(self._report["positives"], self._report["total"]) + self.add_attribute("detection-ratio", value=ratio) + + def __validate_resource(self, ioc): + ''' + Validate the data type of an indicator. + Domains and IP addresses aren't supported because + they don't return the same type of data as the URLs/files do + + :ioc: Indicator to search VirusTotal for + ''' + if validators.url(ioc): + return "url" + elif re.match(r"\b([a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})\b", ioc): + return "file" + return False + + def __query_virustotal(self, apikey, resource): + ''' + Query VirusTotal for information about an indicator + + :apikey: VirusTotal API key + + :resource: Indicator to search in VirusTotal + ''' + url = "https://www.virustotal.com/vtapi/v2/{}/report".format(self._resource_type) + params = {"apikey": apikey, "resource": resource} + # for now assume we're using a public API key - we'll figure out private keys later + report = requests.get(url, params=params) + report = report.json() + if report["response_code"] == 1: + return report + else: + error_msg = "{}: {}".format(resource, report["verbose_msg"]) + raise InvalidMISPObject(error_msg)