2019-02-04 11:05:51 +01:00
import requests
import json
import sys
2020-07-28 11:47:53 +02:00
from . import check_input_attribute , standard_error_message
2019-11-05 16:43:03 +01:00
from collections import defaultdict
from pymisp import MISPAttribute , MISPEvent , MISPObject
from requests . auth import HTTPBasicAuth
2019-02-04 11:05:51 +01:00
# Add the current working directory to the module search path.
sys.path.append('./')

# Generic error payload exposed at module level.
misperrors = dict(error='Error')

# Attribute types accepted as input, attribute types listed as output, and
# the result format advertised by this module.
mispattributes = {
    'input': [
        'ip-src',
        'ip-dst',
        'vulnerability',
        'md5',
        'sha1',
        'sha256',
        'domain',
        'hostname',
        'url',
    ],
    'output': [
        'ip-src',
        'ip-dst',
        'text',
        'domain',
    ],
    'format': 'misp_standard',
}
2019-02-04 11:05:51 +01:00
# possible module-types: 'expansion', 'hover' or both
2024-08-12 11:23:10 +02:00
# Descriptive metadata about this module; returned by version().
moduleinfo = {
    'version': '2',
    'author': 'Joerg Stephan (@johest)',
    'description': 'An expansion module for IBM X-Force Exchange.',
    'module-type': ['expansion', 'hover'],
    'name': 'IBM X-Force Exchange Lookup',
    'logo': 'xforce.png',
    'requirements': ['An access to the X-Force API (apikey)'],
    'features': 'This module takes a MISP attribute as input to query the X-Force API. The API returns then additional information known in their threats data, that is mapped into MISP attributes.',
    'references': ['https://exchange.xforce.ibmcloud.com/'],
    # Keep this list in sync with mispattributes['input'].
    'input': 'A MISP attribute included in the following list:\n- ip-src\n- ip-dst\n- vulnerability\n- md5\n- sha1\n- sha256\n- domain\n- hostname\n- url',
    'output': 'MISP attributes mapped from the result of the query on X-Force Exchange.',
}

# config fields that your code expects from the site admin
moduleconfig = ["apikey", "apipassword"]
class XforceExchange():
    """Query the X-Force Exchange API for one attribute and build MISP objects.

    Usage is ``parse()`` followed by ``get_result()``. Objects created from
    the API responses are added to an internal ``MISPEvent``; any error met
    on the way is stored as a string under ``self.result['error']``.
    """

    def __init__(self, attribute, apikey, apipassword):
        """Store the credentials and load the queried attribute.

        :param attribute: dict describing the input attribute, passed as
            keyword arguments to ``MISPAttribute.from_dict``.
        :param apikey: user name used for HTTP basic authentication.
        :param apipassword: password used for HTTP basic authentication.
        """
        self.base_url = "https://api.xforce.ibmcloud.com"
        self.misp_event = MISPEvent()
        self.attribute = MISPAttribute()
        self.attribute.from_dict(**attribute)
        self._apikey = apikey
        self._apipassword = apipassword
        self.result = {}
        # Per object name ('file', 'url'): attribute signature -> already
        # created object (file) or object uuid (url), used to avoid duplicates.
        self.objects = defaultdict(dict)
        self.status_mapping = {403: "Access denied, please check if your authentication is valid and if you did not reach the limit of queries.",
                               404: "No result found for your query."}

    def parse(self):
        """Dispatch to the parsing method matching the attribute type.

        :raises KeyError: if the attribute type has no parsing method.
        """
        mapping = {'url': '_parse_url', 'vulnerability': '_parse_vulnerability'}
        mapping.update(dict.fromkeys(('md5', 'sha1', 'sha256'), '_parse_hash'))
        mapping.update(dict.fromkeys(('domain', 'hostname'), '_parse_dns'))
        mapping.update(dict.fromkeys(('ip-src', 'ip-dst'), '_parse_ip'))
        to_call = mapping[self.attribute.type]
        getattr(self, to_call)(self.attribute.value)

    def get_result(self):
        """Return the module response.

        :return: ``self.result`` (always holding an ``'error'`` entry) when no
            object was added to the event, otherwise ``{'results': {...}}``
            with the non-empty ``'Attribute'`` and ``'Object'`` parts of the
            event.
        """
        if not self.misp_event.objects:
            if 'error' not in self.result:
                self.result['error'] = "No additional data found on Xforce Exchange."
            return self.result
        self.misp_event.add_attribute(**self.attribute)
        event = json.loads(self.misp_event.to_json())
        result = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
        return {'results': result}

    def _api_call(self, url):
        """GET ``url`` with basic authentication and return the decoded JSON.

        :param url: full URL of the API endpoint to query.
        :return: the decoded JSON body on a 200 response; ``None`` when the
            request failed, the status code is not 200 or the body is not
            valid JSON. In those cases ``self.result['error']`` is set.
        """
        try:
            response = requests.get(url, auth=HTTPBasicAuth(self._apikey, self._apipassword))
        except Exception as e:
            # Keep the message only: the exception object itself cannot be
            # serialized to JSON, and there is no response to inspect.
            self.result['error'] = str(e)
            return None
        status_code = response.status_code
        if status_code != 200:
            self.result['error'] = self.status_mapping.get(status_code, 'An error with the API has occurred.')
            return None
        try:
            return response.json()
        except ValueError:
            self.result['error'] = 'An error with the API has occurred.'
            return None

    def _create_file(self, malware, relationship):
        """Build a 'file' object from a malware record.

        :param malware: mapping providing the 'filepath' and 'md5' keys.
        :param relationship: relationship type of the reference from the file
            object to the queried attribute.
        :return: the new ``MISPObject`` (not yet added to the event).
        """
        file_object = MISPObject('file')
        for key, relation in zip(('filepath', 'md5'), ('filename', 'md5')):
            file_object.add_attribute(relation, malware[key])
        file_object.add_reference(self.attribute.uuid, relationship)
        return file_object

    def _create_url(self, malware):
        """Return the uuid of the 'url' object describing a malware record.

        The object is added to the event only the first time a given
        (url, domain) combination is seen; afterwards the uuid recorded for
        it is returned.

        :param malware: mapping providing the 'uri' and 'domain' keys.
        :return: uuid of the matching 'url' object.
        """
        url_object = MISPObject('url')
        for key, relation in zip(('uri', 'domain'), ('url', 'domain')):
            url_object.add_attribute(relation, malware[key])
        attributes = tuple(f'{attribute.object_relation}_{attribute.value}' for attribute in url_object.attributes)
        if attributes in self.objects['url']:
            return self.objects['url'][attributes]
        url_uuid = url_object.uuid
        self.misp_event.add_object(**url_object)
        self.objects['url'][attributes] = url_uuid
        return url_uuid

    def _fetch_types(self, value):
        """Pick the object relations for a 'domain-ip' object.

        :param value: value to use when the queried attribute is not an IP.
        :return: tuple ``(relation of the queried value, relation of the
            records, queried value)``.
        """
        if self.attribute.type in ('ip-src', 'ip-dst'):
            return 'ip', 'domain', self.attribute.value
        return 'domain', 'ip', value

    def _handle_file(self, malware, relationship):
        """Add the 'file' object of a malware record, with its 'url' object.

        A file whose attributes were already seen only gets an additional
        'dropped-by' reference; it is not added to the event a second time.

        :param malware: mapping with the keys read by ``_create_file`` and
            ``_create_url``.
        :param relationship: relationship type between the file object and
            the queried attribute.
        """
        file_object = self._create_file(malware, relationship)
        attributes = tuple(f'{attribute.object_relation}_{attribute.value}' for attribute in file_object.attributes)
        if attributes in self.objects['file']:
            self.objects['file'][attributes].add_reference(self._create_url(malware), 'dropped-by')
            # Stop here: the freshly built duplicate must not be registered.
            return
        file_object.add_reference(self._create_url(malware), 'dropped-by')
        self.objects['file'][attributes] = file_object
        self.misp_event.add_object(**file_object)

    def _parse_dns(self, value):
        """Query the resolve endpoint and add a 'domain-ip' object.

        Nothing is added when the call failed or the response has no
        non-empty ``['Passive']['records']``.

        :param value: value to resolve.
        """
        dns_result = self._api_call(f'{self.base_url}/resolve/{value}')
        # _api_call returns None on failure, so check the result itself first.
        if dns_result and dns_result.get('Passive') and dns_result['Passive'].get('records'):
            itype, ftype, queried = self._fetch_types(dns_result['Passive']['query'])
            misp_object = MISPObject('domain-ip')
            misp_object.add_attribute(itype, queried)
            for record in dns_result['Passive']['records']:
                misp_object.add_attribute(ftype, record['value'])
            misp_object.add_reference(self.attribute.uuid, 'related-to')
            self.misp_event.add_object(**misp_object)

    def _parse_hash(self, value):
        """Query the malware endpoint for a hash and add the related files.

        :param value: hash value to look up.
        """
        malware_result = self._api_call(f'{self.base_url}/malware/{value}')
        if malware_result and malware_result.get('malware'):
            malware_report = malware_result['malware']
            for malware in malware_report.get('origins', {}).get('CnCServers', {}).get('rows', []):
                self._handle_file(malware, 'related-to')

    def _parse_ip(self, value):
        """Run the resolve lookup, then the 'ipr' malware lookup, for an IP."""
        self._parse_dns(value)
        self._parse_malware(value, 'ipr')

    def _parse_malware(self, value, feature):
        """Query ``/<feature>/malware/<value>`` and add the related files.

        :param value: value to look up.
        :param feature: first path segment of the endpoint ('ipr' or 'url'
            in this class).
        """
        malware_result = self._api_call(f'{self.base_url}/{feature}/malware/{value}')
        if malware_result and malware_result.get('malware'):
            for malware in malware_result['malware']:
                self._handle_file(malware, 'associated-with')

    def _parse_url(self, value):
        """Run the resolve lookup, then the 'url' malware lookup, for a URL."""
        self._parse_dns(value)
        self._parse_malware(value, 'url')

    def _parse_vulnerability(self, value):
        """Query the vulnerability search and add one object per result.

        :param value: value to search for.
        """
        vulnerability_result = self._api_call(f'{self.base_url}/vulnerabilities/search/{value}')
        if vulnerability_result:
            for vulnerability in vulnerability_result:
                misp_object = MISPObject('vulnerability')
                for code in vulnerability['stdcode']:
                    misp_object.add_attribute('id', code)
                for feature, relation in zip(('title', 'description', 'temporal_score'),
                                             ('summary', 'description', 'cvss-score')):
                    misp_object.add_attribute(relation, vulnerability[feature])
                for reference in vulnerability['references']:
                    misp_object.add_attribute('references', reference['link_target'])
                misp_object.add_reference(self.attribute.uuid, 'related-to')
                self.misp_event.add_object(**misp_object)
2019-02-04 11:05:51 +01:00
def handler(q=False):
    """Entry point of the module: run an X-Force Exchange lookup.

    :param q: JSON string holding the request, with a 'config' entry
        ('apikey' and 'apipassword') and an 'attribute' entry; ``False``
        when no request is given.
    :return: ``False`` when ``q`` is ``False``; a ``{'error': message}`` dict
        when the credentials are missing, the attribute fails
        ``check_input_attribute`` or its type is not listed in
        ``mispattributes['input']``; otherwise the value returned by
        ``XforceExchange.get_result()``.
    """
    if q is False:
        return False
    request = json.loads(q)
    config = request.get('config') or {}
    key = config.get('apikey')
    password = config.get('apipassword')
    if not (key and password):
        # Return a fresh dict, as the other error paths below do, instead of
        # mutating a dict shared at module level.
        return {'error': 'An API authentication is required (key and password).'}
    attribute = request.get('attribute')
    if not attribute or not check_input_attribute(attribute):
        return {'error': f'{standard_error_message} which should contain at least a type, a value and an uuid.'}
    if attribute['type'] not in mispattributes['input']:
        return {'error': 'Unsupported attribute type.'}
    parser = XforceExchange(attribute, key, password)
    parser.parse()
    return parser.get_result()
2019-02-04 11:05:51 +01:00
def introspection():
    """Return the module-level ``mispattributes`` dict."""
    attributes = mispattributes
    return attributes
def version():
    """Return ``moduleinfo`` after storing ``moduleconfig`` under its 'config' key."""
    moduleinfo.update(config=moduleconfig)
    return moduleinfo