added vtreportobject and vt_to_misp example

pull/126/head
Thomas Gardner 2017-10-25 09:48:18 -04:00
parent 4152435250
commit e2d690d0ef
3 changed files with 257 additions and 0 deletions

182
examples/vt_to_misp.py Normal file
View File

@ -0,0 +1,182 @@
''' Convert a VirusTotal report into MISP objects '''
import argparse
import json
import logging
from datetime import datetime
from urllib.parse import urlsplit
import pymisp
from pymisp.tools import VTReportObject
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(module)s.%(funcName)s.%(lineno)d | %(message)s")
def build_cli():
'''
Build the command-line arguments
'''
desc = "Take an indicator or list of indicators to search VT for and import the results into MISP"
post_desc = """
config.json: Should be a JSON file containing MISP and VirusTotal credentials with the following format:
{"misp": {"url": "<url_to_misp>", "key": "<misp_api_key>"}, "virustotal": {"key": "<vt_api_key>"}}
Please note: Only public API features work in the VTReportObject for now. I don't have a quarter million to spare ;)
Example:
python vt_to_misp.py -i 719c97a8cd8db282586c1416894dcaf8 -c ./config.json
"""
parser = argparse.ArgumentParser(description=desc, epilog=post_desc, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("-e", "--event", help="MISP event id to add to")
parser.add_argument("-c", "--config", default="config.json", help="Path to JSON configuration file to read")
indicators = parser.add_mutually_exclusive_group(required=True)
indicators.add_argument("-i", "--indicator", help="Single indicator to look up")
indicators.add_argument("-f", "--file", help="File of indicators to look up - one on each line")
indicators.add_argument("-l", "--link", help="Link to a VirusTotal report")
return parser.parse_args()
def build_config(path=None):
'''
Read a configuration file path. File is expected to be
:path: Path to a configuration file
'''
try:
with open(path, "r") as ifile:
return json.load(ifile)
except OSError:
raise OSError("Couldn't find path to configuration file: %s", path)
except json.JSONDecodeError:
raise IOError("Couldn't parse configuration file. Please make sure it is a proper JSON document")
def generate_report(indicator, apikey):
'''
Build our VirusTotal report object, File object, and AV signature objects
and link them appropriately
:indicator: Indicator hash to search in VT for
'''
report_objects = []
vt_report = VTReportObject(apikey, indicator)
report_objects.append(vt_report)
raw_report = vt_report._report
if vt_report._resource_type == "file":
file_object = pymisp.MISPObject(name="file")
file_object.add_attribute("md5", value=raw_report["md5"])
file_object.add_attribute("sha1", value=raw_report["sha1"])
file_object.add_attribute("sha256", value=raw_report["sha256"])
vt_report.add_reference(referenced_uuid=file_object.uuid, relationship_type="report of")
report_objects.append(file_object)
elif vt_report._resource_type == "url":
parsed = urlsplit(indicator)
url_object = pymisp.MISPObject(name="url")
url_object.add_attribute("url", value=parsed.geturl())
url_object.add_attribute("host", value=parsed.hostname)
url_object.add_attribute("scheme", value=parsed.scheme)
url_object.add_attribute("port", value=parsed.port)
vt_report.add_reference(referenced_uuid=url_object.uuid, relationship_type="report of")
report_objects.append(url_object)
for antivirus in raw_report["scans"]:
if raw_report["scans"][antivirus]["detected"]:
av_object = pymisp.MISPObject(name="av-signature")
av_object.add_attribute("software", value=antivirus)
signature_name = raw_report["scans"][antivirus]["result"]
av_object.add_attribute("signature", value=signature_name, disable_correlation=True)
vt_report.add_reference(referenced_uuid=av_object.uuid, relationship_type="included-in")
report_objects.append(av_object)
return report_objects
def get_misp_event(event_id=None, info=None):
'''
Smaller helper function for generating a new MISP event or using a preexisting one
:event_id: The event id of the MISP event to upload objects to
:info: The event's title/info
'''
if event_id:
event = misp.get_event(event_id)
elif info:
event = misp.new_event(info=info)
else:
event = misp.new_event(info="VirusTotal Report")
misp_event = pymisp.MISPEvent()
misp_event.load(event)
return misp_event
def main(misp, config, args):
'''
Main program logic
:misp: PyMISP API object for interfacing with MISP
:config: Configuration dictionary
:args: Argparse CLI object
'''
if args.indicator:
misp_objects = generate_report(args.indicator, config["virustotal"]["key"])
if misp_objects:
misp_event = get_misp_event(args.event, "VirusTotal Report for {}".format(args.indicator))
submit_to_misp(misp, misp_event, misp_objects)
elif args.file:
try:
reports = []
with open(args.file, "r") as ifile:
for indicator in ifile:
try:
misp_objects = generate_report(indicator, config["virustotal"]["key"])
if misp_objects:
reports.append(misp_objects)
except pymisp.exceptions.InvalidMISPObject as err:
logging.error(err)
if reports:
current_time = datetime.now().strftime("%x %X")
misp_event = get_misp_event(args.event, "VirusTotal Reports: {}".format(current_time))
for report in reports:
submit_to_misp(misp, misp_event, report)
except OSError:
logging.error("Couldn't open indicators file at '%s'. Check path", args.file)
elif args.link:
# https://www.virustotal.com/#/file/<ioc>/detection
indicator = args.link.split("/")[5]
misp_objects = generate_report(indicator, config["virustotal"]["key"])
if misp_objects:
misp_event = get_misp_event(args.event, "VirusTotal Report for {}".format(indicator))
submit_to_misp(misp, misp_event, misp_objects)
def submit_to_misp(misp, misp_event, misp_objects):
'''
Submit a list of MISP objects to a MISP event
:misp: PyMISP API object for interfacing with MISP
:misp_event: MISPEvent object
:misp_objects: List of MISPObject objects. Must be a list
'''
# go through round one and only add MISP objects
for misp_object in misp_objects:
template_id = misp.get_object_template_id(misp_object.template_uuid)
misp.add_object(misp_event.id, template_id, misp_object)
# go through round two and add all the object references for each object
for misp_object in misp_objects:
for reference in misp_object.ObjectReference:
misp.add_object_reference(reference)
if __name__ == "__main__":
try:
args = build_cli()
config = build_config(args.config)
# change the 'ssl' value if you want to verify your MISP's SSL instance
misp = pymisp.PyMISP(url=config["misp"]["url"], key=config["misp"]["key"], ssl=False)
# finally, let's start checking VT and converting the reports
main(misp, config, args)
except KeyboardInterrupt:
print("Bye Felicia")
except pymisp.exceptions.InvalidMISPObject as err:
logging.error(err)

View File

@ -1,3 +1,4 @@
from .vtreportobject import VTReportObject # noqa
from .neo4j import Neo4j # noqa
from .fileobject import FileObject # noqa
from .peobject import PEObject, PESectionObject # noqa

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import requests
import validators
from .abstractgenerator import AbstractMISPObjectGenerator
from .. import InvalidMISPObject
class VTReportObject(AbstractMISPObjectGenerator):
'''
VirusTotal Report
:apikey: VirusTotal API key (private works, but only public features are supported right now)
:indicator: IOC to search VirusTotal for
'''
def __init__(self, apikey, indicator):
# PY3 way:
# super().__init__("virustotal-report")
super(VTReportObject, self).__init__("virustotal-report")
indicator = indicator.strip()
self._resource_type = self.__validate_resource(indicator)
if self._resource_type:
self._report = self.__query_virustotal(apikey, indicator)
self.generate_attributes()
else:
error_msg = "A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{}' instead".format(indicator)
raise InvalidMISPObject(error_msg)
# Mark as non_jsonable because we need to add the references manually after the object(s) have been created
self.update_not_jsonable('ObjectReference')
def generate_attributes(self):
''' Parse the VirusTotal report for relevant attributes '''
self.add_attribute("last-submission", value=self._report["scan_date"])
self.add_attribute("permalink", value=self._report["permalink"])
ratio = "{}/{}".format(self._report["positives"], self._report["total"])
self.add_attribute("detection-ratio", value=ratio)
def __validate_resource(self, ioc):
'''
Validate the data type of an indicator.
Domains and IP addresses aren't supported because
they don't return the same type of data as the URLs/files do
:ioc: Indicator to search VirusTotal for
'''
if validators.url(ioc):
return "url"
elif re.match(r"\b([a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})\b", ioc):
return "file"
return False
def __query_virustotal(self, apikey, resource):
'''
Query VirusTotal for information about an indicator
:apikey: VirusTotal API key
:resource: Indicator to search in VirusTotal
'''
url = "https://www.virustotal.com/vtapi/v2/{}/report".format(self._resource_type)
params = {"apikey": apikey, "resource": resource}
# for now assume we're using a public API key - we'll figure out private keys later
report = requests.get(url, params=params)
report = report.json()
if report["response_code"] == 1:
return report
else:
error_msg = "{}: {}".format(resource, report["verbose_msg"])
raise InvalidMISPObject(error_msg)