mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			149 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			149 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python3
 | 
						|
"""Python client library for VMRay REST API"""
 | 
						|
 | 
						|
import base64
 | 
						|
import datetime
 | 
						|
import os.path
 | 
						|
import requests
 | 
						|
import urllib.parse
 | 
						|
 | 
						|
# disable nasty certification warning
 | 
						|
# pylint: disable=no-member
 | 
						|
# Silence urllib3 warnings. Prefer the copy reachable through
# ``requests.packages``; when that attribute path does not exist, fall back
# to a standalone ``urllib3`` module. Every failure on the fallback path is
# deliberately ignored: suppressing warnings is best-effort only.
try:
    requests.packages.urllib3.disable_warnings()
except AttributeError:
    try:
        import urllib3

        try:
            urllib3.disable_warnings()
        except AttributeError:
            # this urllib3 has no disable_warnings(); nothing to do
            pass
    except ImportError:
        # no standalone urllib3 available; warnings stay enabled
        pass
 | 
						|
 | 
						|
# pylint: disable=
 | 
						|
 | 
						|
 | 
						|
class VMRayRESTAPIError(Exception):
    """Error raised when a VMRay REST API request fails.

    The optional ``status_code`` keyword argument is removed from the
    keyword arguments and stored on the instance as ``status_code``
    (``None`` when it was not supplied). All remaining arguments are
    forwarded unchanged to ``Exception``.
    """

    def __init__(self, *args, **kwargs):
        status = kwargs.pop("status_code", None)
        super().__init__(*args, **kwargs)
        self.status_code = status
 | 
						|
 | 
						|
 | 
						|
def handle_rest_api_result(result):
    """Check the result of an API request and raise on error.

    Args:
        result: Response object exposing ``status_code``, ``text`` and
            ``json()``.

    Returns:
        None when ``status_code`` is in the range 200-299.

    Raises:
        VMRayRESTAPIError: For any other status code. The message is the
            ``error_msg`` field of the JSON body ("Unknown error" when the
            field is absent). If the body is not JSON, or is JSON but not
            an object, the message contains the status code and the raw
            response text instead. ``status_code`` is always attached to
            the exception.
    """

    if 200 <= result.status_code <= 299:
        return

    try:
        json_result = result.json()
    except ValueError:
        json_result = None

    # Only a JSON object can carry "error_msg". A list/string/number body
    # would make ``.get`` fail with AttributeError, so such bodies are
    # reported the same way as a non-JSON body.
    if not isinstance(json_result, dict):
        raise VMRayRESTAPIError(
            "API returned error %u: %s" % (result.status_code, result.text),
            status_code=result.status_code,
        )

    raise VMRayRESTAPIError(
        json_result.get("error_msg", "Unknown error"),
        status_code=result.status_code,
    )
 | 
						|
 | 
						|
 | 
						|
class VMRayRESTAPI(object):
    """VMRay REST API client.

    Holds the server base URL, the API key and the certificate-verification
    flag, and issues requests through the ``requests`` package.
    """

    def __init__(self, server, api_key, verify_cert=True):
        """Store connection settings, assuming HTTPS when no scheme is given.

        Args:
            server: Base URL of the server, or a bare host / ``host:port``.
            api_key: Key sent in the ``Authorization`` header of each request.
            verify_cert: Passed as ``verify`` to every ``requests`` call.
        """
        # split server URL into components
        url_desc = urllib.parse.urlsplit(server)

        # assume HTTPS if no scheme is specified. A bare "host:port" may be
        # parsed as scheme="host" with an empty netloc, so an empty scheme
        # alone is not a reliable test; require both parts to be present.
        if not (url_desc.scheme and url_desc.netloc):
            server = "https://" + server

        # save variables
        self.server = server
        self.api_key = api_key
        self.verify_cert = verify_cert

    @staticmethod
    def _split_params(params):
        """Sort *params* into plain request fields and file uploads.

        Returns a ``(req_params, file_params)`` pair of dicts. Dates,
        numbers and strings become ``str`` request fields. A dict value
        must provide ``"filename"`` and ``"data"`` and becomes a file
        entry. Any object with a ``read`` attribute becomes a file entry
        named after the base name of its ``name`` attribute.

        Raises:
            VMRayRESTAPIError: If a value has none of the supported types.
        """
        req_params = {}
        file_params = {}

        if params is None:
            return req_params, file_params

        scalar_types = (datetime.date, datetime.datetime, float, int, str)

        for key, value in params.items():
            if isinstance(value, scalar_types):
                req_params[key] = str(value)
            elif isinstance(value, dict):
                filename = value["filename"]
                sample = value["data"]
                file_params[key] = (filename, sample, "application/octet-stream")
            elif hasattr(value, "read"):
                filename = os.path.split(value.name)[1]
                # For the following block refer to DEV-1820
                # A non-ASCII file name is sent base64-encoded in a separate
                # request field, and the multipart file name is replaced by
                # a reference to that field.
                try:
                    # str has no decode() in Python 3; encoding is the
                    # correct way to test whether the name is pure ASCII.
                    filename.encode("ASCII")
                except UnicodeEncodeError:
                    b64_key = key + "name_b64enc"
                    byte_value = filename.encode("utf-8")
                    b64_value = base64.b64encode(byte_value)

                    filename = "@param=%s" % b64_key
                    req_params[b64_key] = b64_value
                file_params[key] = (filename, value, "application/octet-stream")
            else:
                raise VMRayRESTAPIError("Parameter \"%s\" has unknown type \"%s\"" % (key, type(value)))

        return req_params, file_params

    @staticmethod
    def _parse_json(result):
        """Return the decoded JSON body of *result* or raise ``ValueError``."""
        try:
            return result.json()
        except ValueError as err:
            raise ValueError("API returned invalid JSON: %s" % (result.text)) from err

    def call(self, http_method, api_path, params=None, raw_data=False):
        """Call VMRay REST API.

        Args:
            http_method: Name of the ``requests`` function to use
                (case-insensitive), e.g. ``"GET"`` or ``"POST"``.
            api_path: Path appended to the server URL.
            params: Optional dict of request parameters; see
                ``_split_params`` for the accepted value types.
            raw_data: When true, the request is streamed and the raw
                response object is returned without JSON parsing.

        Returns:
            ``result.raw`` when *raw_data* is true. Otherwise the ``data``
            member of the JSON response (``None`` if absent). While a
            response contains ``continuation_id``, follow-up responses are
            fetched and their ``data`` appended to the first.

        Raises:
            VMRayRESTAPIError: For an unsupported parameter type, or when
                ``handle_rest_api_result`` rejects a response.
            ValueError: If a response body is not valid JSON.
        """

        # get function of requests package
        requests_func = getattr(requests, http_method.lower())

        # parse parameters
        req_params, file_params = self._split_params(params)

        # construct request
        files = file_params if file_params else None
        headers = {"Authorization": "api_key " + self.api_key}

        # we need to adjust some stuff for POST requests: the fields go
        # into the request body instead of the query string
        if http_method.lower() == "post":
            req_data = req_params
            req_params = None
        else:
            req_data = None

        # do request
        result = requests_func(
            self.server + api_path,
            data=req_data,
            params=req_params,
            headers=headers,
            files=files,
            verify=self.verify_cert,
            stream=raw_data,
        )
        handle_rest_api_result(result)

        if raw_data:
            return result.raw

        # parse result
        json_result = self._parse_json(result)

        # if there are no cached elements then return the data
        if "continuation_id" not in json_result:
            return json_result.get("data", None)

        data = json_result["data"]

        # get cached results
        while "continuation_id" in json_result:
            # send request to server
            result = requests.get(
                "%s/rest/continuation/%u" % (self.server, json_result["continuation_id"]),
                headers=headers,
                verify=self.verify_cert,
            )
            handle_rest_api_result(result)

            # parse result
            json_result = self._parse_json(result)

            data.extend(json_result["data"])

        return data
 |