diff --git a/REQUIREMENTS b/REQUIREMENTS index a2486b4..ccc737f 100644 --- a/REQUIREMENTS +++ b/REQUIREMENTS @@ -27,8 +27,8 @@ beautifulsoup4==4.9.3 bidict==0.21.2; python_version >= '3.6' blockchain==1.4.4 certifi==2021.5.30 +censys==2.0.9 cffi==1.14.6 - #chardet==4.0.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' chardet charset-normalizer==2.0.4; python_version >= '3' diff --git a/misp_modules/modules/expansion/censys_enrich.py b/misp_modules/modules/expansion/censys_enrich.py index d5823ff..6f630a3 100644 --- a/misp_modules/modules/expansion/censys_enrich.py +++ b/misp_modules/modules/expansion/censys_enrich.py @@ -1,15 +1,26 @@ # encoding: utf-8 import json +import configparser import base64 import codecs +import censys.common.config from dateutil.parser import isoparse from . import check_input_attribute, standard_error_message from pymisp import MISPAttribute, MISPEvent, MISPObject + try: - import censys.base - import censys.ipv4 - import censys.websites - import censys.certificates + #needed in order to overwrite the censys module intent of creating config files in the home folder of the proccess owner + #-- + def get_config_over() -> configparser.ConfigParser: + config = configparser.ConfigParser() + config[censys.common.config.DEFAULT] = censys.common.config.default_config + return config + censys.common.config.get_config = get_config_over + #-- + + from censys.search import CensysHosts + from censys.search import CensysCertificates + from censys.common.base import * except ImportError: print("Censys module not installed. Try 'pip install censys'") @@ -20,8 +31,11 @@ mispattributes = {'input': ['ip-src', 'ip-dst', 'domain', 'hostname', 'hostname| moduleinfo = {'version': '0.1', 'author': 'Loïc Fortemps', 'description': 'Censys.io expansion module', 'module-type': ['expansion', 'hover']} +api_id = None +api_secret = None def handler(q=False): + global api_id, api_secret if q is False: return False request = json.loads(q) @@ -65,26 +79,29 @@ def handler(q=False): types.append(attribute.type) values.append(attribute.value) + found = False for t in types: - # ip, ip-src or ip-dst - if t[:2] == "ip": - conn.append(censys.ipv4.CensysIPv4(api_id=api_id, api_secret=api_secret)) - elif t == 'domain' or t == "hostname": - conn.append(censys.websites.CensysWebsites(api_id=api_id, api_secret=api_secret)) - elif 'x509-fingerprint' in t: - conn.append(censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)) - - found = True - for c in conn: - val = values.pop(0) try: - r = c.view(val) - results.append(parse_response(r, attribute)) - found = True - except censys.base.CensysNotFoundException: - found = False - except Exception: - misperrors['error'] = "Connection issue" + value = values.pop(0) + # ip, ip-src or ip-dst + if t[:2] == "ip": + r = CensysHosts(api_id, api_secret).view(value) + results.append(parse_response(r, attribute)) + found = True + elif t == 'domain' or t == "hostname": + # get ips + endpoint = CensysHosts(api_id, api_secret) + for r_list in endpoint.search(query=value, per_page=5, pages=1): + for r in r_list: + results.append(parse_response(r, attribute)) + found = True + elif 'x509-fingerprint-sha256' in t: + # use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details + r = CensysCertificates(api_id, api_secret).view(value) + results.append(parse_response(r, attribute)) + found = True + except CensysException as e: + misperrors['error'] = print("ERROR: param {} / response: {}".format(value, e)) return misperrors if not found: @@ -98,38 +115,43 @@ def parse_response(censys_output, attribute): misp_event = MISPEvent() misp_event.add_attribute(**attribute) # Generic fields (for IP/Websites) - if "autonomous_system" in censys_output: - cen_as = censys_output['autonomous_system'] + if censys_output.get('autonomous_system'): + cen_as = censys_output.get('autonomous_system') asn_object = MISPObject('asn') - asn_object.add_attribute('asn', value=cen_as["asn"]) - asn_object.add_attribute('description', value=cen_as['name']) - asn_object.add_attribute('subnet-announced', value=cen_as['routed_prefix']) - asn_object.add_attribute('country', value=cen_as['country_code']) + asn_object.add_attribute('asn', value=cen_as.get("asn")) + asn_object.add_attribute('description', value=cen_as.get('name')) + asn_object.add_attribute('subnet-announced', value=cen_as.get('routed_prefix')) + asn_object.add_attribute('country', value=cen_as.get('country_code')) asn_object.add_reference(attribute.uuid, 'associated-to') misp_event.add_object(**asn_object) - if "ip" in censys_output and "ports" in censys_output: + if censys_output.get('ip') and len(censys_output.get('services')): #"ports" in censys_output ip_object = MISPObject('ip-port') - ip_object.add_attribute('ip', value=censys_output['ip']) - for p in censys_output['ports']: - ip_object.add_attribute('dst-port', value=p) + ip_object.add_attribute('ip', value=censys_output.get('ip')) + for serv in censys_output.get('services'): + if serv.get('port'): + ip_object.add_attribute('dst-port', value=serv.get('port')) ip_object.add_reference(attribute.uuid, 'associated-to') misp_event.add_object(**ip_object) # We explore all ports to find https or ssh services - for k in censys_output.keys(): - if not isinstance(censys_output[k], dict): + for serv in censys_output.get('services', []): + if not isinstance(serv, dict): continue - if 'https' in censys_output[k]: + if serv.get('service_name').lower() == 'http' and serv.get('certificate', None): try: - cert = censys_output[k]['https']['tls']['certificate'] - cert_obj = get_certificate_object(cert, attribute) - misp_event.add_object(**cert_obj) + cert = serv.get('certificate', None) + if cert: + # TODO switch to api_v2 once available + # use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details + cert_details = CensysCertificates(api_id, api_secret).view(cert) + cert_obj = get_certificate_object(cert_details, attribute) + misp_event.add_object(**cert_obj) except KeyError: print("Error !") - if 'ssh' in censys_output[k]: + if serv.get('ssh') and serv.get('service_name').lower() == 'ssh': try: - cert = censys_output[k]['ssh']['v2']['server_host_key'] + cert = serv.get('ssh').get('server_host_key').get('fingerprint_sha256') # TODO enable once the type is merged # misp_event.add_attribute(type='hasshserver-sha256', value=cert['fingerprint_sha256']) except KeyError: @@ -144,20 +166,20 @@ def parse_response(censys_output, attribute): if "location" in censys_output: loc_obj = MISPObject('geolocation') loc = censys_output['location'] - loc_obj.add_attribute('latitude', value=loc['latitude']) - loc_obj.add_attribute('longitude', value=loc['longitude']) + loc_obj.add_attribute('latitude', value=loc.get('coordinates', {}).get('latitude', None)) + loc_obj.add_attribute('longitude', value=loc.get('coordinates', {}).get('longitude', None)) if 'city' in loc: - loc_obj.add_attribute('city', value=loc['city']) - loc_obj.add_attribute('country', value=loc['country']) + loc_obj.add_attribute('city', value=loc.get('city')) + loc_obj.add_attribute('country', value=loc.get('country')) if 'postal_code' in loc: - loc_obj.add_attribute('zipcode', value=loc['postal_code']) + loc_obj.add_attribute('zipcode', value=loc.get('postal_code')) if 'province' in loc: - loc_obj.add_attribute('region', value=loc['province']) + loc_obj.add_attribute('region', value=loc.get('province')) loc_obj.add_reference(attribute.uuid, 'associated-to') misp_event.add_object(**loc_obj) event = json.loads(misp_event.to_json()) - return {'Object': event['Object'], 'Attribute': event['Attribute']} + return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])} # In case of multiple enrichment (ip and domain), we need to filter out similar objects @@ -166,24 +188,23 @@ def remove_duplicates(results): # Only one enrichment was performed so no duplicate if len(results) == 1: return results[0] - elif len(results) == 2: - final_result = results[0] - obj_l2 = results[1]['Object'] - for o2 in obj_l2: - if o2['name'] == "asn": - key = "asn" - elif o2['name'] == "ip-port": - key = "ip" - elif o2['name'] == "x509": - key = "x509-fingerprint-sha256" - elif o2['name'] == "geolocation": - key = "latitude" - if not check_if_present(o2, key, final_result['Object']): - final_result['Object'].append(o2) - - return final_result else: - return [] + final_result = results[0] + for i,result in enumerate(results[1:]): + obj_l = results[i+1].get('Object', []) + for o2 in obj_l: + if o2['name'] == "asn": + key = "asn" + elif o2['name'] == "ip-port": + key = "ip" + elif o2['name'] == "x509": + key = "x509-fingerprint-sha256" + elif o2['name'] == "geolocation": + key = "latitude" + if not check_if_present(o2, key, final_result.get('Object', [])): + final_result['Object'].append(o2) + + return final_result def check_if_present(object, attribute_name, list_objects): @@ -253,4 +274,4 @@ def introspection(): def version(): moduleinfo['config'] = moduleconfig - return moduleinfo + return moduleinfo \ No newline at end of file diff --git a/tests/expansion_configs.json b/tests/expansion_configs.json new file mode 100644 index 0000000..57b0f9e --- /dev/null +++ b/tests/expansion_configs.json @@ -0,0 +1,6 @@ +{ + "censys_enrich": { + "api_id" : "", + "api_secret": "" + } +} \ No newline at end of file diff --git a/tests/test_expansions.py b/tests/test_expansions.py index 841a39a..d015bcb 100644 --- a/tests/test_expansions.py +++ b/tests/test_expansions.py @@ -215,6 +215,25 @@ class TestExpansions(unittest.TestCase): response = self.misp_modules_post(query) self.assertEqual(self.get_values(response), '\nThis is an basic test docx file. ') + def test_censys(self): + module_name = "censys_enrich" + query = { + "attribute": {"type" : "ip-dst", "value": "8.8.8.8", "uuid": ""}, + "module": module_name, + "config": {} + } + if module_name in self.configs: + query['config'] = self.configs[module_name] + response = self.misp_modules_post(query) + + if self.configs[module_name].get('api_id') == '': + self.assertTrue(self.get_errors(response).startswith('ERROR: param ')) + else: + self.assertGreaterEqual(len(response.json().get('results', {}).get('Attribute')), 1) + else: + response = self.misp_modules_post(query) + self.assertTrue(self.get_errors(response).startswith('Please provide config options')) + def test_farsight_passivedns(self): module_name = 'farsight_passivedns' if module_name in self.configs: