mirror of https://github.com/MISP/misp-modules
235 lines
11 KiB
Python
Executable File
235 lines
11 KiB
Python
Executable File
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
try:
|
|
import shodan
|
|
except ImportError:
|
|
print("shodan module not installed.")
|
|
from . import check_input_attribute, standard_error_message
|
|
from datetime import datetime
|
|
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
|
|
|
misperrors = {'error': 'Error'}
|
|
mispattributes = {'input': ['ip-src', 'ip-dst'],
|
|
'format': 'misp_standard'}
|
|
moduleinfo = {'version': '0.2', 'author': 'Raphaël Vinot',
|
|
'description': 'Query on Shodan',
|
|
'module-type': ['expansion']}
|
|
|
|
moduleconfig = ['apikey']
|
|
|
|
|
|
class ShodanParser():
|
|
def __init__(self, attribute):
|
|
self.misp_event = MISPEvent()
|
|
self.attribute = MISPAttribute()
|
|
self.attribute.from_dict(**attribute)
|
|
self.misp_event.add_attribute(**self.attribute)
|
|
self.ip_address_mapping = {
|
|
'asn': {'type': 'AS', 'object_relation': 'asn'},
|
|
'city': {'type': 'text', 'object_relation': 'city'},
|
|
'country_code': {'type': 'text', 'object_relation': 'country-code'},
|
|
'country_name': {'type': 'text', 'object_relation': 'country'},
|
|
'isp': {'type': 'text', 'object_relation': 'ISP'},
|
|
'latitude': {'type': 'float', 'object_relation': 'latitude'},
|
|
'longitude': {'type': 'float', 'object_relation': 'longitude'},
|
|
'org': {'type': 'text', 'object_relation': 'organization'},
|
|
'postal_code': {'type': 'text', 'object_relation': 'zipcode'},
|
|
'region_code': {'type': 'text', 'object_relation': 'region-code'}
|
|
}
|
|
self.ip_port_mapping = {
|
|
'domains': {'type': 'domain', 'object_relation': 'domain'},
|
|
'hostnames': {'type': 'hostname', 'object_relation': 'hostname'}
|
|
}
|
|
self.vulnerability_mapping = {
|
|
'cvss': {'type': 'float', 'object_relation': 'cvss-score'},
|
|
'summary': {'type': 'text', 'object_relation': 'summary'}
|
|
}
|
|
self.x509_mapping = {
|
|
'bits': {'type': 'text', 'object_relation': 'pubkey-info-size'},
|
|
'expires': {'type': 'datetime', 'object_relation': 'validity-not-after'},
|
|
'issued': {'type': 'datetime', 'object_relation': 'validity-not-before'},
|
|
'issuer': {'type': 'text', 'object_relation': 'issuer'},
|
|
'serial': {'type': 'text', 'object_relation': 'serial-number'},
|
|
'sig_alg': {'type': 'text', 'object_relation': 'signature_algorithm'},
|
|
'subject': {'type': 'text', 'object_relation': 'subject'},
|
|
'type': {'type': 'text', 'object_relation': 'pubkey-info-algorithm'},
|
|
'version': {'type': 'text', 'object_relation': 'version'}
|
|
}
|
|
|
|
def query_shodan(self, apikey):
|
|
# Query Shodan and get the results in a json blob
|
|
api = shodan.Shodan(apikey)
|
|
query_results = api.host(self.attribute.value)
|
|
|
|
# Parse the information about the IP address used as input
|
|
ip_address_attributes = []
|
|
for feature, mapping in self.ip_address_mapping.items():
|
|
if query_results.get(feature):
|
|
attribute = {'value': query_results[feature]}
|
|
attribute.update(mapping)
|
|
ip_address_attributes.append(attribute)
|
|
if ip_address_attributes:
|
|
ip_address_object = MISPObject('ip-api-address')
|
|
for attribute in ip_address_attributes:
|
|
ip_address_object.add_attribute(**attribute)
|
|
ip_address_object.add_reference(self.attribute.uuid, 'describes')
|
|
self.misp_event.add_object(ip_address_object)
|
|
|
|
# Parse the hostnames / domains and ports associated with the IP address
|
|
if query_results.get('ports'):
|
|
ip_port_object = MISPObject('ip-port')
|
|
ip_port_object.add_attribute(**self._get_source_attribute())
|
|
feature = self.attribute.type.split('-')[1]
|
|
for port in query_results['ports']:
|
|
attribute = {
|
|
'type': 'port',
|
|
'object_relation': f'{feature}-port',
|
|
'value': port
|
|
}
|
|
ip_port_object.add_attribute(**attribute)
|
|
for feature, mapping in self.ip_port_mapping.items():
|
|
for value in query_results.get(feature, []):
|
|
attribute = {'value': value}
|
|
attribute.update(mapping)
|
|
ip_port_object.add_attribute(**attribute)
|
|
ip_port_object.add_reference(self.attribute.uuid, 'extends')
|
|
self.misp_event.add_object(ip_port_object)
|
|
else:
|
|
if any(query_results.get(feature) for feature in ('domains', 'hostnames')):
|
|
domain_ip_object = MISPObject('domain-ip')
|
|
domain_ip_object.add_attribute(**self._get_source_attribute())
|
|
for feature in ('domains', 'hostnames'):
|
|
for value in query_results[feature]:
|
|
attribute = {
|
|
'type': 'domain',
|
|
'object_relation': 'domain',
|
|
'value': value
|
|
}
|
|
domain_ip_object.add_attribute(**attribute)
|
|
domain_ip_object.add_reference(self.attribute.uuid, 'extends')
|
|
self.misp_event.add_object(domain_ip_object)
|
|
|
|
# Parse data within the "data" field
|
|
if query_results.get('vulns'):
|
|
vulnerabilities = {}
|
|
for data in query_results['data']:
|
|
# Parse vulnerabilities
|
|
if data.get('vulns'):
|
|
for cve, vulnerability in data['vulns'].items():
|
|
if cve not in vulnerabilities:
|
|
vulnerabilities[cve] = vulnerability
|
|
# Also parse the certificates
|
|
if data.get('ssl'):
|
|
self._parse_cert(data['ssl'])
|
|
for cve, vulnerability in vulnerabilities.items():
|
|
vulnerability_object = MISPObject('vulnerability')
|
|
vulnerability_object.add_attribute(**{
|
|
'type': 'vulnerability',
|
|
'object_relation': 'id',
|
|
'value': cve
|
|
})
|
|
for feature, mapping in self.vulnerability_mapping.items():
|
|
if vulnerability.get(feature):
|
|
attribute = {'value': vulnerability[feature]}
|
|
attribute.update(mapping)
|
|
vulnerability_object.add_attribute(**attribute)
|
|
if vulnerability.get('references'):
|
|
for reference in vulnerability['references']:
|
|
vulnerability_object.add_attribute(**{
|
|
'type': 'link',
|
|
'object_relation': 'references',
|
|
'value': reference
|
|
})
|
|
vulnerability_object.add_reference(self.attribute.uuid, 'vulnerability-of')
|
|
self.misp_event.add_object(vulnerability_object)
|
|
for cve_id in query_results['vulns']:
|
|
if cve_id not in vulnerabilities:
|
|
attribute = {
|
|
'type': 'vulnerability',
|
|
'value': cve_id
|
|
}
|
|
self.misp_event.add_attribute(**attribute)
|
|
else:
|
|
# We have no vulnerability data, we only check if we have
|
|
# certificates within the "data" field
|
|
for data in query_results['data']:
|
|
if data.get('ssl'):
|
|
self._parse_cert(data['ssl']['cert'])
|
|
|
|
def get_result(self):
|
|
event = json.loads(self.misp_event.to_json())
|
|
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
|
|
return {'results': results}
|
|
|
|
# When we want to add the IP address information in objects such as the
|
|
# domain-ip or ip-port objects referencing the input IP address attribute
|
|
def _get_source_attribute(self):
|
|
return {
|
|
'type': self.attribute.type,
|
|
'object_relation': self.attribute.type,
|
|
'value': self.attribute.value
|
|
}
|
|
|
|
def _parse_cert(self, certificate):
|
|
x509_object = MISPObject('x509')
|
|
for feature in ('serial', 'sig_alg', 'version'):
|
|
if certificate.get(feature):
|
|
attribute = {'value': certificate[feature]}
|
|
attribute.update(self.x509_mapping[feature])
|
|
x509_object.add_attribute(**attribute)
|
|
# Parse issuer and subject value
|
|
for feature in ('issuer', 'subject'):
|
|
if certificate.get(feature):
|
|
attribute_value = (f'{identifier}={value}' for identifier, value in certificate[feature].items())
|
|
attribute = {'value': f'/{"/".join(attribute_value)}'}
|
|
attribute.update(self.x509_mapping[feature])
|
|
x509_object.add_attribute(**attribute)
|
|
# Parse datetime attributes
|
|
for feature in ('expires', 'issued'):
|
|
if certificate.get(feature):
|
|
attribute = {'value': datetime.strptime(certificate[feature], '%Y%m%d%H%M%SZ')}
|
|
attribute.update(self.x509_mapping[feature])
|
|
x509_object.add_attribute(**attribute)
|
|
# Parse fingerprints
|
|
if certificate.get('fingerprint'):
|
|
for hash_type, hash_value in certificate['fingerprint'].items():
|
|
x509_object.add_attribute(**{
|
|
'type': f'x509-fingerprint-{hash_type}',
|
|
'object_relation': f'x509-fingerprint-{hash_type}',
|
|
'value': hash_value
|
|
})
|
|
# Parse public key related info
|
|
if certificate.get('pubkey'):
|
|
for feature, value in certificate['pubkey'].items():
|
|
attribute = {'value': value}
|
|
attribute.update(self.x509_mapping[feature])
|
|
x509_object.add_attribute(**attribute)
|
|
x509_object.add_reference(self.attribute.uuid, 'identifies')
|
|
self.misp_event.add_object(x509_object)
|
|
|
|
|
|
def handler(q=False):
|
|
if q is False:
|
|
return False
|
|
request = json.loads(q)
|
|
if not request.get('config', {}).get('apikey'):
|
|
return {'error': 'Shodan authentication is missing'}
|
|
if not request.get('attribute') or not check_input_attribute(request['attribute']):
|
|
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
|
|
attribute = request['attribute']
|
|
if attribute['type'] not in mispattributes['input']:
|
|
return {'error': 'Unsupported attribute type.'}
|
|
shodan_parser = ShodanParser(attribute)
|
|
shodan_parser.query_shodan(request['config']['apikey'])
|
|
return shodan_parser.get_result()
|
|
|
|
|
|
def introspection():
|
|
return mispattributes
|
|
|
|
|
|
def version():
|
|
moduleinfo['config'] = moduleconfig
|
|
return moduleinfo
|