mirror of https://github.com/MISP/misp-modules
Merge pull request #542 from slv008/main
Upgrade censys_enrich module to new api versionpull/547/head
commit
8ae64ba264
|
@ -27,8 +27,8 @@ beautifulsoup4==4.9.3
|
|||
bidict==0.21.2; python_version >= '3.6'
|
||||
blockchain==1.4.4
|
||||
certifi==2021.5.30
|
||||
censys==2.0.9
|
||||
cffi==1.14.6
|
||||
|
||||
#chardet==4.0.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
|
||||
chardet
|
||||
charset-normalizer==2.0.4; python_version >= '3'
|
||||
|
|
|
@ -1,15 +1,26 @@
|
|||
# encoding: utf-8
|
||||
import json
|
||||
import configparser
|
||||
import base64
|
||||
import codecs
|
||||
import censys.common.config
|
||||
from dateutil.parser import isoparse
|
||||
from . import check_input_attribute, standard_error_message
|
||||
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
||||
|
||||
try:
|
||||
import censys.base
|
||||
import censys.ipv4
|
||||
import censys.websites
|
||||
import censys.certificates
|
||||
#needed in order to overwrite the censys module intent of creating config files in the home folder of the proccess owner
|
||||
#--
|
||||
def get_config_over() -> configparser.ConfigParser:
|
||||
config = configparser.ConfigParser()
|
||||
config[censys.common.config.DEFAULT] = censys.common.config.default_config
|
||||
return config
|
||||
censys.common.config.get_config = get_config_over
|
||||
#--
|
||||
|
||||
from censys.search import CensysHosts
|
||||
from censys.search import CensysCertificates
|
||||
from censys.common.base import *
|
||||
except ImportError:
|
||||
print("Censys module not installed. Try 'pip install censys'")
|
||||
|
||||
|
@ -20,8 +31,11 @@ mispattributes = {'input': ['ip-src', 'ip-dst', 'domain', 'hostname', 'hostname|
|
|||
moduleinfo = {'version': '0.1', 'author': 'Loïc Fortemps',
|
||||
'description': 'Censys.io expansion module', 'module-type': ['expansion', 'hover']}
|
||||
|
||||
api_id = None
|
||||
api_secret = None
|
||||
|
||||
def handler(q=False):
|
||||
global api_id, api_secret
|
||||
if q is False:
|
||||
return False
|
||||
request = json.loads(q)
|
||||
|
@ -46,7 +60,6 @@ def handler(q=False):
|
|||
attribute = MISPAttribute()
|
||||
attribute.from_dict(**request['attribute'])
|
||||
# Lists to accomodate multi-types attribute
|
||||
conn = list()
|
||||
types = list()
|
||||
values = list()
|
||||
results = list()
|
||||
|
@ -65,26 +78,29 @@ def handler(q=False):
|
|||
types.append(attribute.type)
|
||||
values.append(attribute.value)
|
||||
|
||||
found = False
|
||||
for t in types:
|
||||
# ip, ip-src or ip-dst
|
||||
if t[:2] == "ip":
|
||||
conn.append(censys.ipv4.CensysIPv4(api_id=api_id, api_secret=api_secret))
|
||||
elif t == 'domain' or t == "hostname":
|
||||
conn.append(censys.websites.CensysWebsites(api_id=api_id, api_secret=api_secret))
|
||||
elif 'x509-fingerprint' in t:
|
||||
conn.append(censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret))
|
||||
|
||||
found = True
|
||||
for c in conn:
|
||||
val = values.pop(0)
|
||||
try:
|
||||
r = c.view(val)
|
||||
results.append(parse_response(r, attribute))
|
||||
found = True
|
||||
except censys.base.CensysNotFoundException:
|
||||
found = False
|
||||
except Exception:
|
||||
misperrors['error'] = "Connection issue"
|
||||
value = values.pop(0)
|
||||
# ip, ip-src or ip-dst
|
||||
if t[:2] == "ip":
|
||||
r = CensysHosts(api_id, api_secret).view(value)
|
||||
results.append(parse_response(r, attribute))
|
||||
found = True
|
||||
elif t == 'domain' or t == "hostname":
|
||||
# get ips
|
||||
endpoint = CensysHosts(api_id, api_secret)
|
||||
for r_list in endpoint.search(query=value, per_page=5, pages=1):
|
||||
for r in r_list:
|
||||
results.append(parse_response(r, attribute))
|
||||
found = True
|
||||
elif 'x509-fingerprint-sha256' in t:
|
||||
# use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details
|
||||
r = CensysCertificates(api_id, api_secret).view(value)
|
||||
results.append(parse_response(r, attribute))
|
||||
found = True
|
||||
except CensysException as e:
|
||||
misperrors['error'] = "ERROR: param {} / response: {}".format(value, e)
|
||||
return misperrors
|
||||
|
||||
if not found:
|
||||
|
@ -98,38 +114,43 @@ def parse_response(censys_output, attribute):
|
|||
misp_event = MISPEvent()
|
||||
misp_event.add_attribute(**attribute)
|
||||
# Generic fields (for IP/Websites)
|
||||
if "autonomous_system" in censys_output:
|
||||
cen_as = censys_output['autonomous_system']
|
||||
if censys_output.get('autonomous_system'):
|
||||
cen_as = censys_output.get('autonomous_system')
|
||||
asn_object = MISPObject('asn')
|
||||
asn_object.add_attribute('asn', value=cen_as["asn"])
|
||||
asn_object.add_attribute('description', value=cen_as['name'])
|
||||
asn_object.add_attribute('subnet-announced', value=cen_as['routed_prefix'])
|
||||
asn_object.add_attribute('country', value=cen_as['country_code'])
|
||||
asn_object.add_attribute('asn', value=cen_as.get("asn"))
|
||||
asn_object.add_attribute('description', value=cen_as.get('name'))
|
||||
asn_object.add_attribute('subnet-announced', value=cen_as.get('routed_prefix'))
|
||||
asn_object.add_attribute('country', value=cen_as.get('country_code'))
|
||||
asn_object.add_reference(attribute.uuid, 'associated-to')
|
||||
misp_event.add_object(**asn_object)
|
||||
|
||||
if "ip" in censys_output and "ports" in censys_output:
|
||||
if censys_output.get('ip') and len(censys_output.get('services')): #"ports" in censys_output
|
||||
ip_object = MISPObject('ip-port')
|
||||
ip_object.add_attribute('ip', value=censys_output['ip'])
|
||||
for p in censys_output['ports']:
|
||||
ip_object.add_attribute('dst-port', value=p)
|
||||
ip_object.add_attribute('ip', value=censys_output.get('ip'))
|
||||
for serv in censys_output.get('services'):
|
||||
if serv.get('port'):
|
||||
ip_object.add_attribute('dst-port', value=serv.get('port'))
|
||||
ip_object.add_reference(attribute.uuid, 'associated-to')
|
||||
misp_event.add_object(**ip_object)
|
||||
|
||||
# We explore all ports to find https or ssh services
|
||||
for k in censys_output.keys():
|
||||
if not isinstance(censys_output[k], dict):
|
||||
for serv in censys_output.get('services', []):
|
||||
if not isinstance(serv, dict):
|
||||
continue
|
||||
if 'https' in censys_output[k]:
|
||||
if serv.get('service_name').lower() == 'http' and serv.get('certificate', None):
|
||||
try:
|
||||
cert = censys_output[k]['https']['tls']['certificate']
|
||||
cert_obj = get_certificate_object(cert, attribute)
|
||||
misp_event.add_object(**cert_obj)
|
||||
cert = serv.get('certificate', None)
|
||||
if cert:
|
||||
# TODO switch to api_v2 once available
|
||||
# use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details
|
||||
cert_details = CensysCertificates(api_id, api_secret).view(cert)
|
||||
cert_obj = get_certificate_object(cert_details, attribute)
|
||||
misp_event.add_object(**cert_obj)
|
||||
except KeyError:
|
||||
print("Error !")
|
||||
if 'ssh' in censys_output[k]:
|
||||
if serv.get('ssh') and serv.get('service_name').lower() == 'ssh':
|
||||
try:
|
||||
cert = censys_output[k]['ssh']['v2']['server_host_key']
|
||||
cert = serv.get('ssh').get('server_host_key').get('fingerprint_sha256')
|
||||
# TODO enable once the type is merged
|
||||
# misp_event.add_attribute(type='hasshserver-sha256', value=cert['fingerprint_sha256'])
|
||||
except KeyError:
|
||||
|
@ -144,20 +165,20 @@ def parse_response(censys_output, attribute):
|
|||
if "location" in censys_output:
|
||||
loc_obj = MISPObject('geolocation')
|
||||
loc = censys_output['location']
|
||||
loc_obj.add_attribute('latitude', value=loc['latitude'])
|
||||
loc_obj.add_attribute('longitude', value=loc['longitude'])
|
||||
loc_obj.add_attribute('latitude', value=loc.get('coordinates', {}).get('latitude', None))
|
||||
loc_obj.add_attribute('longitude', value=loc.get('coordinates', {}).get('longitude', None))
|
||||
if 'city' in loc:
|
||||
loc_obj.add_attribute('city', value=loc['city'])
|
||||
loc_obj.add_attribute('country', value=loc['country'])
|
||||
loc_obj.add_attribute('city', value=loc.get('city'))
|
||||
loc_obj.add_attribute('country', value=loc.get('country'))
|
||||
if 'postal_code' in loc:
|
||||
loc_obj.add_attribute('zipcode', value=loc['postal_code'])
|
||||
loc_obj.add_attribute('zipcode', value=loc.get('postal_code'))
|
||||
if 'province' in loc:
|
||||
loc_obj.add_attribute('region', value=loc['province'])
|
||||
loc_obj.add_attribute('region', value=loc.get('province'))
|
||||
loc_obj.add_reference(attribute.uuid, 'associated-to')
|
||||
misp_event.add_object(**loc_obj)
|
||||
|
||||
event = json.loads(misp_event.to_json())
|
||||
return {'Object': event['Object'], 'Attribute': event['Attribute']}
|
||||
return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])}
|
||||
|
||||
|
||||
# In case of multiple enrichment (ip and domain), we need to filter out similar objects
|
||||
|
@ -166,24 +187,23 @@ def remove_duplicates(results):
|
|||
# Only one enrichment was performed so no duplicate
|
||||
if len(results) == 1:
|
||||
return results[0]
|
||||
elif len(results) == 2:
|
||||
final_result = results[0]
|
||||
obj_l2 = results[1]['Object']
|
||||
for o2 in obj_l2:
|
||||
if o2['name'] == "asn":
|
||||
key = "asn"
|
||||
elif o2['name'] == "ip-port":
|
||||
key = "ip"
|
||||
elif o2['name'] == "x509":
|
||||
key = "x509-fingerprint-sha256"
|
||||
elif o2['name'] == "geolocation":
|
||||
key = "latitude"
|
||||
if not check_if_present(o2, key, final_result['Object']):
|
||||
final_result['Object'].append(o2)
|
||||
|
||||
return final_result
|
||||
else:
|
||||
return []
|
||||
final_result = results[0]
|
||||
for i,result in enumerate(results[1:]):
|
||||
obj_l = results[i+1].get('Object', [])
|
||||
for o2 in obj_l:
|
||||
if o2['name'] == "asn":
|
||||
key = "asn"
|
||||
elif o2['name'] == "ip-port":
|
||||
key = "ip"
|
||||
elif o2['name'] == "x509":
|
||||
key = "x509-fingerprint-sha256"
|
||||
elif o2['name'] == "geolocation":
|
||||
key = "latitude"
|
||||
if not check_if_present(o2, key, final_result.get('Object', [])):
|
||||
final_result['Object'].append(o2)
|
||||
|
||||
return final_result
|
||||
|
||||
|
||||
def check_if_present(object, attribute_name, list_objects):
|
||||
|
@ -253,4 +273,4 @@ def introspection():
|
|||
|
||||
def version():
|
||||
moduleinfo['config'] = moduleconfig
|
||||
return moduleinfo
|
||||
return moduleinfo
|
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"censys_enrich": {
|
||||
"api_id" : "<api_id>",
|
||||
"api_secret": "<api_secret>"
|
||||
}
|
||||
}
|
|
@ -215,6 +215,25 @@ class TestExpansions(unittest.TestCase):
|
|||
response = self.misp_modules_post(query)
|
||||
self.assertEqual(self.get_values(response), '\nThis is an basic test docx file. ')
|
||||
|
||||
def test_censys(self):
|
||||
module_name = "censys_enrich"
|
||||
query = {
|
||||
"attribute": {"type" : "ip-dst", "value": "8.8.8.8", "uuid": ""},
|
||||
"module": module_name,
|
||||
"config": {}
|
||||
}
|
||||
if module_name in self.configs:
|
||||
query['config'] = self.configs[module_name]
|
||||
response = self.misp_modules_post(query)
|
||||
|
||||
if self.configs[module_name].get('api_id') == '<api_id>':
|
||||
self.assertTrue(self.get_errors(response).startswith('ERROR: param '))
|
||||
else:
|
||||
self.assertGreaterEqual(len(response.json().get('results', {}).get('Attribute')), 1)
|
||||
else:
|
||||
response = self.misp_modules_post(query)
|
||||
self.assertTrue(self.get_errors(response).startswith('Please provide config options'))
|
||||
|
||||
def test_farsight_passivedns(self):
|
||||
module_name = 'farsight_passivedns'
|
||||
if module_name in self.configs:
|
||||
|
|
Loading…
Reference in New Issue