Upgrade censys_enrich module to new api version

pull/542/head
Silvian I 2022-01-06 11:35:01 +01:00
parent 895e992349
commit b9d9df4dd0
4 changed files with 112 additions and 66 deletions

View File

@ -27,8 +27,8 @@ beautifulsoup4==4.9.3
bidict==0.21.2; python_version >= '3.6'
blockchain==1.4.4
certifi==2021.5.30
censys==2.0.9
cffi==1.14.6
#chardet==4.0.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
chardet
charset-normalizer==2.0.4; python_version >= '3'

View File

@ -1,15 +1,26 @@
# encoding: utf-8
import json
import configparser
import base64
import codecs
import censys.common.config
from dateutil.parser import isoparse
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
try:
import censys.base
import censys.ipv4
import censys.websites
import censys.certificates
#needed in order to overwrite the censys module intent of creating config files in the home folder of the proccess owner
#--
def get_config_over() -> configparser.ConfigParser:
config = configparser.ConfigParser()
config[censys.common.config.DEFAULT] = censys.common.config.default_config
return config
censys.common.config.get_config = get_config_over
#--
from censys.search import CensysHosts
from censys.search import CensysCertificates
from censys.common.base import *
except ImportError:
print("Censys module not installed. Try 'pip install censys'")
@ -20,8 +31,11 @@ mispattributes = {'input': ['ip-src', 'ip-dst', 'domain', 'hostname', 'hostname|
moduleinfo = {'version': '0.1', 'author': 'Loïc Fortemps',
'description': 'Censys.io expansion module', 'module-type': ['expansion', 'hover']}
api_id = None
api_secret = None
def handler(q=False):
global api_id, api_secret
if q is False:
return False
request = json.loads(q)
@ -65,26 +79,29 @@ def handler(q=False):
types.append(attribute.type)
values.append(attribute.value)
found = False
for t in types:
# ip, ip-src or ip-dst
if t[:2] == "ip":
conn.append(censys.ipv4.CensysIPv4(api_id=api_id, api_secret=api_secret))
elif t == 'domain' or t == "hostname":
conn.append(censys.websites.CensysWebsites(api_id=api_id, api_secret=api_secret))
elif 'x509-fingerprint' in t:
conn.append(censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret))
found = True
for c in conn:
val = values.pop(0)
try:
r = c.view(val)
results.append(parse_response(r, attribute))
found = True
except censys.base.CensysNotFoundException:
found = False
except Exception:
misperrors['error'] = "Connection issue"
value = values.pop(0)
# ip, ip-src or ip-dst
if t[:2] == "ip":
r = CensysHosts(api_id, api_secret).view(value)
results.append(parse_response(r, attribute))
found = True
elif t == 'domain' or t == "hostname":
# get ips
endpoint = CensysHosts(api_id, api_secret)
for r_list in endpoint.search(query=value, per_page=5, pages=1):
for r in r_list:
results.append(parse_response(r, attribute))
found = True
elif 'x509-fingerprint-sha256' in t:
# use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details
r = CensysCertificates(api_id, api_secret).view(value)
results.append(parse_response(r, attribute))
found = True
except CensysException as e:
misperrors['error'] = print("ERROR: param {} / response: {}".format(value, e))
return misperrors
if not found:
@ -98,38 +115,43 @@ def parse_response(censys_output, attribute):
misp_event = MISPEvent()
misp_event.add_attribute(**attribute)
# Generic fields (for IP/Websites)
if "autonomous_system" in censys_output:
cen_as = censys_output['autonomous_system']
if censys_output.get('autonomous_system'):
cen_as = censys_output.get('autonomous_system')
asn_object = MISPObject('asn')
asn_object.add_attribute('asn', value=cen_as["asn"])
asn_object.add_attribute('description', value=cen_as['name'])
asn_object.add_attribute('subnet-announced', value=cen_as['routed_prefix'])
asn_object.add_attribute('country', value=cen_as['country_code'])
asn_object.add_attribute('asn', value=cen_as.get("asn"))
asn_object.add_attribute('description', value=cen_as.get('name'))
asn_object.add_attribute('subnet-announced', value=cen_as.get('routed_prefix'))
asn_object.add_attribute('country', value=cen_as.get('country_code'))
asn_object.add_reference(attribute.uuid, 'associated-to')
misp_event.add_object(**asn_object)
if "ip" in censys_output and "ports" in censys_output:
if censys_output.get('ip') and len(censys_output.get('services')): #"ports" in censys_output
ip_object = MISPObject('ip-port')
ip_object.add_attribute('ip', value=censys_output['ip'])
for p in censys_output['ports']:
ip_object.add_attribute('dst-port', value=p)
ip_object.add_attribute('ip', value=censys_output.get('ip'))
for serv in censys_output.get('services'):
if serv.get('port'):
ip_object.add_attribute('dst-port', value=serv.get('port'))
ip_object.add_reference(attribute.uuid, 'associated-to')
misp_event.add_object(**ip_object)
# We explore all ports to find https or ssh services
for k in censys_output.keys():
if not isinstance(censys_output[k], dict):
for serv in censys_output.get('services', []):
if not isinstance(serv, dict):
continue
if 'https' in censys_output[k]:
if serv.get('service_name').lower() == 'http' and serv.get('certificate', None):
try:
cert = censys_output[k]['https']['tls']['certificate']
cert_obj = get_certificate_object(cert, attribute)
misp_event.add_object(**cert_obj)
cert = serv.get('certificate', None)
if cert:
# TODO switch to api_v2 once available
# use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details
cert_details = CensysCertificates(api_id, api_secret).view(cert)
cert_obj = get_certificate_object(cert_details, attribute)
misp_event.add_object(**cert_obj)
except KeyError:
print("Error !")
if 'ssh' in censys_output[k]:
if serv.get('ssh') and serv.get('service_name').lower() == 'ssh':
try:
cert = censys_output[k]['ssh']['v2']['server_host_key']
cert = serv.get('ssh').get('server_host_key').get('fingerprint_sha256')
# TODO enable once the type is merged
# misp_event.add_attribute(type='hasshserver-sha256', value=cert['fingerprint_sha256'])
except KeyError:
@ -144,20 +166,20 @@ def parse_response(censys_output, attribute):
if "location" in censys_output:
loc_obj = MISPObject('geolocation')
loc = censys_output['location']
loc_obj.add_attribute('latitude', value=loc['latitude'])
loc_obj.add_attribute('longitude', value=loc['longitude'])
loc_obj.add_attribute('latitude', value=loc.get('coordinates', {}).get('latitude', None))
loc_obj.add_attribute('longitude', value=loc.get('coordinates', {}).get('longitude', None))
if 'city' in loc:
loc_obj.add_attribute('city', value=loc['city'])
loc_obj.add_attribute('country', value=loc['country'])
loc_obj.add_attribute('city', value=loc.get('city'))
loc_obj.add_attribute('country', value=loc.get('country'))
if 'postal_code' in loc:
loc_obj.add_attribute('zipcode', value=loc['postal_code'])
loc_obj.add_attribute('zipcode', value=loc.get('postal_code'))
if 'province' in loc:
loc_obj.add_attribute('region', value=loc['province'])
loc_obj.add_attribute('region', value=loc.get('province'))
loc_obj.add_reference(attribute.uuid, 'associated-to')
misp_event.add_object(**loc_obj)
event = json.loads(misp_event.to_json())
return {'Object': event['Object'], 'Attribute': event['Attribute']}
return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])}
# In case of multiple enrichment (ip and domain), we need to filter out similar objects
@ -166,24 +188,23 @@ def remove_duplicates(results):
# Only one enrichment was performed so no duplicate
if len(results) == 1:
return results[0]
elif len(results) == 2:
final_result = results[0]
obj_l2 = results[1]['Object']
for o2 in obj_l2:
if o2['name'] == "asn":
key = "asn"
elif o2['name'] == "ip-port":
key = "ip"
elif o2['name'] == "x509":
key = "x509-fingerprint-sha256"
elif o2['name'] == "geolocation":
key = "latitude"
if not check_if_present(o2, key, final_result['Object']):
final_result['Object'].append(o2)
return final_result
else:
return []
final_result = results[0]
for i,result in enumerate(results[1:]):
obj_l = results[i+1].get('Object', [])
for o2 in obj_l:
if o2['name'] == "asn":
key = "asn"
elif o2['name'] == "ip-port":
key = "ip"
elif o2['name'] == "x509":
key = "x509-fingerprint-sha256"
elif o2['name'] == "geolocation":
key = "latitude"
if not check_if_present(o2, key, final_result.get('Object', [])):
final_result['Object'].append(o2)
return final_result
def check_if_present(object, attribute_name, list_objects):
@ -253,4 +274,4 @@ def introspection():
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo
return moduleinfo

View File

@ -0,0 +1,6 @@
{
"censys_enrich": {
"api_id" : "<api_id>",
"api_secret": "<api_secret>"
}
}

View File

@ -215,6 +215,25 @@ class TestExpansions(unittest.TestCase):
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), '\nThis is an basic test docx file. ')
def test_censys(self):
module_name = "censys_enrich"
query = {
"attribute": {"type" : "ip-dst", "value": "8.8.8.8", "uuid": ""},
"module": module_name,
"config": {}
}
if module_name in self.configs:
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
if self.configs[module_name].get('api_id') == '<api_id>':
self.assertTrue(self.get_errors(response).startswith('ERROR: param '))
else:
self.assertGreaterEqual(len(response.json().get('results', {}).get('Attribute')), 1)
else:
response = self.misp_modules_post(query)
self.assertTrue(self.get_errors(response).startswith('Please provide config options'))
def test_farsight_passivedns(self):
module_name = 'farsight_passivedns'
if module_name in self.configs: