From 0ccd535f3dc257eb05dea47e6a0065b7c8d7a729 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Tue, 15 Jan 2019 16:20:38 +0100 Subject: [PATCH] chg: Cleanup, better digest --- client/bin/urlabuse | 8 +++--- client/pyurlabuse/api.py | 26 ++++++++---------- urlabuse/urlabuse.py | 59 +++++++++++++++++++++++++++++++++++----- website/web/__init__.py | 52 ++++++----------------------------- 4 files changed, 75 insertions(+), 70 deletions(-) diff --git a/client/bin/urlabuse b/client/bin/urlabuse index b0db71a..4c0d1b5 100755 --- a/client/bin/urlabuse +++ b/client/bin/urlabuse @@ -12,7 +12,7 @@ if __name__ == '__main__': parser.add_argument('--url', type=str, help='URL of the instance.') parser.add_argument('--query', help='URL to lookup') - parser.add_argument('--email', action='store_true', help='Return the email template') + parser.add_argument('--digest', action='store_true', help='Return the digest') args = parser.parse_args() @@ -21,8 +21,8 @@ if __name__ == '__main__': else: urlabuse = PyURLAbuse() - response = urlabuse.run_query(args.query, args.email) - if args.email: - print(response['mail']) + response = urlabuse.run_query(args.query, args.digest) + if args.digest: + print(response['digest'][0]) else: print(json.dumps(response, indent=2)) diff --git a/client/pyurlabuse/api.py b/client/pyurlabuse/api.py index 0d3be5e..8f4c3cd 100644 --- a/client/pyurlabuse/api.py +++ b/client/pyurlabuse/api.py @@ -21,7 +21,7 @@ class PyURLAbuse(object): return r.status_code == 200 def get_result(self, job_id): - response = self.session.get(urljoin(self.url, f'_result/{job_id}')) + response = self.session.get(urljoin(self.url, '_result/{}'.format(job_id))) if response.status_code == 202: return None else: @@ -117,13 +117,11 @@ class PyURLAbuse(object): content.append('\t\tVirusTotal positive detections: {} out of {}'.format(res[1], res[2])) return '\n\n '.join(content) - def run_query(self, q, return_mail_template=False): - cached = self.get_cache(q) - if len(cached[0][q]) > 0: - to_return = {'info': 'Used cached content', 'result': cached} - if return_mail_template: - to_return['mail'] = self.make_mail_template(cached) - return to_return + def run_query(self, q, with_digest=False): + cached = self.get_cache(q, with_digest) + if len(cached['result']) > 0: + cached['info'] = 'Used cached content' + return cached job_id = self.urls(q) all_urls = None while True: @@ -174,13 +172,11 @@ class PyURLAbuse(object): waiting = True time.sleep(.5) time.sleep(1) - cached = self.get_cache(q) - to_return = {'info': 'New query, all the details may not be available.', 'result': cached} - if return_mail_template: - to_return['mail'] = self.make_mail_template(cached) - return to_return + cached = self.get_cache(q, with_digest) + cached['info'] = 'New query, all the details may not be available.' 
+        return cached
 
-    def get_cache(self, q):
-        query = {'query': q}
+    def get_cache(self, q, digest=False):
+        query = {'query': q, 'digest': digest}
         response = self.session.post(urljoin(self.url, 'get_cache'), data=json.dumps(query))
         return response.json()
diff --git a/urlabuse/urlabuse.py b/urlabuse/urlabuse.py
index 88346bf..4731a86 100644
--- a/urlabuse/urlabuse.py
+++ b/urlabuse/urlabuse.py
@@ -501,13 +501,58 @@ class Query():
         data.update(ip_data)
         return {url: data}, redirects
 
-    def cached(self, url):
+    def cached(self, url, digest=False):
         url_data, redirects = self.get_url_data(url)
         to_return = [url_data]
-        if redirects:
-            for u in redirects:
-                if u == url:
-                    continue
-                data, redir = self.get_url_data(u)
-                to_return.append(data)
+        for u in redirects:
+            if u == url:
+                continue
+            data, redir = self.get_url_data(u)
+            to_return.append(data)
+        if digest:
+            return {'result': to_return, 'digest': self.digest(to_return)}
+        return {'result': to_return}
+
+    def ip_details_digest(self, ips, all_info, all_asns, all_mails):
+        to_return = ''
+        for ip in ips:
+            to_return += '\t' + ip + '\n'
+            data = all_info[ip]
+            if data.get('bgpranking'):
+                to_return += '\t\tis announced by {} ({}). Position {}/{}.'.format(
+                    data['bgpranking'][2], data['bgpranking'][0],
+                    data['bgpranking'][4], data['bgpranking'][5])
+                all_asns.add('{} ({})'.format(data['bgpranking'][2], data['bgpranking'][0]))
+            if data.get('whois'):
+                all_mails.update(data.get('whois'))
+                to_return += '\n\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
         return to_return
+
+    def digest(self, data):
+        to_return = ''
+        all_mails = set()
+        all_asns = set()
+        for entry in data:
+            # Each URL we're redirected to
+            for url, info in entry.items():
+                # info contains the information we got for the URL.
+                to_return += '\n{}\n'.format(url)
+                if 'whois' in info:
+                    all_mails.update(info['whois'])
+                    to_return += '\tContacts: {}\n'.format(', '.join(info['whois']))
+                if 'vt' in info and len(info['vt']) == 4:
+                    to_return += '\t{} out of {} positive detections in VT - {}\n'.format(
+                        info['vt'][2], info['vt'][3], info['vt'][1])
+                if 'gsb' in info:
+                    to_return += '\tKnown as malicious on Google Safe Browsing: {}\n'.format(info['gsb'])
+                if 'phishtank' in info:
+                    to_return += '\tKnown on PhishTank: {}\n'.format(info['phishtank'])
+
+                if 'dns' in info:
+                    ipv4, ipv6 = info['dns']
+                    if ipv4 is not None:
+                        to_return += self.ip_details_digest(ipv4, info, all_asns, all_mails)
+                    if ipv6 is not None:
+                        to_return += self.ip_details_digest(ipv6, info, all_asns, all_mails)
+        to_return += '\n\tAll contacts: {}\n'.format(', '.join(all_mails))
+        return to_return, list(all_mails), list(all_asns)
diff --git a/website/web/__init__.py b/website/web/__init__.py
index 23d8da5..d2a6852 100644
--- a/website/web/__init__.py
+++ b/website/web/__init__.py
@@ -296,62 +296,26 @@ def create_app(configfile=None):
     def get_cache():
         data = request.get_json(force=True)
         url = data["query"]
-        data = urlabuse_query.cached(url)
+        if 'digest' in data:
+            digest = data["digest"]
+        else:
+            digest = False
+        data = urlabuse_query.cached(url, digest)
         return Response(json.dumps(data), mimetype='application/json')
 
-    def digest(data):
-        to_return = ''
-        all_mails = set()
-        for entry in data:
-            for url, info in list(entry.items()):
-                to_return += '\n{}\n'.format(url)
-                if info.get('whois'):
-                    all_mails.update(info.get('whois'))
-                    to_return += '\tContacts: {}\n'.format(', '.join(info.get('whois')))
-                if info.get('vt') and len(info.get('vt')) == 4:
-                    vtstuff = info.get('vt')
-                    to_return += '\t{} out of {} positive detections in VT - {}\n'.format(
-                        vtstuff[2], vtstuff[3], vtstuff[1])
-                if info.get('gsb'):
-                    to_return += '\tKnown as malicious on Google Safe Browsing: {}\n'.format(info.get('gsb'))
-                if info.get('phishtank'):
-                    to_return += '\tKnown as malicious on PhishTank\n'
-                if info.get('dns'):
-                    ipv4, ipv6 = info.get('dns')
-                    if ipv4 is not None:
-                        for ip in ipv4:
-                            to_return += '\t' + ip + '\n'
-                            data = info[ip]
-                            if data.get('bgpranking'):
-                                to_return += '\t\t(PTR: {}) is announced by {} ({}).\n'.format(*(data.get('bgp')[:3]))
-                            if data.get('whois'):
-                                all_mails.update(data.get('whois'))
-                                to_return += '\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
-                    if ipv6 is not None:
-                        for ip in ipv6:
-                            to_return += '\t' + ip + '\n'
-                            data = info[ip]
-                            if data.get('bgpranking'):
-                                to_return += '\t\t(PTR: {}) is announced by {} ({}).\n'.format(*(data.get('bgp')[:3]))
-                            if data.get('whois'):
-                                all_mails.update(data.get('whois'))
-                                to_return += '\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
-        to_return += '\tAll contacts: {}\n'.format(', '.join(all_mails))
-        return to_return
-
     def send(url, ip='', autosend=False):
         if not urlabuse_query.get_mail_sent(url):
-            urlabuse_query.set_mail_sent(url)
             data = urlabuse_query.cached(url)
             if not autosend:
                 subject = 'URL Abuse report from ' + ip
             else:
                 subject = 'URL Abuse report sent automatically'
             msg = Message(subject, sender='urlabuse@circl.lu', recipients=["info@circl.lu"])
-            msg.body = digest(data)
+            msg.body = urlabuse_query.digest(data['result'])[0]
             msg.body += '\n\n'
-            msg.body += json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))
+            msg.body += json.dumps(data, sort_keys=True, indent=2)
             mail.send(msg)
+            urlabuse_query.set_mail_sent(url)
 
     @app.route('/submit', methods=['POST'])
     def send_mail():
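
Usage sketch (not part of the patch): a minimal example of how the new digest flow is meant to be consumed through the patched client. It assumes the package is importable as pyurlabuse and that the default instance configured by PyURLAbuse() is reachable; run_query(), with_digest and the three-element 'digest' value (report text, contact e-mails, ASNs) are the ones introduced above.

    # Minimal sketch against the patched client API; instance availability and
    # the pyurlabuse import path are assumptions.
    from pyurlabuse import PyURLAbuse

    urlabuse = PyURLAbuse()  # default instance, as in client/bin/urlabuse
    response = urlabuse.run_query('http://www.example.com/', with_digest=True)

    print(response['info'])        # cache status message ('Used cached content' or new query)
    print(response['digest'][0])   # human-readable digest text
    print(response['digest'][1])   # contact e-mail addresses gathered from whois data
    print(response['digest'][2])   # ASNs announcing the resolved IPs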