From 0b9781b4daa026e8e9d9ee0098237529cfb133e8 Mon Sep 17 00:00:00 2001 From: Jens Thom Date: Mon, 30 Nov 2020 12:11:44 +0100 Subject: [PATCH] update `vmray_automation` to stay compatible with the changes made to `vmray_import` MISP modules --- examples/vmray_automation.py | 421 +++++++++++++++++++++-------------- 1 file changed, 250 insertions(+), 171 deletions(-) diff --git a/examples/vmray_automation.py b/examples/vmray_automation.py index 0eb3ac8..b87ba31 100644 --- a/examples/vmray_automation.py +++ b/examples/vmray_automation.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -''' -Koen Van Impe +""" +Jens Thom (VMRay), Koen Van Impe VMRay automatic import Put this script in crontab to run every /15 or /60 @@ -9,194 +9,273 @@ Put this script in crontab to run every /15 or /60 Calls "vmray_import" for all events that have an 'incomplete' VMray analysis -Do inline config in "main" +Do inline config in "main". +If your MISP user is not an admin, you cannot use `get_config`, +use `overwrite_config` instead. +Example config: + config = { + "vmray_import_enabled": True, + "vmray_import_apikey": vmray_api_key, + "vmray_import_url": vmray_server, + "vmray_import_disable_tags": False, + "vmray_import_disable_misp_objects": False, + "vmray_import_ignore_analysis_finished": False, + "services_port": 6666, + "services_url": "http://localhost", + "Artifacts": "1", + "VTI": "1", + "IOCs": "1", + "Analysis Details": "1", + } +""" -''' +import logging +import urllib -from pymisp import ExpandedPyMISP, MISPAttribute -from keys import misp_url, misp_key, misp_verifycert -import argparse -import os -import json -import datetime -import time +from typing import Any, Dict, List, Optional import requests -import sys + +from keys import misp_key, misp_url, misp_verifycert +from pymisp import ExpandedPyMISP # Suppress those "Unverified HTTPS request is being made" import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -def get_vmray_config(url, key, misp_verifycert, default_wait_period): +def is_url(url: str) -> bool: try: - misp_headers = {'Content-Type': 'application/json', 'Accept': 'application/json', 'Authorization': key} - req = requests.get(url + 'servers/serverSettings.json', verify=misp_verifycert, headers=misp_headers) + result = urllib.parse.urlparse(url) + return result.scheme and result.netloc + except ValueError: + return False - if req.status_code == 200: - req_json = req.json() - if 'finalSettings' in req_json: - finalSettings = req_json['finalSettings'] - vmray_api = '' - vmray_url = '' - vmray_wait_period = 0 - for el in finalSettings: - # Is the vmray import module enabled? - if el['setting'] == 'Plugin.Import_vmray_import_enabled': - vmray_import_enabled = el['value'] - if vmray_import_enabled is False: - break - # Get the VMRay API key from the MISP settings - elif el['setting'] == 'Plugin.Import_vmray_import_apikey': - vmray_api = el['value'] - # The VMRay URL to query - elif el['setting'] == 'Plugin.Import_vmray_import_url': - vmray_url = el['value'].replace('/', '\\/') - # MISP modules - Port? - elif el['setting'] == 'Plugin.Import_services_port': - module_import_port = el['value'] - if module_import_port: - module_import_port = str(module_import_port) - else: - module_import_port = "6666" - # MISP modules - URL - elif el['setting'] == 'Plugin.Import_services_url': - module_import_url = el['value'].replace('\/\/', '//') - # Wait period - elif el['setting'] == 'Plugin.Import_vmray_import_wait_period': - vmray_wait_period = abs(int(el['value'])) +class VMRayAutomationException(Exception): + pass - if vmray_wait_period < 1: - vmray_wait_period = default_wait_period + +class VMRayAutomation: + def __init__( + self, + misp_url: str, + misp_key: str, + verify_cert: bool = False, + debug: bool = False, + ) -> None: + # setup logging + log_level = logging.DEBUG if debug else logging.INFO + log_format = "%(asctime)s - %(name)s - %(levelname)8s - %(message)s" + + logging.basicConfig(level=log_level, format=log_format) + logging.getLogger("pymisp").setLevel(log_level) + self.logger = logging.getLogger(self.__class__.__name__) + + self.misp_url = misp_url.rstrip("/") + self.misp_key = misp_key + self.verifycert = verify_cert + self.misp = ExpandedPyMISP(misp_url, misp_key, ssl=verify_cert, debug=debug) + self.config = {} + self.tag_incomplete = 'workflow:state="incomplete"' + + @staticmethod + def _setting_enabled(value: bool) -> bool: + if not value: + raise VMRayAutomationException( + "VMRay import is disabled. " + "Please enable `vmray_import` in the MISP settings." + ) + + return True + + @staticmethod + def _setting_apikey(value: str) -> str: + if not value: + raise VMRayAutomationException( + "VMRay API key not set. Please set the API key in the MISP settings." + ) + + return value + + @staticmethod + def _setting_url(value: str) -> str: + if not value: + raise VMRayAutomationException( + "VMRay URL not set. Please set the URL in the MISP settings." + ) + + if not is_url(value): + raise VMRayAutomationException("Not a valid URL") + + return value + + @staticmethod + def _setting_disabled(value: str) -> bool: + return value.lower() in ["no", "false"] + + @staticmethod + def _services_port(value: int) -> bool: + if value == 0: + return 6666 + return value + + @staticmethod + def services_url(value: str) -> bool: + if not is_url(value): + raise VMRayAutomationException("Services URL is not valid.") + + return value + + @property + def vmray_settings(self) -> Dict[str, Any]: + return { + "vmray_import_enabled": self._setting_enabled, + "vmray_import_apikey": self._setting_apikey, + "vmray_import_url": self._setting_url, + "vmray_import_disable_tags": self._setting_disabled, + "vmray_import_disable_misp_objects": self._setting_disabled, + "vmray_import_ignore_analysis_finished": self._setting_disabled, + "services_port": self._services_port, + "services_url": self.services_url, + } + + def _get_misp_settings(self) -> List[Dict[str, Any]]: + misp_headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": self.misp_key, + } + + response = requests.get( + f"{self.misp_url}/servers/serverSettings.json", + verify=self.verifycert, + headers=misp_headers, + ) + + if response.status_code == 200: + settings = response.json() + if "finalSettings" in settings: + return settings["finalSettings"] + + raise VMRayAutomationException("Could not get settings from MISP server.") + + def get_config(self) -> None: + self.logger.debug("Loading confing...") + # get settings from MISP server + settings = self._get_misp_settings() + for setting in settings: + config_name = setting["setting"].replace("Plugin.Import_", "") + if config_name in self.vmray_settings: + func = self.vmray_settings[config_name] + value = func(setting["value"]) + self.config[config_name] = value + + # set default `vmray_import` settings + self.config.setdefault("VTI", "1") + self.config.setdefault("IOCs", "1") + self.config.setdefault("Artifacts", "0") + self.config.setdefault("Analysis Details", "1") + + self.logger.info("Loading config: Done.") + + def overwrite_config(self, config: Dict[str, Any]) -> None: + self.config.update(config) + + def _get_sample_id(self, value: str) -> Optional[int]: + vmray_sample_id_text = "VMRay Sample ID: " + if not value.startswith(vmray_sample_id_text): + self.logger.warning("Invalid Sample ID: %s.", value) + return None + + return int(value.replace(vmray_sample_id_text, "")) + + def _call_vmray_import(self, sample_id: int, event_id: str) -> Dict[str, Any]: + url = f"{self.config['services_url']}:{self.config['services_port']}/query" + + config = {"Sample ID": sample_id} + for key, value in self.config.items(): + vmray_config_key = key.replace("vmray_import_", "") + config[vmray_config_key] = str(value) + + data = { + "module": "vmray_import", + "event_id": event_id, + "config": config, + "data": "", + } + + self.logger.debug("calling `vmray_import`: url=%s, config=%s", url, config) + response = requests.post(url, json=data) + if response.status_code != 200: + raise VMRayAutomationException( + f"MISP modules returned status code `{response.status_code}`" + ) + + json_response = response.json() + if "error" in json_response: + error = json_response["error"] + raise VMRayAutomationException(f"MISP modules returned error: {error}") + + return json_response + + def _add_event_attributes(self, event_id: int, attributes: Dict[str, Any]) -> None: + event = self.misp.get_event(event_id, pythonify=True) + for attr in attributes["Attribute"]: + event.add_attribute(**attr) + + self.misp.update_event(event) + + def _add_event_objects(self, event_id: int, objects: Dict[str, Any]) -> None: + event = self.misp.get_event(event_id, pythonify=True) + for obj in objects["Object"]: + event.add_object(**obj) + + if "Tag" in objects: + for tag in objects["Tag"]: + event.add_tag(tag["name"]) + + self.misp.update_event(event) + + def _add_misp_event(self, event_id: int, response: Dict[str, Any]) -> None: + if self.config["vmray_import_disable_misp_objects"]: + self._add_event_attributes(event_id, response["results"]) else: - sys.exit('Did not receive a 200 code from MISP') + self._add_event_objects(event_id, response["results"]) - if vmray_import_enabled and vmray_api and vmray_url and module_import_port and module_import_url: - return {'vmray_wait_period': vmray_wait_period, 'vmray_api': vmray_api, 'vmray_url': vmray_url, 'module_import_port': module_import_port, 'module_import_url': module_import_url} - sys.exit('Did not receive all the necessary configuration information from MISP') + def import_incomplete_analyses(self) -> None: + self.logger.info("Searching for attributes with tag='%s'", self.tag_incomplete) + result = self.misp.search("attributes", tags=self.tag_incomplete) + attributes = result["Attribute"] - except Exception as e: - sys.exit('Unable to get VMRay config from MISP') + for attr in attributes: + event_id = int(attr["event_id"]) + self.logger.info("Processing event ID `%d`.", event_id) + + sample_id = self._get_sample_id(attr["value"]) + if not sample_id: + continue + + response = self._call_vmray_import(sample_id, event_id) + self._add_misp_event(event_id, response) + self.misp.untag(attr["uuid"], self.tag_incomplete) -def search_vmray_incomplete(m, url, wait_period, module_import_url, module_import_port, vmray_url, vmray_api, vmray_attribute_category, vmray_include_analysisid, vmray_include_imphash_ssdeep, vmray_include_extracted_files, vmray_include_analysisdetails, vmray_include_vtidetails, custom_tags_incomplete, custom_tags_complete): - controller = 'attributes' - vmray_value = 'VMRay Sample ID:' # How sample IDs are stored in MISP - req = None +def main(): + debug = False + config = { + "Artifacts": "0", + "VTI": "1", + "IOCs": "1", + "Analysis Details": "0", + "vmray_import_disable_misp_objects": False, + } - # Search for the events - try: - result = m.search(controller, tags=custom_tags_incomplete) - - attribute = result['Attribute'] - - if len(attribute) == 0: - sys.exit("No VMRay attributes found that match %s" % custom_tags_incomplete) - - timestamp = int(attribute[0]["timestamp"]) - # Not enough time has gone by to lookup the analysis jobs - if int((time.time() - timestamp) / 60) < int(wait_period): - if module_DEBUG: - r_timestamp = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') - print("Attribute to recent for wait_period (%s minutes) - timestamp attribute: %s (%s minutes old)" % (wait_period, r_timestamp, round((int(time.time() - timestamp) / 60), 2))) - return False - - if module_DEBUG: - print("All attributes older than %s" % int(wait_period)) - - for att in attribute: - value = att['value'] - - if vmray_value in value: # We found a sample ID - att_id = att['id'] - att_uuid = att['uuid'] - - # VMRay Sample IDs are stored as VMRay Sample ID: 2796577 - vmray_sample_id = value.split(vmray_value)[1].strip() - if vmray_sample_id.isdigit(): - event_id = att['event_id'] - - if module_DEBUG: - print("Found event %s with matching tags %s for sample id %s " % (event_id, custom_tags_incomplete, vmray_sample_id)) - - # Prepare request to send to vmray_import via misp modules - misp_modules_url = module_import_url + ':' + module_import_port + '/query' - misp_modules_headers = {'Content-Type': 'application/json'} - misp_modules_body = '{ "sample_id":"' + vmray_sample_id + '","module":"vmray_import","event_id":"' + event_id + '","config":{"apikey":"' + vmray_api + '","url":"' + vmray_url + '","include_analysisid":"' + vmray_include_analysisid + '","include_analysisdetails":"' + vmray_include_analysisdetails + '","include_extracted_files":"' + vmray_include_extracted_files + '","include_imphash_ssdeep":"' + vmray_include_imphash_ssdeep + '","include_vtidetails":"' + vmray_include_vtidetails + '","sample_id":"' + vmray_sample_id + '"},"data":""}' - req = requests.post(misp_modules_url, data=misp_modules_body, headers=misp_modules_headers) - if module_DEBUG and req is not None: - print("Response code from submitting to MISP modules %s" % (req.status_code)) - - # Successful response from the misp modules? - if req.status_code == 200: - req_json = req.json() - if "error" in req_json: - print("Error code in reply %s " % req_json["error"]) - continue - else: - results = req_json["results"] - - # Walk through all results in the misp-module reply - for el in results: - to_ids = True - values = el['values'] - types = el['types'] - if "to_ids" in el: - to_ids = el['to_ids'] - if "text" in types: - to_ids = False - comment = el['comment'] - if len(comment) < 1: - comment = "Enriched via the vmray_import module" - - # Attribute can belong in different types - for attr_type in types: - try: - new_attribute = MISPAttribute() - new_attribute.type = attr_type - new_attribute.category = vmray_attribute_category - new_attribute.value = values - new_attribute.to_ids = to_ids - new_attribute.comment = comment - r = m.add_attribute(event_id, new_attribute) - if module_DEBUG: - print("Add event %s: %s as %s (%s) (toids: %s)" % (event_id, values, attr_type, comment, to_ids)) - except Exception as e: - if module_DEBUG: - print("Unable to add attribute %s as type %s for event %s" % (values, attr_type, event_id)) - continue - - # Remove 'incomplete' state tags - m.untag(att_uuid, custom_tags_incomplete) - # Update tags to 'complete' state - m.tag(att_uuid, custom_tags_complete) - if module_DEBUG: - print("Updated event %s" % event_id) - - else: - sys.exit('MISP modules did not return HTTP 200 code (event %s ; sampleid %s)' % (event_id, vmray_sample_id)) - - except Exception as e: - sys.exit("Invalid response received from MISP : %s", e) + automation = VMRayAutomation(misp_url, misp_key, misp_verifycert, debug) + automation.get_config() # only possible with admin user + automation.overwrite_config(config) + automation.import_incomplete_analyses() -if __name__ == '__main__': - - module_DEBUG = True - - # Set some defaults to be used in this module - vmray_attribute_category = 'External analysis' - vmray_include_analysisid = '0' - vmray_include_imphash_ssdeep = '0' - vmray_include_extracted_files = '0' - vmray_include_analysisdetails = '0' - vmray_include_vtidetails = '0' - custom_tags_incomplete = 'workflow:state="incomplete"' - custom_tags_complete = 'workflow:state="complete"' - default_wait_period = 30 - - misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert, debug=module_DEBUG) - vmray_config = get_vmray_config(misp_url, misp_key, misp_verifycert, default_wait_period) - search_vmray_incomplete(misp, misp_url, vmray_config['vmray_wait_period'], vmray_config['module_import_url'], vmray_config['module_import_port'], vmray_config['vmray_url'], vmray_config['vmray_api'], vmray_attribute_category, vmray_include_analysisid, vmray_include_imphash_ssdeep, vmray_include_extracted_files, vmray_include_analysisdetails, vmray_include_vtidetails, custom_tags_incomplete, custom_tags_complete) +if __name__ == "__main__": + main()