2024-08-05 12:58:19 +02:00
import json
from pymisp import MISPAttribute , MISPEvent
from urllib . parse import urlparse
import logging
import vysion . client as vysion
import vysion . dto as dto
from vysion . dto . util import MISPProcessor
2024-08-08 15:40:25 +02:00
from . import standard_error_message
2024-08-05 12:58:19 +02:00
# Error payload template; the "error" message is overwritten in place when a
# failure is reported.
misperrors = {"error": "Error"}

# Attribute types this module accepts as input, plus the result format it
# declares to MISP.
mispattributes = {
    "input": [
        "email",
        "domain",
        "hostname",
        "url",
        "text",
        "btc",
        "phone-number",
        "target-org",
        "xmr",
        "dash",
    ],
    "format": "misp_standard",
}
# possible module-types: 'expansion', 'hover' or both
# Static module metadata; version() adds the "config" entry and returns it.
moduleinfo = {
    "version": "1",
    "author": "Byron Labs",
    "description": "Module to enrich the information by making use of the Vysion API.",
    "module-type": ["expansion"],
    "name": "Vysion Enrich",
    "logo": "vysion.png",
    "requirements": ["Vysion python library", "Vysion API Key"],
    "features": "This module gets correlated information from Byron Labs' dark web intelligence database. With this you will get several objects containing information related to, for example, an organization victim of a ransomware attack.",
    "references": [
        "https://vysion.ai/",
        "https://developers.vysion.ai/",
        "https://github.com/ByronLabs/vysion-cti/tree/main",
    ],
    "input": "company(target-org), country, info, BTC, XMR and DASH address.",
    "output": "MISP objects containing title, link to our webapp and TOR, i2p or clearnet URLs.",
}
# config fields that your code expects from the site admin
moduleconfig = [
    "apikey",
    "event_limit",
    "proxy_host",
    "proxy_port",
    "proxy_username",
    "proxy_password",
]
LOGGER = logging.getLogger("vysion")
LOGGER.setLevel(logging.INFO)
# NOTE(review): the logger level was just set to INFO, so this DEBUG record is
# filtered out by the logger itself; kept as-is to avoid changing log output.
LOGGER.debug("Starting Vysion")

# Not referenced anywhere in the visible part of this module.
DEFAULT_RESULTS_LIMIT = 10
def get_proxy_settings(config: dict) -> "dict | None":
    """Build a ``requests``-style proxies mapping from the module config.

    Args:
        config: Module configuration. The ``proxy_host``, ``proxy_port``,
            ``proxy_username`` and ``proxy_password`` keys are read.

    Returns:
        ``{"http": url, "https": url}`` with the same proxy URL under both
        keys, or ``None`` when no ``proxy_host`` is configured.

    Raises:
        KeyError: If a host is set without a port, or a username is set
            without a password. The explanation is carried by the exception
            and also stored in ``misperrors["error"]``.
    """
    host = config.get("proxy_host")
    if not host:
        return None

    port = config.get("proxy_port")
    if not port:
        message = (
            "The vysion_proxy_host config is set, "
            "please also set the vysion_proxy_port."
        )
        misperrors["error"] = message
        raise KeyError(message)

    parsed = urlparse(host)
    if not parsed.netloc:
        # urlparse only recognises a network location after "//"; a bare
        # "proxy.example.org" would otherwise yield an empty host and scheme.
        parsed = urlparse(f"//{host}")

    # Any scheme containing "http" (https included) is reduced to plain
    # "http"; a missing scheme also defaults to "http". Other schemes are
    # passed through untouched.
    if not parsed.scheme or "http" in parsed.scheme:
        scheme = "http"
    else:
        scheme = parsed.scheme

    address = f"{parsed.netloc}:{port}"

    username = config.get("proxy_username")
    if username:
        password = config.get("proxy_password")
        if not password:
            message = (
                "The vysion_proxy_username config is set, "
                "please also set the vysion_proxy_password."
            )
            misperrors["error"] = message
            raise KeyError(message)
        address = f"{username}:{password}@{address}"

    proxy_url = f"{scheme}://{address}"
    return {"http": proxy_url, "https": proxy_url}
def parse_error(status_code: int) -> str:
    """Return a human-readable message for an HTTP status code.

    Args:
        status_code: Status code to describe.

    Returns:
        The dedicated message for 400, 403 and 500; a generic
        "may not be accessible" message for any other code.
    """
    known_messages = {
        500: "Vysion is blind.",
        400: "Incorrect request, please check the arguments.",
        403: "You don't have enough privileges to make the request.",
    }
    return known_messages.get(status_code, "Vysion may not be accessible.")
def _find_vysion_results(client, attribute_type, attribute_value):
    """Run the Vysion lookup matching *attribute_type*; None if no lookup applies."""
    if attribute_type == "email":
        return client.find_email(attribute_value)
    if attribute_type in ("domain", "hostname", "url"):
        # "hostname" is a declared input type, so it is looked up the same
        # way as "domain" instead of silently producing no results.
        return client.find_url(attribute_value)
    if attribute_type in ("text", "target-org", "phone-number"):
        return client.search(attribute_value)
    if attribute_type in ("btc", "xmr", "dash"):
        # The wallet lookup takes the upper-cased currency code.
        return client.find_wallet(attribute_type.upper(), attribute_value)
    return None


def handler(q=False):
    """Enrich a MISP attribute with data from the Vysion API.

    Args:
        q: JSON-encoded request string containing a ``config`` mapping (with
            at least ``apikey``) and an ``attribute`` mapping, or ``False``.

    Returns:
        ``False`` when ``q`` is ``False``; ``{"error": message}`` when the
        request is invalid, the proxy configuration is incomplete, or the
        Vysion client raises ``APIError``; otherwise ``{"results": ...}``
        holding the ``Object``, ``Attribute`` and ``Tag`` lists of the
        processed event (an empty mapping when no lookup produced a result).
    """
    if q is False:
        return False
    request = json.loads(q)

    # Error responses are built as fresh dicts so that one request's message
    # cannot leak into another through the shared module-level dict.
    config = request.get("config") or {}
    if not config.get("apikey"):
        return {"error": "A Vysion api key is required for this module."}

    attribute = request.get("attribute")
    if not attribute:
        return {
            "error": f"{standard_error_message}, which should contain at least a type, a value and an uuid."
        }
    if attribute.get("type") not in mispattributes["input"]:
        return {"error": "Unsupported attribute type."}

    # NOTE: the "event_limit" config value is not applied to the lookups yet.

    try:
        proxy_settings = get_proxy_settings(config)
    except KeyError:
        # get_proxy_settings records the explanation in misperrors before
        # raising; report it instead of letting the exception escape.
        return dict(misperrors)

    try:
        client = vysion.Client(
            api_key=config["apikey"],
            headers={
                "x-tool": "MISPModuleVysionExpansion",
            },
            proxy=proxy_settings["http"] if proxy_settings else None,
        )
        LOGGER.info("Vysion client initialized")

        LOGGER.debug(attribute)
        misp_attribute = MISPAttribute()
        misp_attribute.from_dict(**attribute)

        # https://www.misp-project.org/datamodels/#types
        LOGGER.debug(misp_attribute.type)

        result = _find_vysion_results(
            client, misp_attribute.type, misp_attribute.value
        )
        if result is None:
            return {"results": {}}
        LOGGER.info("Vysion result obtained")

        misp_event: MISPEvent = MISPProcessor().process(
            result, ref_attribute=misp_attribute
        )

        # Each element is serialised with to_json() and parsed back so the
        # response holds plain JSON-compatible structures.
        return {
            "results": {
                "Object": [
                    json.loads(misp_object.to_json())
                    for misp_object in misp_event.objects
                ],
                "Attribute": [
                    json.loads(event_attribute.to_json())
                    for event_attribute in misp_event.attributes
                ],
                "Tag": [
                    json.loads(tag.to_json())
                    for tag in misp_event.tags
                ],
            }
        }
    except vysion.APIError as ex:
        LOGGER.error("Error in Vysion")
        LOGGER.error(ex)
        return {"error": getattr(ex, "message", str(ex))}
def introspection():
    """Return the module-level ``mispattributes`` declaration unchanged."""
    return mispattributes
def version():
    """Return ``moduleinfo`` after storing ``moduleconfig`` under its ``config`` key.

    The module-level ``moduleinfo`` dict is updated in place and that same
    object is returned.
    """
    moduleinfo.update(config=moduleconfig)
    return moduleinfo