mirror of https://github.com/MISP/misp-modules
181 lines
5.4 KiB
Python
181 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Deprecation notice: this module will be deprecated by December 2021, please use vmware_nsx module.
|
|
|
|
Module (type "expansion") to submit files and URLs to Lastline for analysis.
|
|
"""
|
|
import base64
|
|
import io
|
|
import json
|
|
import zipfile
|
|
|
|
import lastline_api
|
|
|
|
|
|
misperrors = {
|
|
"error": "Error",
|
|
}
|
|
|
|
mispattributes = {
|
|
"input": [
|
|
"attachment",
|
|
"malware-sample",
|
|
"url",
|
|
],
|
|
"output": [
|
|
"link",
|
|
],
|
|
}
|
|
|
|
moduleinfo = {
|
|
'version': '0.1',
|
|
'author': 'Stefano Ortolani',
|
|
'description': 'Deprecation notice: this module will be deprecated by December 2021, please use vmware_nsx module.\n\nModule to submit a file or URL to Lastline.',
|
|
'module-type': ['expansion', 'hover'],
|
|
'name': 'Lastline Submit',
|
|
'logo': 'lastline.png',
|
|
'requirements': [],
|
|
'features': 'The module requires a Lastline Analysis `api_token` and `key`.\nWhen the analysis is completed, it is possible to import the generated report by feeding the analysis link to the [lastline_query](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/lastline_query.py) module.',
|
|
'references': ['https://www.lastline.com'],
|
|
'input': 'File or URL to submit to Lastline.',
|
|
'output': 'Link to the report generated by Lastline.',
|
|
}
|
|
|
|
moduleconfig = [
|
|
"url",
|
|
"api_token",
|
|
"key",
|
|
]
|
|
|
|
|
|
DEFAULT_ZIP_PASSWORD = b"infected"
|
|
|
|
|
|
def __unzip(zipped_data, password=None):
|
|
data_file_object = io.BytesIO(zipped_data)
|
|
with zipfile.ZipFile(data_file_object) as zip_file:
|
|
sample_hashname = zip_file.namelist()[0]
|
|
data_zipped = zip_file.read(sample_hashname, password)
|
|
return data_zipped
|
|
|
|
|
|
def __str_to_bool(x):
|
|
return x in ("True", "true", True)
|
|
|
|
|
|
def introspection():
|
|
return mispattributes
|
|
|
|
|
|
def version():
|
|
moduleinfo["config"] = moduleconfig
|
|
return moduleinfo
|
|
|
|
|
|
def handler(q=False):
|
|
if q is False:
|
|
return False
|
|
|
|
request = json.loads(q)
|
|
|
|
# Parse the init parameters
|
|
try:
|
|
config = request.get("config", {})
|
|
auth_data = lastline_api.LastlineAbstractClient.get_login_params_from_dict(config)
|
|
api_url = config.get("url", lastline_api.DEFAULT_LL_ANALYSIS_API_URL)
|
|
except Exception as e:
|
|
misperrors["error"] = "Error parsing configuration: {}".format(e)
|
|
return misperrors
|
|
|
|
# Parse the call parameters
|
|
try:
|
|
call_args = {}
|
|
if "url" in request:
|
|
# URLs are text strings
|
|
api_method = lastline_api.AnalysisClient.submit_url
|
|
call_args["url"] = request.get("url")
|
|
else:
|
|
data = request.get("data")
|
|
# Malware samples are zip-encrypted and then base64 encoded
|
|
if "malware-sample" in request:
|
|
api_method = lastline_api.AnalysisClient.submit_file
|
|
call_args["file_data"] = __unzip(base64.b64decode(data), DEFAULT_ZIP_PASSWORD)
|
|
call_args["file_name"] = request.get("malware-sample").split("|", 1)[0]
|
|
call_args["password"] = DEFAULT_ZIP_PASSWORD
|
|
# Attachments are just base64 encoded
|
|
elif "attachment" in request:
|
|
api_method = lastline_api.AnalysisClient.submit_file
|
|
call_args["file_data"] = base64.b64decode(data)
|
|
call_args["file_name"] = request.get("attachment")
|
|
|
|
else:
|
|
raise ValueError("Input parameters do not specify either an URL or a file")
|
|
|
|
except Exception as e:
|
|
misperrors["error"] = "Error processing input parameters: {}".format(e)
|
|
return misperrors
|
|
|
|
# Make the API call
|
|
try:
|
|
api_client = lastline_api.AnalysisClient(api_url, auth_data)
|
|
response = api_method(api_client, **call_args)
|
|
task_uuid = response.get("task_uuid")
|
|
if not task_uuid:
|
|
raise ValueError("Unable to process returned data")
|
|
if response.get("score") is not None:
|
|
tags = ["workflow:state='complete'"]
|
|
else:
|
|
tags = ["workflow:state='incomplete'"]
|
|
|
|
except Exception as e:
|
|
misperrors["error"] = "Error issuing the API call: {}".format(e)
|
|
return misperrors
|
|
|
|
# Assemble and return
|
|
analysis_link = lastline_api.get_task_link(task_uuid, analysis_url=api_url)
|
|
|
|
return {
|
|
"results": [
|
|
{
|
|
"types": "link",
|
|
"categories": ["External analysis"],
|
|
"values": analysis_link,
|
|
"tags": tags,
|
|
},
|
|
]
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
"""Test submitting a test subject to the Lastline backend."""
|
|
import argparse
|
|
import configparser
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-c", "--config-file", dest="config_file")
|
|
parser.add_argument("-s", "--section-name", dest="section_name")
|
|
args = parser.parse_args()
|
|
c = configparser.ConfigParser()
|
|
c.read(args.config_file)
|
|
a = lastline_api.LastlineAbstractClient.get_login_params_from_conf(c, args.section_name)
|
|
|
|
j = json.dumps(
|
|
{
|
|
"config": a,
|
|
"url": "https://www.google.exe.com",
|
|
}
|
|
)
|
|
print(json.dumps(handler(j), indent=4, sort_keys=True))
|
|
|
|
with open("./tests/test_files/test.docx", "rb") as f:
|
|
data = f.read()
|
|
|
|
j = json.dumps(
|
|
{
|
|
"config": a,
|
|
"data": base64.b64encode(data).decode("utf-8"),
|
|
"attachment": "test.docx",
|
|
}
|
|
)
|
|
print(json.dumps(handler(j), indent=4, sort_keys=True))
|