mirror of https://github.com/MISP/misp-modules
Adding support for more input types, including multi-types
parent
b79636ccfa
commit
500f0301a9
|
@ -14,7 +14,8 @@ except ImportError:
|
|||
|
||||
misperrors = {'error': 'Error'}
|
||||
moduleconfig = ['api_id', 'api_secret']
|
||||
mispattributes = {'input': ['ip-src', 'ip-dst', 'domain', 'x509-fingerprint-md5', 'x509-fingerprint-sha1', 'x509-fingerprint-sha256'], 'format': 'misp_standard'}
|
||||
mispattributes = {'input': ['ip-src', 'ip-dst', 'domain', 'hostname', 'hostname|port', 'domain|ip', 'ip-dst|port', 'ip-src|port',
|
||||
'x509-fingerprint-md5', 'x509-fingerprint-sha1', 'x509-fingerprint-sha256'], 'format': 'misp_standard'}
|
||||
moduleinfo = {'version': '0.1', 'author': 'Loïc Fortemps',
|
||||
'description': 'Censys.io expansion module', 'module-type': ['expansion', 'hover']}
|
||||
|
||||
|
@ -43,27 +44,53 @@ def handler(q=False):
|
|||
|
||||
attribute = MISPAttribute()
|
||||
attribute.from_dict(**request['attribute'])
|
||||
# Lists to accomodate multi-types attribute
|
||||
conn = list()
|
||||
types = list()
|
||||
values = list()
|
||||
results = list()
|
||||
|
||||
if attribute.type == 'ip-dst' or attribute.type == 'ip-src':
|
||||
conn = censys.ipv4.CensysIPv4(api_id=api_id, api_secret=api_secret)
|
||||
elif attribute.type == 'domain':
|
||||
conn = censys.websites.CensysWebsites(api_id=api_id, api_secret=api_secret)
|
||||
elif 'x509-fingerprint' in attribute.type:
|
||||
conn = censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)
|
||||
if "|" in attribute.type:
|
||||
t_1, t_2 = attribute.type.split('|')
|
||||
v_1, v_2 = attribute.value.split('|')
|
||||
# We cannot use the port information
|
||||
if t_2 == "port":
|
||||
types.append(t_1)
|
||||
values.append(v_1)
|
||||
else:
|
||||
types = [t_1, t_2]
|
||||
values = [v_1, v_2]
|
||||
else:
|
||||
return False
|
||||
types.append(attribute.type)
|
||||
values.append(attribute.value)
|
||||
|
||||
try:
|
||||
result = conn.view(attribute.value)
|
||||
except censys.base.CensysNotFoundException:
|
||||
for t in types:
|
||||
# ip, ip-src or ip-dst
|
||||
if t[:2] == "ip":
|
||||
conn.append(censys.ipv4.CensysIPv4(api_id=api_id, api_secret=api_secret))
|
||||
elif t == 'domain' or t == "hostname":
|
||||
conn.append(censys.websites.CensysWebsites(api_id=api_id, api_secret=api_secret))
|
||||
elif 'x509-fingerprint' in t:
|
||||
conn.append(censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret))
|
||||
|
||||
found = True
|
||||
for c in conn:
|
||||
val = values.pop(0)
|
||||
try:
|
||||
r = c.view(val)
|
||||
results.append(parse_response(r, attribute))
|
||||
found = True
|
||||
except censys.base.CensysNotFoundException:
|
||||
found = False
|
||||
except Exception:
|
||||
misperrors['error'] = "Connection issue"
|
||||
return misperrors
|
||||
|
||||
if not found:
|
||||
misperrors['error'] = "Nothing could be found on Censys"
|
||||
return misperrors
|
||||
except Exception:
|
||||
misperrors['error'] = "Connection issue"
|
||||
return misperrors
|
||||
|
||||
r = {'results': parse_response(result, attribute)}
|
||||
return r
|
||||
return {'results': remove_duplicates(results)}
|
||||
|
||||
|
||||
def parse_response(censys_output, attribute):
|
||||
|
@ -102,7 +129,7 @@ def parse_response(censys_output, attribute):
|
|||
if 'ssh' in censys_output[k]:
|
||||
try:
|
||||
cert = censys_output[k]['ssh']['v2']['server_host_key']
|
||||
# To enable once the type is merged
|
||||
# TODO enable once the type is merged
|
||||
# misp_event.add_attribute(type='hasshserver-sha256', value=cert['fingerprint_sha256'])
|
||||
except KeyError:
|
||||
pass
|
||||
|
@ -118,9 +145,11 @@ def parse_response(censys_output, attribute):
|
|||
loc = censys_output['location']
|
||||
loc_obj.add_attribute('latitude', value=loc['latitude'])
|
||||
loc_obj.add_attribute('longitude', value=loc['longitude'])
|
||||
loc_obj.add_attribute('city', value=loc['city'])
|
||||
if 'city' in loc:
|
||||
loc_obj.add_attribute('city', value=loc['city'])
|
||||
loc_obj.add_attribute('country', value=loc['country'])
|
||||
loc_obj.add_attribute('zipcode', value=loc['postal_code'])
|
||||
if 'postal_code' in loc:
|
||||
loc_obj.add_attribute('zipcode', value=loc['postal_code'])
|
||||
if 'province' in loc:
|
||||
loc_obj.add_attribute('region', value=loc['province'])
|
||||
loc_obj.add_reference(attribute.uuid, 'associated-to')
|
||||
|
@ -130,6 +159,51 @@ def parse_response(censys_output, attribute):
|
|||
return {'Object': event['Object'], 'Attribute': event['Attribute']}
|
||||
|
||||
|
||||
# In case of multiple enrichment (ip and domain), we need to filter out similar objects
|
||||
# TODO: make it more granular
|
||||
def remove_duplicates(results):
|
||||
# Only one enrichment was performed so no duplicate
|
||||
if len(results) == 1:
|
||||
return results[0]
|
||||
elif len(results) == 2:
|
||||
final_result = results[0]
|
||||
obj_l2 = results[1]['Object']
|
||||
for o2 in obj_l2:
|
||||
if o2['name'] == "asn":
|
||||
key = "asn"
|
||||
elif o2['name'] == "ip-port":
|
||||
key = "ip"
|
||||
elif o2['name'] == "x509":
|
||||
key = "x509-fingerprint-sha256"
|
||||
elif o2['name'] == "geolocation":
|
||||
key = "latitude"
|
||||
if not check_if_present(o2, key, final_result['Object']):
|
||||
final_result['Object'].append(o2)
|
||||
|
||||
return final_result
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def check_if_present(object, attribute_name, list_objects):
|
||||
"""
|
||||
Assert if a given object is present in the list.
|
||||
|
||||
This function check if object (json format) is present in list_objects
|
||||
using attribute_name for the matching
|
||||
"""
|
||||
for o in list_objects:
|
||||
if o['name'] == object['name']:
|
||||
for attr in object['Attribute']:
|
||||
if attr['type'] == attribute_name:
|
||||
value = attr['value']
|
||||
for attr2 in o['Attribute']:
|
||||
if attr['type'] == attribute_name and attr['value'] == value:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_certificate_object(cert, attribute):
|
||||
parsed = cert['parsed']
|
||||
cert_object = MISPObject('x509')
|
||||
|
|
Loading…
Reference in New Issue