From 4a8ccb54fba29736a3600677b28814d41d7d6f98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 15 Dec 2016 16:49:56 +0100 Subject: [PATCH] Refactoring of domaintools expansion module --- misp_modules/modules/expansion/domaintools.py | 159 ++++++++++++------ 1 file changed, 105 insertions(+), 54 deletions(-) diff --git a/misp_modules/modules/expansion/domaintools.py b/misp_modules/modules/expansion/domaintools.py index 6382726..8af89ed 100755 --- a/misp_modules/modules/expansion/domaintools.py +++ b/misp_modules/modules/expansion/domaintools.py @@ -15,9 +15,10 @@ log.addHandler(ch) misperrors = {'error': 'Error'} mispattributes = { - 'input': ['domain'], + 'input': ['domain', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email', + 'whois-registrant-name', 'whois-registrant-phone', 'ip-src', 'ip-dst'], 'output': ['whois-registrant-email', 'whois-registrant-phone', 'whois-registrant-name', - 'whois-registrar', 'whois-creation-date', 'freetext'] + 'whois-registrar', 'whois-creation-date', 'freetext', 'domain'] } moduleinfo = { @@ -29,6 +30,12 @@ moduleinfo = { moduleconfig = ['username', 'api_key'] +query_profiles = [ + {'inputs': ['domain'], 'services': ['parsed_whois', 'domain_profile', 'reputation', 'reverse_ip']}, + {'inputs': ['email-src', 'email-dst', 'target-email', 'whois-registrant-email', 'whois-registrant-name', 'whois-registrant-phone'], 'services': ['reverse_whois']}, + {'inputs': ['ip-src', 'ip-dst'], 'services': ['host_domains', 'reverse_ip_whois']} +] + class DomainTools(object): @@ -39,6 +46,7 @@ class DomainTools(object): self.registrar = {} self.creation_date = {} self.domain_ip = {} + self.domain = {} self.risk = () self.freetext = '' @@ -68,6 +76,9 @@ class DomainTools(object): def add_ip(self, ip, comment=None): self.domain_ip = self._add_value(self.domain_ip, ip, comment) + def add_domain(self, domain, comment=None): + self.domain = self._add_value(self.domain, domain, comment) + def dump(self): to_return = [] if self.reg_mail: @@ -88,6 +99,9 @@ class DomainTools(object): if self.domain_ip: for ip, comment in self.domain_ip.items(): to_return.append({'types': ['ip-dst', 'ip-src'], 'values': [ip], 'comment': comment or ''}) + if self.domain: + for domain, comment in self.domain.items(): + to_return.append({'type': 'domain', 'values': [domain], 'comment': comment or ''}) if self.freetext: to_return.append({'type': 'freetext', 'values': [self.freetext], 'comment': 'Freetext import'}) if self.risk: @@ -95,65 +109,14 @@ class DomainTools(object): return to_return -def handler(q=False): - if not q: - return q - - request = json.loads(q) - to_query = None - for t in mispattributes['input']: - to_query = request.get(t) - if to_query: - break - if not to_query: - misperrors['error'] = "Unsupported attributes type" - return misperrors - - if request.get('config'): - if (request['config'].get('username') is None) or (request['config'].get('api_key') is None): - misperrors['error'] = 'DomainTools authentication is incomplete' - return misperrors - else: - domtools = API(request['config'].get('username'), request['config'].get('api_key')) - else: - misperrors['error'] = 'DomainTools authentication is missing' - return misperrors - +def parsed_whois(domtools, to_query, values): whois_entry = domtools.parsed_whois(to_query) - profile = domtools.domain_profile(to_query) - # NOTE: profile['website_data']['response_code'] could be used to see if the host is still up. Maybe set a tag. - reputation = domtools.reputation(to_query, include_reasons=True) - # NOTE: use that value in a tag when we will have attribute level tagging - values = DomainTools() - if whois_entry.get('error'): misperrors['error'] = whois_entry['error']['message'] return misperrors - if profile.get('error'): - misperrors['error'] = profile['error']['message'] - return misperrors - - if reputation and not reputation.get('error'): - reasons = ', '.join(reputation['reasons']) - values.risk = [reputation['risk_score'], 'Risk value of {} (via Domain Tools), Reasons: {}'.format(to_query, reasons)] - if whois_entry.get('registrant'): values.add_name(whois_entry['registrant'], 'Parsed registrant') - if profile.get('registrant'): - values.add_name(profile['registrant']['name'], 'Profile registrant') - - if profile.get('server'): - other_domains = profile['server']['other_domains'] - values.add_ip(profile['server']['ip_address'], 'IP of {} (via DomainTools). Has {} other domains.'.format(to_query, other_domains)) - - if profile.get('registration'): - if profile['registration'].get('created'): - values.add_creation_date(profile['registration']['created'], 'created') - if profile['registration'].get('updated'): - values.add_creation_date(profile['registration']['updated'], 'updated') - if profile['registration'].get('registrar'): - values.add_registrar(profile['registration']['registrar'], 'name') if whois_entry.get('registration'): values.add_creation_date(whois_entry['registration']['created'], 'timestamp') @@ -180,6 +143,94 @@ def handler(q=False): for mail in whois_entry.emails(): if mail not in values.reg_mail.keys(): values.add_mail(mail, 'Maybe registrar') + return values + + +def domain_profile(domtools, to_query, values): + profile = domtools.domain_profile(to_query) + # NOTE: profile['website_data']['response_code'] could be used to see if the host is still up. Maybe set a tag. + if profile.get('error'): + misperrors['error'] = profile['error']['message'] + return misperrors + + if profile.get('registrant'): + values.add_name(profile['registrant']['name'], 'Profile registrant') + + if profile.get('server'): + other_domains = profile['server']['other_domains'] + values.add_ip(profile['server']['ip_address'], 'IP of {} (via DomainTools). Has {} other domains.'.format(to_query, other_domains)) + + if profile.get('registration'): + if profile['registration'].get('created'): + values.add_creation_date(profile['registration']['created'], 'created') + if profile['registration'].get('updated'): + values.add_creation_date(profile['registration']['updated'], 'updated') + if profile['registration'].get('registrar'): + values.add_registrar(profile['registration']['registrar'], 'name') + return values + + +def reputation(domtools, to_query, values): + rep = domtools.reputation(to_query, include_reasons=True) + # NOTE: use that value in a tag when we will have attribute level tagging + + if rep and not rep.get('error'): + reasons = ', '.join(rep['reasons']) + values.risk = [rep['risk_score'], 'Risk value of {} (via Domain Tools), Reasons: {}'.format(to_query, reasons)] + + return values + + +def reverse_ip(domtools, to_query, values): + rev_ip = domtools.reverse_ip(to_query) + if rev_ip and not rev_ip.get('error'): + ip_addresses = rev_ip['ip_addresses'] + values.add_ip(ip_addresses['ip_address'], 'IP of {} (via DomainTools). Has {} other domains.'.format(to_query, ip_addresses['domain_count'])) + for d in ip_addresses['domain_names']: + values.add_domain(d, 'Other domain on {}.'.format(ip_addresses['ip_address'])) + return values + + +def get_services(request): + for t in mispattributes['input']: + to_query = request.get(t) + if not to_query: + continue + for p in query_profiles: + if t in p['inputs']: + return p['services'] + + +def handler(q=False): + if not q: + return q + + request = json.loads(q) + to_query = None + for t in mispattributes['input']: + to_query = request.get(t) + if to_query: + break + if not to_query: + misperrors['error'] = "Unsupported attributes type" + return misperrors + + if request.get('config'): + if (request['config'].get('username') is None) or (request['config'].get('api_key') is None): + misperrors['error'] = 'DomainTools authentication is incomplete' + return misperrors + else: + domtools = API(request['config'].get('username'), request['config'].get('api_key')) + else: + misperrors['error'] = 'DomainTools authentication is missing' + return misperrors + + values = DomainTools() + services = get_services(request) + if services: + for s in services: + globals()[s](domtools, to_query, values) + return {'results': values.dump()}