#!/usr/bin/env python3 """ Deprecation notice: this module will be deprecated by December 2021, please use vmware_nsx module. Module (type "expansion") to submit files and URLs to Lastline for analysis. """ import base64 import io import json import zipfile import lastline_api misperrors = { "error": "Error", } mispattributes = { "input": [ "attachment", "malware-sample", "url", ], "output": [ "link", ], } moduleinfo = { 'version': '0.1', 'author': 'Stefano Ortolani', 'description': 'Deprecation notice: this module will be deprecated by December 2021, please use vmware_nsx module.\n\nModule to submit a file or URL to Lastline.', 'module-type': ['expansion', 'hover'], 'name': 'Lastline Submit', 'logo': 'lastline.png', 'requirements': [], 'features': 'The module requires a Lastline Analysis `api_token` and `key`.\nWhen the analysis is completed, it is possible to import the generated report by feeding the analysis link to the [lastline_query](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/lastline_query.py) module.', 'references': ['https://www.lastline.com'], 'input': 'File or URL to submit to Lastline.', 'output': 'Link to the report generated by Lastline.', } moduleconfig = [ "url", "api_token", "key", ] DEFAULT_ZIP_PASSWORD = b"infected" def __unzip(zipped_data, password=None): data_file_object = io.BytesIO(zipped_data) with zipfile.ZipFile(data_file_object) as zip_file: sample_hashname = zip_file.namelist()[0] data_zipped = zip_file.read(sample_hashname, password) return data_zipped def __str_to_bool(x): return x in ("True", "true", True) def introspection(): return mispattributes def version(): moduleinfo["config"] = moduleconfig return moduleinfo def handler(q=False): if q is False: return False request = json.loads(q) # Parse the init parameters try: config = request.get("config", {}) auth_data = lastline_api.LastlineAbstractClient.get_login_params_from_dict(config) api_url = config.get("url", lastline_api.DEFAULT_LL_ANALYSIS_API_URL) except Exception as e: misperrors["error"] = "Error parsing configuration: {}".format(e) return misperrors # Parse the call parameters try: call_args = {} if "url" in request: # URLs are text strings api_method = lastline_api.AnalysisClient.submit_url call_args["url"] = request.get("url") else: data = request.get("data") # Malware samples are zip-encrypted and then base64 encoded if "malware-sample" in request: api_method = lastline_api.AnalysisClient.submit_file call_args["file_data"] = __unzip(base64.b64decode(data), DEFAULT_ZIP_PASSWORD) call_args["file_name"] = request.get("malware-sample").split("|", 1)[0] call_args["password"] = DEFAULT_ZIP_PASSWORD # Attachments are just base64 encoded elif "attachment" in request: api_method = lastline_api.AnalysisClient.submit_file call_args["file_data"] = base64.b64decode(data) call_args["file_name"] = request.get("attachment") else: raise ValueError("Input parameters do not specify either an URL or a file") except Exception as e: misperrors["error"] = "Error processing input parameters: {}".format(e) return misperrors # Make the API call try: api_client = lastline_api.AnalysisClient(api_url, auth_data) response = api_method(api_client, **call_args) task_uuid = response.get("task_uuid") if not task_uuid: raise ValueError("Unable to process returned data") if response.get("score") is not None: tags = ["workflow:state='complete'"] else: tags = ["workflow:state='incomplete'"] except Exception as e: misperrors["error"] = "Error issuing the API call: {}".format(e) return misperrors # Assemble and return analysis_link = lastline_api.get_task_link(task_uuid, analysis_url=api_url) return { "results": [ { "types": "link", "categories": ["External analysis"], "values": analysis_link, "tags": tags, }, ] } if __name__ == "__main__": """Test submitting a test subject to the Lastline backend.""" import argparse import configparser parser = argparse.ArgumentParser() parser.add_argument("-c", "--config-file", dest="config_file") parser.add_argument("-s", "--section-name", dest="section_name") args = parser.parse_args() c = configparser.ConfigParser() c.read(args.config_file) a = lastline_api.LastlineAbstractClient.get_login_params_from_conf(c, args.section_name) j = json.dumps( { "config": a, "url": "https://www.google.exe.com", } ) print(json.dumps(handler(j), indent=4, sort_keys=True)) with open("./tests/test_files/test.docx", "rb") as f: data = f.read() j = json.dumps( { "config": a, "data": base64.b64encode(data).decode("utf-8"), "attachment": "test.docx", } ) print(json.dumps(handler(j), indent=4, sort_keys=True))