#!/usr/bin/env python3 """ Expansion module integrating with VMware NSX Defender. """ import argparse import base64 import configparser import datetime import hashlib import io import ipaddress import json import logging import pymisp import sys import vt import zipfile from urllib import parse from typing import Any, Dict, List, Optional, Tuple, Union import tau_clients from tau_clients import exceptions from tau_clients import nsx_defender logger = logging.getLogger("vmware_nsx") logger.setLevel(logging.DEBUG) misperrors = { "error": "Error", } mispattributes = { "input": [ "attachment", "malware-sample", "url", "md5", "sha1", "sha256", ], "format": "misp_standard", } moduleinfo = { "version": "0.2", "author": "Jason Zhang", "description": "Enrich a file or URL with VMware NSX Defender", "module-type": ["expansion", "hover"], } moduleconfig = [ "analysis_url", # optional, defaults to hard-coded values "analysis_verify_ssl", # optional, defaults to True "analysis_key", # required "analysis_api_token", # required "vt_key", # optional "misp_url", # optional "misp_verify_ssl", # optional, defaults to True "misp_key", # optional ] DEFAULT_ZIP_PASSWORD = b"infected" DEFAULT_ENDPOINT = tau_clients.NSX_DEFENDER_DC_WESTUS WORKFLOW_COMPLETE_TAG = "workflow:state='complete'" WORKFLOW_INCOMPLETE_TAG = "workflow:state='incomplete'" VT_DOWNLOAD_TAG = "vt:download" GALAXY_ATTACK_PATTERNS_UUID = "c4e851fa-775f-11e7-8163-b774922098cd" class ResultParser: """This is a parser to extract *basic* information from a result dictionary.""" def __init__(self, techniques_galaxy: Optional[Dict[str, str]] = None): """Constructor.""" self.techniques_galaxy = techniques_galaxy or {} def parse(self, analysis_link: str, result: Dict[str, Any]) -> pymisp.MISPEvent: """ Parse the analysis result into a MISP event. :param str analysis_link: the analysis link :param dict[str, any] result: the JSON returned by the analysis client. :rtype: pymisp.MISPEvent :return: a MISP event """ misp_event = pymisp.MISPEvent() # Add analysis subject info if "url" in result["analysis_subject"]: o = pymisp.MISPObject("url") o.add_attribute("url", result["analysis_subject"]["url"]) else: o = pymisp.MISPObject("file") o.add_attribute("md5", type="md5", value=result["analysis_subject"]["md5"]) o.add_attribute("sha1", type="sha1", value=result["analysis_subject"]["sha1"]) o.add_attribute("sha256", type="sha256", value=result["analysis_subject"]["sha256"]) o.add_attribute( "mimetype", category="Payload delivery", type="mime-type", value=result["analysis_subject"]["mime_type"] ) misp_event.add_object(o) # Add HTTP requests from url analyses network_dict = result.get("report", {}).get("analysis", {}).get("network", {}) for request in network_dict.get("requests", []): parsed_uri = parse.urlparse(request["url"]) o = pymisp.MISPObject(name="http-request") o.add_attribute("host", parsed_uri.netloc) o.add_attribute("method", "GET") o.add_attribute("uri", request["url"]) o.add_attribute("ip-dst", request["ip"]) misp_event.add_object(o) # Add network behaviors from files for subject in result.get("report", {}).get("analysis_subjects", []): # Add DNS requests for dns_query in subject.get("dns_queries", []): hostname = dns_query.get("hostname") # Skip if it is an IP address try: if hostname == "wpad" or hostname == "localhost": continue # Invalid hostname, e.g., hostname: '2.2.0.10.in-addr.arpa. if hostname[-1] == ".": continue _ = ipaddress.ip_address(hostname) continue except ValueError: pass o = pymisp.MISPObject(name="domain-ip") o.add_attribute("hostname", type="hostname", value=hostname) for ip in dns_query.get("results", []): o.add_attribute("ip", type="ip-dst", value=ip) misp_event.add_object(o) # Add HTTP conversations (as network connection and as http request) for http_conversation in subject.get("http_conversations", []): o = pymisp.MISPObject(name="network-connection") o.add_attribute("ip-src", http_conversation["src_ip"]) o.add_attribute("ip-dst", http_conversation["dst_ip"]) o.add_attribute("src-port", http_conversation["src_port"]) o.add_attribute("dst-port", http_conversation["dst_port"]) o.add_attribute("hostname-dst", http_conversation["dst_host"]) o.add_attribute("layer3-protocol", "IP") o.add_attribute("layer4-protocol", "TCP") o.add_attribute("layer7-protocol", "HTTP") misp_event.add_object(o) method, path, http_version = http_conversation["url"].split(" ") if http_conversation["dst_port"] == 80: uri = "http://{}{}".format(http_conversation["dst_host"], path) else: uri = "http://{}:{}{}".format( http_conversation["dst_host"], http_conversation["dst_port"], path ) o = pymisp.MISPObject(name="http-request") o.add_attribute("host", http_conversation["dst_host"]) o.add_attribute("method", method) o.add_attribute("uri", uri) o.add_attribute("ip-dst", http_conversation["dst_ip"]) misp_event.add_object(o) # Add sandbox info like score and sandbox type o = pymisp.MISPObject(name="sandbox-report") sandbox_type = "saas" if tau_clients.is_task_hosted(analysis_link) else "on-premise" o.add_attribute("score", result["score"]) o.add_attribute("sandbox-type", sandbox_type) o.add_attribute("{}-sandbox".format(sandbox_type), "vmware-nsx-defender") o.add_attribute("permalink", analysis_link) misp_event.add_object(o) # Add behaviors o = pymisp.MISPObject(name="sb-signature") o.add_attribute("software", "VMware NSX Defender") for activity in result.get("malicious_activity", []): a = pymisp.MISPAttribute() a.from_dict(type="text", value=activity) o.add_attribute("signature", **a) misp_event.add_object(o) # Add mitre techniques for techniques in result.get("activity_to_mitre_techniques", {}).values(): for technique in techniques: for misp_technique_id, misp_technique_name in self.techniques_galaxy.items(): if technique["id"].casefold() in misp_technique_id.casefold(): # If report details a sub-technique, trust the match # Otherwise trust it only if the MISP technique is not a sub-technique if "." in technique["id"] or "." not in misp_technique_id: misp_event.add_tag(misp_technique_name) break return misp_event def _parse_submission_response(response: Dict[str, Any]) -> Tuple[str, List[str]]: """ Parse the response from "submit_*" methods. :param dict[str, any] response: the client response :rtype: tuple(str, list[str]) :return: the task_uuid and whether the analysis is available :raises ValueError: in case of any error """ task_uuid = response.get("task_uuid") if not task_uuid: raise ValueError("Submission failed, unable to process the data") if response.get("score") is not None: tags = [WORKFLOW_COMPLETE_TAG] else: tags = [WORKFLOW_INCOMPLETE_TAG] return task_uuid, tags def _unzip(zipped_data: bytes, password: bytes = DEFAULT_ZIP_PASSWORD) -> bytes: """ Unzip the data. :param bytes zipped_data: the zipped data :param bytes password: the password :rtype: bytes :return: the unzipped data :raises ValueError: in case of any error """ try: data_file_object = io.BytesIO(zipped_data) with zipfile.ZipFile(data_file_object) as zip_file: sample_hash_name = zip_file.namelist()[0] return zip_file.read(sample_hash_name, password) except (IOError, ValueError) as e: raise ValueError(str(e)) def _download_from_vt(client: vt.Client, file_hash: str) -> bytes: """ Download file from VT. :param vt.Client client: the VT client :param str file_hash: the file hash :rtype: bytes :return: the downloaded data :raises ValueError: in case of any error """ try: buffer = io.BytesIO() client.download_file(file_hash, buffer) buffer.seek(0, 0) return buffer.read() except (IOError, vt.APIError) as e: raise ValueError(str(e)) finally: # vt.Client likes to free resources at shutdown, and it can be used as context to ease that # Since the structure of the module does not play well with how MISP modules are organized # let's play nice and close connections pro-actively (opened by "download_file") if client: client.close() def _get_analysis_tags( clients: Dict[str, nsx_defender.AnalysisClient], task_uuid: str, ) -> List[str]: """ Get the analysis tags of a task. :param dict[str, nsx_defender.AnalysisClient] clients: the analysis clients :param str task_uuid: the task uuid :rtype: list[str] :return: the analysis tags :raises exceptions.ApiError: in case of client errors :raises exceptions.CommunicationError: in case of client communication errors """ client = clients[DEFAULT_ENDPOINT] response = client.get_analysis_tags(task_uuid) tags = set([]) for tag in response.get("analysis_tags", []): tag_header = None tag_type = tag["data"]["type"] if tag_type == "av_family": tag_header = "av-fam" elif tag_type == "av_class": tag_header = "av-cls" elif tag_type == "lastline_malware": tag_header = "nsx" if tag_header: tags.add("{}:{}".format(tag_header, tag["data"]["value"])) return sorted(tags) def _get_latest_analysis( clients: Dict[str, nsx_defender.AnalysisClient], file_hash: str, ) -> Optional[str]: """ Get the latest analysis. :param dict[str, nsx_defender.AnalysisClient] clients: the analysis clients :param str file_hash: the hash of the file :rtype: str|None :return: the task uuid if present, None otherwise :raises exceptions.ApiError: in case of client errors :raises exceptions.CommunicationError: in case of client communication errors """ def _parse_expiration(task_info: Dict[str, str]) -> datetime.datetime: """ Parse expiration time of a task :param dict[str, str] task_info: the task :rtype: datetime.datetime :return: the parsed datetime object """ return datetime.datetime.strptime(task_info["expires"], "%Y-%m-%d %H:%M:%S") results = [] for data_center, client in clients.items(): response = client.query_file_hash(file_hash=file_hash) for task in response.get("tasks", []): results.append(task) if results: return sorted(results, key=_parse_expiration)[-1]["task_uuid"] else: return None def _get_mitre_techniques_galaxy(misp_client: pymisp.PyMISP) -> Dict[str, str]: """ Get all the MITRE techniques from the MISP galaxy. :param pymisp.PyMISP misp_client: the MISP client :rtype: dict[str, str] :return: all techniques indexed by their id """ galaxy_attack_patterns = misp_client.get_galaxy( galaxy=GALAXY_ATTACK_PATTERNS_UUID, withCluster=True, pythonify=True, ) ret = {} for cluster in galaxy_attack_patterns.clusters: ret[cluster.value] = cluster.tag_name return ret def introspection() -> Dict[str, Union[str, List[str]]]: """ Implement interface. :return: the supported MISP attributes :rtype: dict[str, list[str]] """ return mispattributes def version() -> Dict[str, Union[str, List[str]]]: """ Implement interface. :return: the module config inside another dictionary :rtype: dict[str, list[str]] """ moduleinfo["config"] = moduleconfig return moduleinfo def handler(q: Union[bool, str] = False) -> Union[bool, Dict[str, Any]]: """ Implement interface. :param bool|str q: the input received :rtype: bool|dict[str, any] """ if q is False: return False request = json.loads(q) config = request.get("config", {}) # Load the client to connect to VMware NSX ATA (hard-fail) try: analysis_url = config.get("analysis_url") login_params = { "key": config["analysis_key"], "api_token": config["analysis_api_token"], } # If 'analysis_url' is specified we are connecting on-premise if analysis_url: analysis_clients = { DEFAULT_ENDPOINT: nsx_defender.AnalysisClient( api_url=analysis_url, login_params=login_params, verify_ssl=bool(config.get("analysis_verify_ssl", True)), ) } logger.info("Connected NSX AnalysisClient to on-premise infrastructure") else: analysis_clients = { data_center: nsx_defender.AnalysisClient( api_url=tau_clients.NSX_DEFENDER_ANALYSIS_URLS[data_center], login_params=login_params, verify_ssl=bool(config.get("analysis_verify_ssl", True)), ) for data_center in [ tau_clients.NSX_DEFENDER_DC_WESTUS, tau_clients.NSX_DEFENDER_DC_NLEMEA, ] } logger.info("Connected NSX AnalysisClient to hosted infrastructure") except KeyError as ke: logger.error("Integration with VMware NSX ATA failed to connect: %s", str(ke)) return {"error": "Error connecting to VMware NSX ATA: {}".format(ke)} # Load the client to connect to MISP (soft-fail) try: misp_client = pymisp.PyMISP( url=config["misp_url"], key=config["misp_key"], ssl=bool(config.get("misp_verify_ssl", True)), ) except (KeyError, pymisp.PyMISPError): logger.error("Integration with pyMISP disabled: no MITRE techniques tags") misp_client = None # Load the client to connect to VT (soft-fail) try: vt_client = vt.Client(apikey=config["vt_key"]) except (KeyError, ValueError): logger.error("Integration with VT disabled: no automatic download of samples") vt_client = None # Decode and issue the request try: if request["attribute"]["type"] == "url": sample_url = request["attribute"]["value"] response = analysis_clients[DEFAULT_ENDPOINT].submit_url(sample_url) task_uuid, tags = _parse_submission_response(response) else: if request["attribute"]["type"] == "malware-sample": # Raise TypeError file_data = _unzip(base64.b64decode(request["attribute"]["data"])) file_name = request["attribute"]["value"].split("|", 1)[0] hash_value = hashlib.sha1(file_data).hexdigest() elif request["attribute"]["type"] == "attachment": # Raise TypeError file_data = base64.b64decode(request["attribute"]["data"]) file_name = request["attribute"].get("value") hash_value = hashlib.sha1(file_data).hexdigest() else: hash_value = request["attribute"]["value"] file_data = None file_name = "{}.bin".format(hash_value) # Check whether we have a task for that file tags = [] task_uuid = _get_latest_analysis(analysis_clients, hash_value) if not task_uuid: # If we have no analysis, download the sample from VT if not file_data: if not vt_client: raise ValueError("No file available locally and VT is disabled") file_data = _download_from_vt(vt_client, hash_value) tags.append(VT_DOWNLOAD_TAG) # ... and submit it (_download_from_vt fails if no sample availabe) response = analysis_clients[DEFAULT_ENDPOINT].submit_file(file_data, file_name) task_uuid, _tags = _parse_submission_response(response) tags.extend(_tags) except KeyError as e: logger.error("Error parsing input: %s", request["attribute"]) return {"error": "Error parsing input: {}".format(e)} except TypeError as e: logger.error("Error decoding input: %s", request["attribute"]) return {"error": "Error decoding input: {}".format(e)} except ValueError as e: logger.error("Error processing input: %s", request["attribute"]) return {"error": "Error processing input: {}".format(e)} except (exceptions.CommunicationError, exceptions.ApiError) as e: logger.error("Error issuing API call: %s", str(e)) return {"error": "Error issuing API call: {}".format(e)} else: analysis_link = tau_clients.get_task_link( uuid=task_uuid, analysis_url=analysis_clients[DEFAULT_ENDPOINT].base, prefer_load_balancer=True, ) # Return partial results if the analysis has yet to terminate try: tags.extend(_get_analysis_tags(analysis_clients, task_uuid)) report = analysis_clients[DEFAULT_ENDPOINT].get_result(task_uuid) except (exceptions.CommunicationError, exceptions.ApiError) as e: logger.error("Error retrieving the report: %s", str(e)) return { "results": { "types": "link", "categories": ["External analysis"], "values": analysis_link, "tags": tags, } } # Return the enrichment try: techniques_galaxy = None if misp_client: techniques_galaxy = _get_mitre_techniques_galaxy(misp_client) result_parser = ResultParser(techniques_galaxy=techniques_galaxy) misp_event = result_parser.parse(analysis_link, report) for tag in tags: if tag not in frozenset([WORKFLOW_COMPLETE_TAG]): misp_event.add_tag(tag) return { "results": { key: json.loads(misp_event.to_json())[key] for key in ("Attribute", "Object", "Tag") if (key in misp_event and misp_event[key]) } } except pymisp.PyMISPError as e: logger.error("Error parsing the report: %s", str(e)) return {"error": "Error parsing the report: {}".format(e)} def main(): """Main function used to test basic functionalities of the module.""" parser = argparse.ArgumentParser() parser.add_argument( "-c", "--config-file", dest="config_file", required=True, help="the configuration file used for testing", ) parser.add_argument( "-t", "--test-attachment", dest="test_attachment", default=None, help="the path to a test attachment", ) args = parser.parse_args() conf = configparser.ConfigParser() conf.read(args.config_file) config = { "analysis_verify_ssl": conf.getboolean("analysis", "analysis_verify_ssl"), "analysis_key": conf.get("analysis", "analysis_key"), "analysis_api_token": conf.get("analysis", "analysis_api_token"), "vt_key": conf.get("vt", "vt_key"), "misp_url": conf.get("misp", "misp_url"), "misp_verify_ssl": conf.getboolean("misp", "misp_verify_ssl"), "misp_key": conf.get("misp", "misp_key"), } # TEST 1: submit a URL j = json.dumps( { "config": config, "attribute": { "type": "url", "value": "https://www.google.com", } } ) print(json.dumps(handler(j), indent=4, sort_keys=True)) # TEST 2: submit a file attachment if args.test_attachment: with open(args.test_attachment, "rb") as f: data = f.read() j = json.dumps( { "config": config, "attribute": { "type": "attachment", "value": "test.docx", "data": base64.b64encode(data).decode("utf-8"), } } ) print(json.dumps(handler(j), indent=4, sort_keys=True)) # TEST 3: submit a file hash that is known by NSX ATA j = json.dumps( { "config": config, "attribute": { "type": "md5", "value": "002c56165a0e78369d0e1023ce044bf0", } } ) print(json.dumps(handler(j), indent=4, sort_keys=True)) # TEST 4 : submit a file hash that is NOT known byt NSX ATA j = json.dumps( { "config": config, "attribute": { "type": "sha1", "value": "2aac25ecdccf87abf6f1651ef2ffb30fcf732250", } } ) print(json.dumps(handler(j), indent=4, sort_keys=True)) return 0 if __name__ == "__main__": sys.exit(main())