mirror of https://github.com/MISP/misp-modules
commit
d1451ea168
2 changed files with 567 additions and 0 deletions
@ -0,0 +1,566 @@ |
||||
import json |
||||
import logging |
||||
import sys |
||||
import time |
||||
|
||||
from dnstrails import APIError |
||||
from dnstrails import DnsTrails |
||||
|
||||
log = logging.getLogger('dnstrails') |
||||
log.setLevel(logging.DEBUG) |
||||
ch = logging.StreamHandler(sys.stdout) |
||||
ch.setLevel(logging.DEBUG) |
||||
formatter = logging.Formatter( |
||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
||||
ch.setFormatter(formatter) |
||||
log.addHandler(ch) |
||||
|
||||
misperrors = {'error': 'Error'} |
||||
mispattributes = { |
||||
'input': ['hostname', 'domain', 'ip-src', 'ip-dst'], |
||||
'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'dns-soa-email', |
||||
'whois-registrant-email', 'whois-registrant-phone', |
||||
'whois-registrant-name', |
||||
'whois-registrar', 'whois-creation-date', 'domain'] |
||||
} |
||||
|
||||
moduleinfo = {'version': '1', 'author': 'Sebastien Larinier @sebdraven', |
||||
'description': 'Query on securitytrails.com', |
||||
'module-type': ['expansion', 'hover']} |
||||
|
||||
# config fields that your code expects from the site admin |
||||
moduleconfig = ['apikey'] |
||||
|
||||
|
||||
def handler(q=False): |
||||
if q: |
||||
|
||||
request = json.loads(q) |
||||
|
||||
if not request.get('config') and not (request['config'].get('apikey')): |
||||
misperrors['error'] = 'DNS authentication is missing' |
||||
return misperrors |
||||
|
||||
api = DnsTrails(request['config'].get('apikey')) |
||||
|
||||
if not api: |
||||
misperrors['error'] = 'Onyphe Error instance api' |
||||
|
||||
ip = "" |
||||
dns_name = "" |
||||
|
||||
ip = '' |
||||
if request.get('ip-src'): |
||||
ip = request['ip-src'] |
||||
return handle_ip(api, ip, misperrors) |
||||
elif request.get('ip-dst'): |
||||
ip = request['ip-dst'] |
||||
return handle_ip(api, ip, misperrors) |
||||
elif request.get('domain'): |
||||
domain = request['domain'] |
||||
return handle_domain(api, domain, misperrors) |
||||
elif request.get('hostname'): |
||||
hostname = request['hostname'] |
||||
return handle_domain(api, hostname, misperrors) |
||||
else: |
||||
misperrors['error'] = "Unsupported attributes types" |
||||
return misperrors |
||||
else: |
||||
return False |
||||
|
||||
|
||||
def handle_domain(api, domain, misperrors): |
||||
result_filtered = {"results": []} |
||||
|
||||
r, status_ok = expand_domain_info(api, misperrors, domain) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] = misperrors['error'] + ' Error DNS result' |
||||
return misperrors |
||||
|
||||
time.sleep(1) |
||||
r, status_ok = expand_subdomains(api, domain) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] = misperrors['error'] + ' Error subdomains result' |
||||
return misperrors |
||||
|
||||
time.sleep(1) |
||||
r, status_ok = expand_whois(api, domain) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] = misperrors['error'] + ' Error whois result' |
||||
return misperrors |
||||
|
||||
time.sleep(1) |
||||
r, status_ok = expand_history_ipv4_ipv6(api, domain) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] = misperrors['error'] + ' Error history ipv4' |
||||
return misperrors |
||||
|
||||
time.sleep(1) |
||||
|
||||
r, status_ok = expand_history_dns(api, domain) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] = misperrors[ |
||||
'error'] + ' Error in expand History DNS' |
||||
return misperrors |
||||
|
||||
r, status_ok = expand_history_whois(api, domain) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] = misperrors['error'] + \ |
||||
' Error in expand History Whois' |
||||
return misperrors |
||||
|
||||
return result_filtered |
||||
|
||||
|
||||
def handle_ip(api, ip, misperrors): |
||||
result_filtered = {"results": []} |
||||
|
||||
r, status_ok = expand_searching_domain(api, ip) |
||||
|
||||
if status_ok: |
||||
if r: |
||||
result_filtered['results'].extend(r) |
||||
else: |
||||
misperrors['error'] += ' Error in expand searching domain' |
||||
return misperrors |
||||
|
||||
return result_filtered |
||||
|
||||
|
||||
def expand_domain_info(api, misperror, domain): |
||||
r = [] |
||||
status_ok = False |
||||
ns_servers = [] |
||||
list_ipv4 = [] |
||||
list_ipv6 = [] |
||||
servers_mx = [] |
||||
soa_hostnames = [] |
||||
|
||||
results = api.domain(domain) |
||||
|
||||
if results: |
||||
status_ok = True |
||||
if 'current_dns' in results: |
||||
if 'values' in results['current_dns']['ns']: |
||||
ns_servers = [ns_entry['nameserver'] for ns_entry in |
||||
results['current_dns']['ns']['values'] |
||||
if 'nameserver' in ns_entry] |
||||
if 'values' in results['current_dns']['a']: |
||||
list_ipv4 = [a_entry['ip'] for a_entry in |
||||
results['current_dns']['a']['values'] if |
||||
'ip' in a_entry] |
||||
|
||||
if 'values' in results['current_dns']['aaaa']: |
||||
list_ipv6 = [ipv6_entry['ipv6'] for ipv6_entry in |
||||
results['current_dns']['aaaa']['values'] if |
||||
'ipv6' in ipv6_entry] |
||||
|
||||
if 'values' in results['current_dns']['mx']: |
||||
servers_mx = [mx_entry['hostname'] for mx_entry in |
||||
results['current_dns']['mx']['values'] if |
||||
'hostname' in mx_entry] |
||||
if 'values' in results['current_dns']['soa']: |
||||
soa_hostnames = [soa_entry['email'] for soa_entry in |
||||
results['current_dns']['soa']['values'] if |
||||
'email' in soa_entry] |
||||
|
||||
if ns_servers: |
||||
r.append({'types': ['domain'], |
||||
'values': ns_servers, |
||||
'categories': ['Network activity'], |
||||
'comment': 'List of name servers of %s first seen %s ' % |
||||
(domain, |
||||
results['current_dns']['ns']['first_seen']) |
||||
}) |
||||
|
||||
if list_ipv4: |
||||
r.append({'types': ['domain|ip'], |
||||
'values': ['%s|%s' % (domain, ipv4) for ipv4 in |
||||
list_ipv4], |
||||
'categories': ['Network activity'], |
||||
|
||||
'comment': ' List ipv4 of %s first seen %s' % |
||||
(domain, |
||||
results['current_dns']['a']['first_seen']) |
||||
|
||||
}) |
||||
if list_ipv6: |
||||
r.append({'types': ['domain|ip'], |
||||
'values': ['%s|%s' % (domain, ipv6) for ipv6 in |
||||
list_ipv6], |
||||
'categories': ['Network activity'], |
||||
'comment': ' List ipv6 of %s first seen %s' % |
||||
(domain, |
||||
results['current_dns']['aaaa']['first_seen']) |
||||
|
||||
}) |
||||
|
||||
if servers_mx: |
||||
r.append({'types': ['domain'], |
||||
'values': servers_mx, |
||||
'categories': ['Network activity'], |
||||
'comment': ' List mx of %s first seen %s' % |
||||
(domain, |
||||
results['current_dns']['mx']['first_seen']) |
||||
|
||||
}) |
||||
if soa_hostnames: |
||||
r.append({'types': ['domain'], |
||||
'values': soa_hostnames, |
||||
'categories': ['Network activity'], |
||||
'comment': ' List soa of %s first seen %s' % |
||||
(domain, |
||||
results['current_dns']['soa']['first_seen']) |
||||
}) |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def expand_subdomains(api, domain): |
||||
r = [] |
||||
status_ok = False |
||||
|
||||
try: |
||||
results = api.subdomains(domain) |
||||
|
||||
if results: |
||||
status_ok = True |
||||
if 'subdomains' in results: |
||||
r.append({ |
||||
'types': ['domain'], |
||||
'values': ['%s.%s' % (sub, domain) |
||||
for sub in results['subdomains']], |
||||
'categories': ['Network activity'], |
||||
'comment': 'subdomains of %s' % domain |
||||
} |
||||
|
||||
) |
||||
except APIError as e: |
||||
misperrors['error'] = e.value |
||||
return [], False |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def expand_whois(api, domain): |
||||
r = [] |
||||
status_ok = False |
||||
|
||||
try: |
||||
results = api.whois(domain) |
||||
|
||||
if results: |
||||
status_ok = True |
||||
item_registrant = __select_registrant_item(results) |
||||
if item_registrant: |
||||
|
||||
if 'email' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-registrant-email'], |
||||
'values': [item_registrant[0]['email']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois information of %s by securitytrails' |
||||
% domain |
||||
} |
||||
) |
||||
|
||||
if 'telephone' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-registrant-phone'], |
||||
'values': [item_registrant[0]['telephone']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois information of %s by securitytrails' |
||||
% domain |
||||
} |
||||
) |
||||
|
||||
if 'name' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-registrant-name'], |
||||
'values': [item_registrant[0]['name']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois information of %s by securitytrails' |
||||
% domain |
||||
} |
||||
) |
||||
|
||||
if 'registrarName' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-registrar'], |
||||
'values': [item_registrant[0]['registrarName']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois information of %s by securitytrails' |
||||
% domain |
||||
} |
||||
) |
||||
|
||||
if 'createdDate' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-creation-date'], |
||||
'values': [item_registrant[0]['createdDate']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois information of %s by securitytrails' |
||||
% domain |
||||
} |
||||
) |
||||
|
||||
except APIError as e: |
||||
misperrors['error'] = e.value |
||||
return [], False |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def expand_history_ipv4_ipv6(api, domain): |
||||
r = [] |
||||
status_ok = False |
||||
|
||||
try: |
||||
results = api.history_dns_ipv4(domain) |
||||
|
||||
if results: |
||||
status_ok = True |
||||
r.extend(__history_ip(results, domain)) |
||||
|
||||
time.sleep(1) |
||||
results = api.history_dns_aaaa(domain) |
||||
|
||||
if results: |
||||
status_ok = True |
||||
r.extend(__history_ip(results, domain, type_ip='ipv6')) |
||||
|
||||
except APIError as e: |
||||
misperrors['error'] = e.value |
||||
return [], False |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def expand_history_dns(api, domain): |
||||
r = [] |
||||
status_ok = False |
||||
|
||||
try: |
||||
|
||||
results = api.history_dns_ns(domain) |
||||
if results: |
||||
r.extend(__history_dns(results, domain, 'nameserver', 'ns')) |
||||
|
||||
time.sleep(1) |
||||
|
||||
results = api.history_dns_soa(domain) |
||||
|
||||
if results: |
||||
r.extend(__history_dns(results, domain, 'email', 'soa')) |
||||
|
||||
time.sleep(1) |
||||
|
||||
results = api.history_dns_mx(domain) |
||||
|
||||
if results: |
||||
status_ok = True |
||||
r.extend(__history_dns(results, domain, 'host', 'mx')) |
||||
|
||||
except APIError as e: |
||||
misperrors['error'] = e.value |
||||
return [], False |
||||
|
||||
status_ok = True |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def expand_history_whois(api, domain): |
||||
r = [] |
||||
status_ok = False |
||||
try: |
||||
results = api.history_whois(domain) |
||||
|
||||
if results: |
||||
|
||||
if 'items' in results['result']: |
||||
for item in results['result']['items']: |
||||
item_registrant = __select_registrant_item(item) |
||||
|
||||
r.append( |
||||
{ |
||||
'types': ['domain'], |
||||
'values': item['nameServers'], |
||||
'categories': ['Network activity'], |
||||
'comment': 'Whois history Name Servers of %s ' |
||||
'Status: %s ' % ( |
||||
domain, ' '.join(item['status'])) |
||||
|
||||
} |
||||
) |
||||
if item_registrant: |
||||
|
||||
if 'email' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-registrant-email'], |
||||
'values': [item_registrant[0]['email']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois history registrant email of %s' |
||||
'Status: %s' % ( |
||||
domain, |
||||
' '.join(item['status'])) |
||||
} |
||||
) |
||||
|
||||
if 'telephone' in item_registrant[0]: |
||||
r.append( |
||||
{ |
||||
'types': ['whois-registrant-phone'], |
||||
'values': [item_registrant[0]['telephone']], |
||||
'categories': ['Attribution'], |
||||
'comment': 'Whois history registrant phone of %s' |
||||
'Status: %s' % ( |
||||
domain, |
||||
' '.join(item['status'])) |
||||
} |
||||
) |
||||
|
||||
except APIError as e: |
||||
misperrors['error'] = e.value |
||||
return [], False |
||||
status_ok = True |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def __history_ip(results, domain, type_ip='ip'): |
||||
r = [] |
||||
if 'records' in results: |
||||
for record in results['records']: |
||||
if 'values' in record: |
||||
for item in record['values']: |
||||
r.append( |
||||
{'types': ['domain|ip'], |
||||
'values': ['%s|%s' % (domain, item[type_ip])], |
||||
'categories': ['Network activity'], |
||||
'comment': 'History IP on securitytrails %s ' |
||||
'last seen: %s first seen: %s' % |
||||
(domain, record['last_seen'], |
||||
record['first_seen']) |
||||
} |
||||
) |
||||
|
||||
return r |
||||
|
||||
|
||||
def __history_dns(results, domain, type_serv, service): |
||||
r = [] |
||||
|
||||
if 'records' in results: |
||||
for record in results['records']: |
||||
if 'values' in record: |
||||
values = record['values'] |
||||
if type(values) is list: |
||||
|
||||
for item in record['values']: |
||||
r.append( |
||||
{'types': ['domain|ip'], |
||||
'values': [item[type_serv]], |
||||
'categories': ['Network activity'], |
||||
'comment': 'history %s of %s last seen: %s first seen: %s' % |
||||
(service, domain, record['last_seen'], |
||||
record['first_seen']) |
||||
} |
||||
) |
||||
else: |
||||
r.append( |
||||
{'types': ['domain|ip'], |
||||
'values': [values[type_serv]], |
||||
'categories': ['Network activity'], |
||||
'comment': 'history %s of %s last seen: %s first seen: %s' % |
||||
(service, domain, record['last_seen'], |
||||
record['first_seen']) |
||||
} |
||||
) |
||||
return r |
||||
|
||||
|
||||
def expand_searching_domain(api, ip): |
||||
r = [] |
||||
status_ok = False |
||||
|
||||
try: |
||||
results = api.searching_domains(ipv4=ip) |
||||
|
||||
if results: |
||||
if 'records' in results: |
||||
res = [(r['host_provider'], r['hostname'], r['whois']) |
||||
for r in results['records']] |
||||
|
||||
for host_provider, hostname, whois in res: |
||||
comment = 'domain for %s by %s' % (ip, host_provider[0]) |
||||
if whois['registrar']: |
||||
comment = comment + ' registrar %s' % whois['registrar'] |
||||
|
||||
r.append( |
||||
{ |
||||
'types': ['domain'], |
||||
'category': ['Network activity'], |
||||
'values': [hostname], |
||||
'comment': comment |
||||
|
||||
} |
||||
) |
||||
status_ok = True |
||||
except APIError as e: |
||||
misperrors['error'] = e.value |
||||
return [], False |
||||
|
||||
return r, status_ok |
||||
|
||||
|
||||
def introspection(): |
||||
return mispattributes |
||||
|
||||
|
||||
def version(): |
||||
moduleinfo['config'] = moduleconfig |
||||
return moduleinfo |
||||
|
||||
|
||||
def __select_registrant_item(entry): |
||||
res = None |
||||
if 'contacts' in entry: |
||||
res = list(filter(lambda x: x['type'] == 'registrant', |
||||
entry['contacts'])) |
||||
|
||||
if 'contact' in entry: |
||||
res = list(filter(lambda x: x['type'] == 'registrant', |
||||
entry['contact'])) |
||||
|
||||
return res |
Loading…
Reference in new issue