misp-modules/misp_modules/modules/expansion/vysion.py

212 lines
5.7 KiB
Python

import json
from pymisp import MISPAttribute, MISPEvent
from urllib.parse import urlparse
import logging
import vysion.client as vysion
import vysion.dto as dto
from vysion.dto.util import MISPProcessor
misperrors = {"error": "Error"}
mispattributes = {
"input": [
"email",
"domain",
"hostname",
"url",
"text",
"btc",
"phone-number",
"target-org",
],
"format": "misp_standard",
}
# possible module-types: 'expansion', 'hover' or both
moduleinfo = {
"version": "1",
"author": "Byron Labs",
"description": "Enrich observables with the Vysion API",
"module-type": ["expansion"],
}
# config fields that your code expects from the site admin
moduleconfig = [
"apikey",
"event_limit",
"proxy_host",
"proxy_port",
"proxy_username",
"proxy_password",
]
LOGGER = logging.getLogger("vysion")
LOGGER.setLevel(logging.INFO)
LOGGER.info("Starting Vysion")
DEFAULT_RESULTS_LIMIT = 10
def get_proxy_settings(config: dict) -> dict:
"""Returns proxy settings in the requests format.
If no proxy settings are set, return None."""
proxies = None
host = config.get("proxy_host")
port = config.get("proxy_port")
username = config.get("proxy_username")
password = config.get("proxy_password")
if host:
if not port:
misperrors["error"] = (
"The vysion_proxy_host config is set, "
"please also set the vysion_proxy_port."
)
raise KeyError
parsed = urlparse(host)
if "http" in parsed.scheme:
scheme = "http"
else:
scheme = parsed.scheme
netloc = parsed.netloc
host = f"{netloc}:{port}"
if username:
if not password:
misperrors["error"] = (
"The vysion_proxy_username config is set, "
"please also set the vysion_proxy_password."
)
raise KeyError
auth = f"{username}:{password}"
host = auth + "@" + host
proxies = {"http": f"{scheme}://{host}", "https": f"{scheme}://{host}"}
return proxies
def parse_error(status_code: int) -> str:
status_mapping = {
500: "Vysion is blind.",
400: "Incorrect request, please check the arguments.",
403: "You don't have enough privileges to make the request.",
}
if status_code in status_mapping:
return status_mapping[status_code]
return "Vysion may not be accessible."
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get("config") or not request["config"].get("apikey"):
misperrors["error"] = "A Vysion api key is required for this module."
return misperrors
if not request.get("attribute"):
return {
"error": "The request is missing required attribute information, which should contain at least a type, a value, and a UUID."
}
if request["attribute"]["type"] not in mispattributes["input"]:
return {"error": "Unsupported attribute type."}
# event_limit = request["config"].get("event_limit")
attribute = request["attribute"]
proxy_settings = get_proxy_settings(request.get("config"))
try:
client = vysion.Client(
api_key=request["config"]["apikey"],
headers={
"x-tool": "MISPModuleVysionExpansion",
},
proxy=proxy_settings["http"] if proxy_settings else None,
)
LOGGER.debug(attribute)
misp_attribute = MISPAttribute()
misp_attribute.from_dict(**attribute)
attribute_type = misp_attribute.type
attribute_value = misp_attribute.value
# https://www.misp-project.org/datamodels/#types
LOGGER.debug(attribute_type)
result = None
if attribute_type == "email":
result = client.find_email(attribute_value)
elif attribute_type == "domain":
result = client.search(attribute_value)
elif attribute_type == "url":
result = client.search(
attribute_value
) # TODO result = client.find_url(attribute_value)
elif attribute_type == "text":
result = client.search(attribute_value)
elif attribute_type == "target-org":
result = client.search(attribute_value, exact=True)
elif attribute_type == "btc":
result = client.search(attribute_value) # TODO
elif attribute_type == "phone-number":
result = client.search(attribute_value) # TODO
if result is None:
return {"results": {}}
elif isinstance(result, dto.VysionError):
LOGGER.error(str(result))
return {"results": {}}
p = MISPProcessor()
misp_event: MISPEvent = p.process(result, ref_attribute=misp_attribute)
LOGGER.info("Vysion client initialized")
LOGGER.info("Vysion result obtained")
return {
"results": {
"Object": [
json.loads(object.to_json()) for object in misp_event.objects
],
"Attribute": [
json.loads(attribute.to_json())
for attribute in misp_event.attributes
],
"Tag": [
json.loads(tag.to_json())
for tag in misp_event.tags
]
}
}
except vysion.APIError as ex:
LOGGER.error("Error in Vysion")
LOGGER.error(ex)
misperrors["error"] = ex.message
return misperrors
def introspection():
return mispattributes
def version():
moduleinfo["config"] = moduleconfig
return moduleinfo