2016-03-25 17:38:03 +01:00
import json
import pypssl
2020-07-28 11:47:53 +02:00
from . import check_input_attribute , standard_error_message
2019-12-17 10:26:43 +01:00
from pymisp import MISPAttribute , MISPEvent , MISPObject
2016-03-25 17:38:03 +01:00
2020-06-03 11:12:47 +02:00
# Attribute types accepted as input and the format of the returned data;
# introspection() hands this mapping back as-is.
# NOTE(review): the string literals in this copy of the file are padded with
# spaces; they are kept verbatim here — confirm against the upstream module.
mispattributes = {
    ' input ': [
        ' ip-src ',
        ' ip-dst ',
        ' ip-src|port ',
        ' ip-dst|port ',
    ],
    ' format ': ' misp_standard ',
}
2024-08-12 11:23:10 +02:00
# Static description of the module. version() adds the ' config ' entry to
# this same dictionary before returning it.
moduleinfo = {
    ' version ': ' 0.2 ',
    ' author ': ' Raphaël Vinot ',
    ' description ': ' Modules to access CIRCL Passive SSL. ',
    ' module-type ': [' expansion ', ' hover '],
    ' name ': ' CIRCL Passive SSL ',
    ' logo ': ' passivessl.png ',
    ' requirements ': [
        ' pypssl: Passive SSL python library ',
        ' A CIRCL passive SSL account with username & password ',
    ],
    # Adjacent literals are concatenated by the parser; the resulting text is
    # one single string.
    ' features ': (
        ' This module takes an ip-address (ip-src or ip-dst) attribute as input, '
        'and queries the CIRCL Passive SSL REST API to gather the related '
        'certificates and return the corresponding MISP objects. \n \n '
        'To make it work a username and a password are required to authenticate '
        'to the CIRCL Passive SSL API. '
    ),
    ' references ': [' https://www.circl.lu/services/passive-ssl/ '],
    ' input ': ' IP address attribute. ',
    ' output ': ' x509 certificate objects seen by the IP address(es). ',
}
2016-03-25 17:38:03 +01:00
# Names of the configuration entries the module needs; handler() reads both
# from the request's configuration and version() publishes this list.
moduleconfig = [
    ' username ',
    ' password ',
]
2019-12-17 10:26:43 +01:00
class PassiveSSLParser():
    """Query CIRCL Passive SSL for an IP attribute and collect x509 objects.

    The input attribute is stored in a fresh ``MISPEvent``. ``parse`` queries
    the service and adds one x509 object per returned certificate, each one
    referencing the IP attribute it was seen by. ``get_results`` returns
    either the error recorded by ``parse`` or the collected event content.

    NOTE(review): string literals in this copy of the file are padded with
    spaces; they are kept verbatim except for the ``|`` separator, which has
    to match the composite types declared in ``mispattributes``.
    """

    # Error payload recorded by parse(); stays None while no error occurred.
    # Declared up front so get_results() does not need to probe for it.
    result = None

    def __init__(self, attribute, authentication):
        """Store the input attribute and create the Passive SSL client.

        Args:
            attribute: dict describing the MISP attribute to enrich.
            authentication: passed to ``pypssl.PyPSSL`` as ``basic_auth``.
        """
        self.misp_event = MISPEvent()
        self.attribute = MISPAttribute()
        self.attribute.from_dict(**attribute)
        self.misp_event.add_attribute(**self.attribute)
        self.pssl = pypssl.PyPSSL(basic_auth=authentication)
        self.cert_hash = ' x509-fingerprint-sha1 '
        self.cert_type = ' pem '
        # Certificate info field -> (MISP attribute type, x509 object relation).
        self.mapping = {
            ' issuer ': (' text ', ' issuer '),
            ' keylength ': (' text ', ' pubkey-info-size '),
            ' not_after ': (' datetime ', ' validity-not-after '),
            ' not_before ': (' datetime ', ' validity-not-before '),
            ' subject ': (' text ', ' subject '),
        }

    def get_results(self):
        """Return the recorded error, or the event's attributes and objects.

        Returns:
            The error dictionary stored by ``parse`` when there is one,
            otherwise a dictionary whose single entry holds the attribute and
            object lists found in the serialised event.
        """
        if self.result is not None:
            return self.result
        event = json.loads(self.misp_event.to_json())
        # A key missing from the serialised event (e.g. no object was added)
        # is skipped instead of raising KeyError.
        results = {
            key: event[key]
            for key in (' Attribute ', ' Object ')
            if key in event
        }
        return {' results ': results}

    def parse(self):
        """Query the service and turn every returned certificate into an object.

        On failure the error is stored in ``self.result`` and nothing is
        added to the event.
        """
        value = self._queried_ip()
        try:
            results = self.pssl.query(value)
        except Exception:
            # NOTE(review): every exception raised by the query is reported
            # as an authentication problem, whatever its actual cause.
            self.result = {' error ': ' There is an authentication error, please make sure you supply correct credentials. '}
            return
        if not results:
            self.result = {' error ': ' Not found '}
            return
        if ' error ' in results:
            self.result = {' error ': results[' error ']}
            return
        for ip_address, certificates in results.items():
            ip_uuid = self._handle_ip_attribute(ip_address)
            for certificate in certificates[' certificates ']:
                self._handle_certificate(certificate, ip_uuid)

    def _queried_ip(self):
        """Return the IP part of the input attribute's value.

        Composite types (``ip-src|port``, ``ip-dst|port``) carry the port
        after a ``|``; only the part before it is an IP address.
        """
        value = self.attribute.value
        if '|' in self.attribute.type:
            value = value.split('|')[0]
        return value

    def _handle_certificate(self, certificate, ip_uuid):
        """Add an x509 object for one certificate, referencing its IP attribute.

        Args:
            certificate: value stored as the fingerprint attribute and passed
                to ``fetch_cert`` to retrieve the certificate details.
            ip_uuid: uuid of the attribute the object is linked to.
        """
        x509 = MISPObject(' x509 ')
        x509.add_attribute(self.cert_hash, type=self.cert_hash, value=certificate)
        cert_details = self.pssl.fetch_cert(certificate)
        info = cert_details[' info ']
        for feature, (attribute_type, object_relation) in self.mapping.items():
            x509.add_attribute(object_relation, type=attribute_type, value=info[feature])
        # NOTE(review): the value stored here is the relation name itself,
        # not data taken from cert_details — confirm this is intended.
        x509.add_attribute(self.cert_type, type=' text ', value=self.cert_type)
        x509.add_reference(ip_uuid, ' seen-by ')
        self.misp_event.add_object(**x509)

    def _handle_ip_attribute(self, ip_address):
        """Return the uuid of the attribute standing for ``ip_address``.

        The input attribute is reused when ``ip_address`` is the IP that was
        queried, including when the input is a composite ``ip|port`` value.
        Any other address gets a new attribute added to the event.
        """
        if ip_address == self._queried_ip():
            return self.attribute.uuid
        # The new value is a bare IP, so drop the "|port" part of a composite
        # input type instead of pairing it with a value that has no port.
        ip_type = self.attribute.type.split('|')[0]
        ip_attribute = MISPAttribute()
        ip_attribute.from_dict(**{' type ': ip_type, ' value ': ip_address})
        self.misp_event.add_attribute(**ip_attribute)
        return ip_attribute.uuid
2016-03-25 17:38:03 +01:00
def handler(q=False):
    """Run a Passive SSL lookup for the attribute described in a JSON request.

    Args:
        q: JSON-encoded request, or ``False`` when there is nothing to do.

    Returns:
        ``False`` when ``q`` is ``False``; a dictionary with an error message
        when the credentials or the attribute are missing or unsupported;
        otherwise whatever the parser's ``get_results`` returns.
    """
    if q is False:
        return False
    request = json.loads(q)

    # Both credentials must be present and non-empty.
    config = request.get(' config ')
    if not config:
        return {' error ': ' CIRCL Passive SSL authentication is missing. '}
    username = config.get(' username ')
    password = config.get(' password ')
    if not (username and password):
        return {' error ': ' CIRCL Passive SSL authentication is incomplete, please provide your username and password. '}

    # The attribute must be present and pass the shared input check.
    attribute = request.get(' attribute ')
    if not attribute or not check_input_attribute(attribute):
        return {' error ': f' { standard_error_message } , which should contain at least a type, a value and an uuid. '}
    if attribute[' type '] not in mispattributes[' input ']:
        return {' error ': ' Unsupported attribute type. '}

    pssl_parser = PassiveSSLParser(attribute, (username, password))
    pssl_parser.parse()
    return pssl_parser.get_results()
2016-03-25 17:38:03 +01:00
def introspection():
    """Return the module's accepted input types and output format."""
    return mispattributes
def version():
    """Return the module description, completed with its configuration keys.

    The configuration list is stored in the shared ``moduleinfo`` dictionary
    itself, which is then returned.
    """
    moduleinfo.update({' config ': moduleconfig})
    return moduleinfo