From 5f7b1277130697665a122425192bd78a0f3b95cd Mon Sep 17 00:00:00 2001 From: chrisr3d Date: Tue, 15 Oct 2019 23:30:39 +0200 Subject: [PATCH] chg: Avoids returning empty values + easier results parsing --- misp_modules/modules/expansion/threatminer.py | 251 ++++++++---------- 1 file changed, 110 insertions(+), 141 deletions(-) diff --git a/misp_modules/modules/expansion/threatminer.py b/misp_modules/modules/expansion/threatminer.py index 2f899dc..292d00b 100755 --- a/misp_modules/modules/expansion/threatminer.py +++ b/misp_modules/modules/expansion/threatminer.py @@ -1,5 +1,6 @@ import json import requests +from collections import defaultdict misperrors = {'error': 'Error'} mispattributes = {'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'md5', 'sha1', 'sha256', 'sha512'], @@ -12,7 +13,112 @@ moduleinfo = {'version': '1', 'author': 'KX499', 'description': 'Get information from ThreatMiner', 'module-type': ['expansion']} -desc = '{}: Threatminer - {}' +class ThreatMiner(): + def __init__(self): + self.results = defaultdict(set) + self.comment = '{}: Threatminer - {}' + self.types_mapping = {'domain': '_get_domain', 'hostname': '_get_domain', + 'ip-dst': '_get_ip', 'ip-src': '_get_ip', + 'md5': '_get_hash', 'sha1': '_get_hash', + 'sha256': '_get_hash', 'sha512': '_get_hash'} + + @property + def parsed_results(self): + to_return = [] + for key, values in self.results.items(): + input_value, comment = key[:2] + types = [k for k in key[2:]] + to_return.append({'types': types, 'values': list(values), + 'comment': self.comment.format(input_value, comment)}) + return to_return + + def parse_query(self, request): + for input_type, to_call in self.types_mapping.items(): + if request.get(input_type): + getattr(self, to_call)(request[input_type]) + + def _get_domain(self, q): + queries_mapping = {1: ('_add_whois', 'whois'), 2: ('_add_ip', 'pdns'), + 3: ('_add_uri', 'uri'), 4: ('_add_hash', 'samples'), + 5: ('_add_domain', 'subdomain'), 6: ('_add_link', 'report')} + for flag, mapped in queries_mapping.items(): + req = requests.get('https://www.threatminer.org/domain.php', params={'q': q, 'api': 'True', 'rt': flag}) + if not req.status_code == 200: + continue + results = req.json().get('results') + if not results: + continue + to_call, comment = mapped + getattr(self, to_call)(results, q, comment) + + def _get_hash(self, q): + queries_mapping = {1: ('_add_filename', 'file'), 3: ('_add_network', 'network'), + 6: ('_add_text', 'detection'), 7: ('_add_hash', 'report')} + for flag, mapped in queries_mapping.items(): + req = requests.get('https://www.threatminer.org/sample.php', params={'q': q, 'api': 'True', 'rt': flag}) + if not req.status_code == 200: + continue + results = req.json().get('results') + if not results: + continue + to_call, comment = mapped + getattr(self, to_call)(results, q, comment) + + def _get_ip(self, q): + queries_mapping = {1: ('_add_whois', 'whois'), 2: ('_add_ip', 'pdns'), + 3: ('_add_uri', 'uri'), 4: ('_add_hash', 'samples'), + 5: ('_add_x509', 'ssl'), 6: ('_add_link', 'report')} + for flag, mapped in queries_mapping.items(): + req = requests.get('https://www.threatminer.org/host.php', params={'q': q, 'api': 'True', 'rt': flag}) + if not req.status_code == 200: + continue + results = req.json().get('results') + if not results: + continue + to_call, comment = mapped + getattr(self, to_call)(results, q, comment) + + def _add_domain(self, results, q, comment): + self.results[(q, comment, 'domain')].update({result for result in results if isinstance(result, str)}) + + def _add_filename(self, results, q, comment): + self.results[(q, comment, 'filename')].update({result['filename'] for result in results if result.get('file_name')}) + + def _add_hash(self, results, q, comment): + self.results[(q, comment, 'sha256')].update({result for result in results if isinstance(result, str)}) + + def _add_ip(self, results, q, comment): + self.results[(q, comment, 'ip-src', 'ip-dst')].update({result['ip'] for result in results if result.get('ip')}) + + def _add_link(self, results, q, comment): + self.results[(q, comment, 'link')].update({result['URL'] for result in results if result.get('URL')}) + + def _add_network(self, results, q, comment): + for result in results: + domains = result.get('domains') + if domains: + self.results[(q, comment, 'domain')].update({domain['domain'] for domain in domains if domain.get('domain')}) + hosts = result.get('hosts') + if hosts: + self.results[(q, comment, 'ip-src', 'ip-dst')].update({host for host in hosts if isinstance(host, str)}) + + def _add_text(self, results, q, comment): + for result in results: + detections = result.get('av_detections') + if detections: + self.results[(q, comment, 'text')].update({d['detection'] for d in detections if d.get('detection')}) + + def _add_uri(self, results, q, comment): + self.results[(q, comment, 'url')].update({result['uri'] for result in results if result.get('uri')}) + + def _add_whois(self, results, q, comment): + for result in results: + emails = result.get('whois', {}).get('emails') + if emails: + self.results[(q, comment, 'whois-registrant-email')].update({email for em_type, email in emails.items() if em_type == 'registrant' and email}) + + def _add_x509(self, results, q, comment): + self.results[(q, 'x509-fingerprint-sha1')].update({result for result in results if isinstance(result, str)}) def handler(q=False): @@ -21,146 +127,9 @@ def handler(q=False): q = json.loads(q) - r = {'results': []} - - if 'ip-src' in q: - r['results'] += get_ip(q['ip-src']) - if 'ip-dst' in q: - r['results'] += get_ip(q['ip-dst']) - if 'domain' in q: - r['results'] += get_domain(q['domain']) - if 'hostname' in q: - r['results'] += get_domain(q['hostname']) - if 'md5' in q: - r['results'] += get_hash(q['md5']) - if 'sha1' in q: - r['results'] += get_hash(q['sha1']) - if 'sha256' in q: - r['results'] += get_hash(q['sha256']) - if 'sha512' in q: - r['results'] += get_hash(q['sha512']) - - uniq = [] - for res in r['results']: - if res not in uniq: - uniq.append(res) - r['results'] = uniq - return r - - -def get_domain(q): - ret = [] - for flag in [1, 2, 3, 4, 5, 6]: - req = requests.get('https://www.threatminer.org/domain.php', params={'q': q, 'api': 'True', 'rt': flag}) - if not req.status_code == 200: - continue - results = req.json().get('results') - if not results: - continue - - for result in results: - if flag == 1: # whois - emails = result.get('whois', {}).get('emails') - if not emails: - continue - for em_type, email in emails.items(): - ret.append({'types': ['whois-registrant-email'], 'values': [email], 'comment': desc.format(q, 'whois')}) - if flag == 2: # pdns - ip = result.get('ip') - if ip: - ret.append({'types': ['ip-src', 'ip-dst'], 'values': [ip], 'comment': desc.format(q, 'pdns')}) - if flag == 3: # uri - uri = result.get('uri') - if uri: - ret.append({'types': ['url'], 'values': [uri], 'comment': desc.format(q, 'uri')}) - if flag == 4: # samples - if type(result) is str: - ret.append({'types': ['sha256'], 'values': [result], 'comment': desc.format(q, 'samples')}) - if flag == 5: # subdomains - if type(result) is str: - ret.append({'types': ['domain'], 'values': [result], 'comment': desc.format(q, 'subdomain')}) - if flag == 6: # reports - link = result.get('URL') - if link: - ret.append({'types': ['url'], 'values': [link], 'comment': desc.format(q, 'report')}) - - return ret - - -def get_ip(q): - ret = [] - for flag in [1, 2, 3, 4, 5, 6]: - req = requests.get('https://www.threatminer.org/host.php', params={'q': q, 'api': 'True', 'rt': flag}) - if not req.status_code == 200: - continue - results = req.json().get('results') - if not results: - continue - - for result in results: - if flag == 1: # whois - emails = result.get('whois', {}).get('emails') - if not emails: - continue - for em_type, email in emails.items(): - ret.append({'types': ['whois-registrant-email'], 'values': [email], 'comment': desc.format(q, 'whois')}) - if flag == 2: # pdns - ip = result.get('ip') - if ip: - ret.append({'types': ['ip-src', 'ip-dst'], 'values': [ip], 'comment': desc.format(q, 'pdns')}) - if flag == 3: # uri - uri = result.get('uri') - if uri: - ret.append({'types': ['url'], 'values': [uri], 'comment': desc.format(q, 'uri')}) - if flag == 4: # samples - if type(result) is str: - ret.append({'types': ['sha256'], 'values': [result], 'comment': desc.format(q, 'samples')}) - if flag == 5: # ssl - if type(result) is str: - ret.append({'types': ['x509-fingerprint-sha1'], 'values': [result], 'comment': desc.format(q, 'ssl')}) - if flag == 6: # reports - link = result.get('URL') - if link: - ret.append({'types': ['url'], 'values': [link], 'comment': desc.format(q, 'report')}) - - return ret - - -def get_hash(q): - ret = [] - for flag in [1, 3, 6, 7]: - req = requests.get('https://www.threatminer.org/sample.php', params={'q': q, 'api': 'True', 'rt': flag}) - if not req.status_code == 200: - continue - results = req.json().get('results') - if not results: - continue - - for result in results: - if flag == 1: # meta (filename) - name = result.get('file_name') - if name: - ret.append({'types': ['filename'], 'values': [name], 'comment': desc.format(q, 'file')}) - if flag == 3: # network - domains = result.get('domains') - for dom in domains: - if dom.get('domain'): - ret.append({'types': ['domain'], 'values': [dom['domain']], 'comment': desc.format(q, 'network')}) - - hosts = result.get('hosts') - for h in hosts: - if type(h) is str: - ret.append({'types': ['ip-src', 'ip-dst'], 'values': [h], 'comment': desc.format(q, 'network')}) - if flag == 6: # detections - detections = result.get('av_detections') - for d in detections: - if d.get('detection'): - ret.append({'types': ['text'], 'values': [d['detection']], 'comment': desc.format(q, 'detection')}) - if flag == 7: # report - if type(result) is str: - ret.append({'types': ['sha256'], 'values': [result], 'comment': desc.format(q, 'report')}) - - return ret + parser = ThreatMiner() + parser.parse_query(q) + return {'results': parser.parsed_results} def introspection():