mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			152 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python3
 | 
						|
"""
 | 
						|
Module (type "import") to import a Lastline report from an analysis link.
 | 
						|
"""
 | 
						|
import json
 | 
						|
 | 
						|
import lastline_api
 | 
						|
 | 
						|
 | 
						|
# Error payload returned by handler(); its "error" entry is overwritten with a
# specific message before being returned.
misperrors = {
    "error": "Error",
}

# Per-import user input: the link to the Lastline analysis to import.
userConfig = {
    "analysis_link": {
        "type": "String",
        "errorMessage": "Expected analysis link",
        "message": "The link to a Lastline analysis",
    },
}

# No input sources are declared for this module; everything comes from the
# configuration.
inputSource = []

# Static module description returned by version().
moduleinfo = {
    "version": "0.1",
    "author": "Stefano Ortolani",
    "description": "Import a Lastline report from an analysis link.",
    "module-type": ["import"],
}

# Names of the module-level settings read from request["config"] in handler().
moduleconfig = [
    "username",
    "password",
    "verify_ssl",
]
 | 
						|
 | 
						|
 | 
						|
def introspection():
    """Describe the inputs this module accepts.

    Returns:
        dict: always contains ``"format": "misp_standard"``; additionally
        contains ``"userConfig"`` and ``"inputSource"`` when the module-level
        names of the same spelling are defined.
    """
    modulesetup = {}
    # Each setting is optional: a missing module-level name raises NameError
    # on lookup and is simply left out of the result.
    try:
        modulesetup["userConfig"] = userConfig
    except NameError:
        pass
    try:
        modulesetup["inputSource"] = inputSource
    except NameError:
        pass
    modulesetup["format"] = "misp_standard"
    return modulesetup
 | 
						|
 | 
						|
 | 
						|
def version():
    """Return the module description, including its configuration keys.

    The list of configuration names is stored under ``"config"`` in the
    module-level ``moduleinfo`` dict, which is then returned (the same dict
    object on every call).
    """
    moduleinfo["config"] = moduleconfig
    return moduleinfo
 | 
						|
 | 
						|
 | 
						|
def _parse_verify_ssl(value):
    """Interpret the ``verify_ssl`` setting as a boolean.

    Booleans are returned unchanged. ``None`` and empty/blank strings mean
    "not configured" and keep certificate verification enabled. Any other
    value is true only when its text is ``"true"`` (case-insensitive).
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return True
    text = str(value).strip().lower()
    if not text:
        return True
    return text == "true"


def handler(q=False):
    """Import a Lastline analysis report as MISP attributes, objects and tags.

    Args:
        q: JSON-encoded request string. Its ``"config"`` object must contain
            ``"analysis_link"`` plus whatever login parameters
            ``lastline_api`` expects; ``"verify_ssl"`` is optional.

    Returns:
        ``False`` when called without a request; the module-level
        ``misperrors`` dict (with ``"error"`` set to a message) when
        configuration parsing, link parsing or an API call fails; otherwise
        ``{"results": {...}}`` holding the non-empty ``"Attribute"``,
        ``"Object"`` and ``"Tag"`` entries of the parsed event.
    """
    if q is False:
        return False

    request = json.loads(q)

    # Parse the init parameters
    try:
        config = request["config"]
        auth_data = lastline_api.LastlineAbstractClient.get_login_params_from_dict(config)
        analysis_link = config["analysis_link"]
        # The API url changes based on the analysis link host name
        api_url = lastline_api.get_portal_url_from_task_link(analysis_link)
    except Exception as e:
        misperrors["error"] = "Error parsing configuration: {}".format(e)
        return misperrors

    # Parse the call parameters
    try:
        task_uuid = lastline_api.get_uuid_from_task_link(analysis_link)
    except (KeyError, ValueError) as e:
        misperrors["error"] = "Error processing input parameters: {}".format(e)
        return misperrors

    # Make the API calls
    try:
        api_client = lastline_api.PortalClient(
            api_url,
            auth_data,
            # The setting may be absent, a bool, or a string; normalise it
            # instead of calling .lower() on a possible bool and doing a
            # substring test against "true".
            verify_ssl=_parse_verify_ssl(config.get("verify_ssl", True)),
        )
        response = api_client.get_progress(task_uuid)
        if response.get("completed") != 1:
            raise ValueError("Analysis is not finished yet.")

        response = api_client.get_result(task_uuid)
        if not response:
            raise ValueError("Analysis report is empty.")

    except Exception as e:
        misperrors["error"] = "Error issuing the API call: {}".format(e)
        return misperrors

    # Parse and return
    result_parser = lastline_api.LastlineResultBaseParser()
    result_parser.parse(analysis_link, response)

    event = result_parser.misp_event
    event_dictionary = json.loads(event.to_json())

    # Only forward the sections that the event actually populated.
    return {
        "results": {
            key: event_dictionary[key]
            for key in ("Attribute", "Object", "Tag")
            if (key in event and event[key])
        }
    }
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Manual smoke test: import information from Lastline analysis links
    # using credentials read from an INI file (-c) and section (-s).
    import argparse
    import configparser

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config-file", dest="config_file")
    parser.add_argument("-s", "--section-name", dest="section_name")
    args = parser.parse_args()
    c = configparser.ConfigParser()
    c.read(args.config_file)
    a = lastline_api.LastlineAbstractClient.get_login_params_from_conf(c, args.section_name)

    # Run the handler once per sample task and pretty-print each result.
    for task_id in (
        "1fcbcb8f7fb400100772d6a7b62f501b",
        "f3c0ae115d51001017ff8da768fa6049",
    ):
        j = json.dumps(
            {
                "config": {
                    **a,
                    "analysis_link": (
                        "https://user.lastline.com/portal#/analyst/task/"
                        + task_id
                        + "/overview"
                    ),
                }
            }
        )
        print(json.dumps(handler(j), indent=4, sort_keys=True))
 |