chg: Cleanup, better digest

pull/14/head
Raphaël Vinot 2019-01-15 16:20:38 +01:00
parent cfdf8d7c0e
commit 0ccd535f3d
4 changed files with 75 additions and 70 deletions

View File

@@ -12,7 +12,7 @@ if __name__ == '__main__':
parser.add_argument('--url', type=str, help='URL of the instance.')
parser.add_argument('--query', help='URL to lookup')
parser.add_argument('--email', action='store_true', help='Return the email template')
parser.add_argument('--digest', action='store_true', help='Return the digest')
args = parser.parse_args()
@@ -21,8 +21,8 @@ if __name__ == '__main__':
else:
urlabuse = PyURLAbuse()
response = urlabuse.run_query(args.query, args.email)
if args.email:
print(response['mail'])
response = urlabuse.run_query(args.query, args.digest)
if args.digest:
print(response['digest'][0])
else:
print(json.dumps(response, indent=2))
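
For reference, the updated CLI path maps onto the client library as follows. A minimal sketch, assuming the client package is importable as pyurlabuse and that the default instance used by the script's else-branch is reachable; the query URL is illustrative only:

    import json
    from pyurlabuse import PyURLAbuse   # assumed import path for the client package

    urlabuse = PyURLAbuse()
    response = urlabuse.run_query('http://example.com', with_digest=True)
    # 'digest' is only present when with_digest=True; index 0 is the text report,
    # indices 1 and 2 are the collected contact mails and ASNs
    print(response['digest'][0])
    # without the digest, dump the full result instead
    print(json.dumps(response, indent=2))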

View File

@@ -21,7 +21,7 @@ class PyURLAbuse(object):
return r.status_code == 200
def get_result(self, job_id):
response = self.session.get(urljoin(self.url, f'_result/{job_id}'))
response = self.session.get(urljoin(self.url, '_result/{}'.format(job_id)))
if response.status_code == 202:
return None
else:
@@ -117,13 +117,11 @@ class PyURLAbuse(object):
content.append('\t\tVirusTotal positive detections: {} out of {}'.format(res[1], res[2]))
return '\n\n '.join(content)
def run_query(self, q, return_mail_template=False):
cached = self.get_cache(q)
if len(cached[0][q]) > 0:
to_return = {'info': 'Used cached content', 'result': cached}
if return_mail_template:
to_return['mail'] = self.make_mail_template(cached)
return to_return
def run_query(self, q, with_digest=False):
cached = self.get_cache(q, with_digest)
if len(cached['result']) > 0:
cached['info'] = 'Used cached content'
return cached
job_id = self.urls(q)
all_urls = None
while True:
@@ -174,13 +172,11 @@ class PyURLAbuse(object):
waiting = True
time.sleep(.5)
time.sleep(1)
cached = self.get_cache(q)
to_return = {'info': 'New query, all the details may not be available.', 'result': cached}
if return_mail_template:
to_return['mail'] = self.make_mail_template(cached)
return to_return
cached = self.get_cache(q, with_digest)
cached['info'] = 'New query, all the details may not be available.'
return cached
def get_cache(self, q):
query = {'query': q}
def get_cache(self, q, digest=False):
query = {'query': q, 'digest': digest}
response = self.session.post(urljoin(self.url, 'get_cache'), data=json.dumps(query))
return response.json()
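
For reference, the body get_cache() now posts and the keys it gets back; a sketch with placeholder values, matching the urljoin('get_cache') call and the run_query() handling above:

    # Request body sent to the get_cache route (the 'digest' flag is the new part)
    query = {'query': 'http://example.com', 'digest': True}
    # Response, as consumed by run_query():
    #   result  -> list of {url: details} dicts, one per URL in the redirect chain
    #   digest  -> [text_report, mails, asns], only present when 'digest' was true
    #   info    -> added by run_query() itself ('Used cached content' / 'New query, ...')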

View File

@@ -501,13 +501,58 @@ class Query():
data.update(ip_data)
return {url: data}, redirects
def cached(self, url):
def cached(self, url, digest=False):
url_data, redirects = self.get_url_data(url)
to_return = [url_data]
if redirects:
for u in redirects:
if u == url:
continue
data, redir = self.get_url_data(u)
to_return.append(data)
for u in redirects:
if u == url:
continue
data, redir = self.get_url_data(u)
to_return.append(data)
if digest:
return {'result': to_return, 'digest': self.digest(to_return)}
return {'result': to_return}
def ip_details_digest(self, ips, all_info, all_asns, all_mails):
to_return = ''
for ip in ips:
to_return += '\t' + ip + '\n'
data = all_info[ip]
if data.get('bgpranking'):
to_return += '\t\tis announced by {} ({}). Position {}/{}.'.format(
data['bgpranking'][2], data['bgpranking'][0],
data['bgpranking'][4], data['bgpranking'][5])
all_asns.add('{} ({})'.format(data['bgpranking'][2], data['bgpranking'][0]))
if data.get('whois'):
all_mails.update(data.get('whois'))
to_return += '\n\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
return to_return
def digest(self, data):
to_return = ''
all_mails = set()
all_asns = set()
for entry in data:
# Each URL we're redirected to
for url, info in entry.items():
# info contains the information we got for the URL.
to_return += '\n{}\n'.format(url)
if 'whois' in info:
all_mails.update(info['whois'])
to_return += '\tContacts: {}\n'.format(', '.join(info['whois']))
if 'vt' in info and len(info['vt']) == 4:
to_return += '\t{} out of {} positive detections in VT - {}\n'.format(
info['vt'][2], info['vt'][3], info['vt'][1])
if 'gsb' in info:
to_return += '\tKnown as malicious on Google Safe Browsing: {}\n'.format(info['gsb'])
if 'phishtank' in info:
to_return += '\tKnown on PhishTank: {}\n'.format(info['phishtank'])
if 'dns' in info:
ipv4, ipv6 = info['dns']
if ipv4 is not None:
to_return += self.ip_details_digest(ipv4, info, all_asns, all_mails)
if ipv6 is not None:
to_return += self.ip_details_digest(ipv6, info, all_asns, all_mails)
to_return += '\n\tAll contacts: {}\n'.format(', '.join(all_mails))
return to_return, list(all_mails), list(all_asns)
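
To make the new digest() contract concrete, a sketch of the structure it consumes and what it returns, with made-up values; the keys follow the lookups above, and the per-IP details live under the IP key of the same URL entry, as read by ip_details_digest():

    # Hypothetical input: the 'result' list built by cached(), one {url: details} dict per URL
    entries = [
        {'http://example.com': {
            'whois': ['abuse@example.com'],
            'vt': [None, '<link or scan date>', 3, 70],   # indices 1-3 feed the VT line
            'gsb': 'malware',
            'phishtank': True,
            'dns': (['192.0.2.1'], None),                 # (ipv4 list, ipv6 list)
            '192.0.2.1': {'whois': ['noc@example.net']},  # per-IP details keyed by the IP
        }},
    ]
    # report, mails, asns = query.digest(entries)
    # 'query' would be an existing Query instance (its constructor is outside this diff);
    # 'report' is the indented text summary, 'mails' and 'asns' the aggregated contact
    # addresses and announcing ASNs, returned as lists.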

View File

@@ -296,62 +296,26 @@ def create_app(configfile=None):
def get_cache():
data = request.get_json(force=True)
url = data["query"]
data = urlabuse_query.cached(url)
if 'digest' in data:
digest = data["digest"]
else:
digest = False
data = urlabuse_query.cached(url, digest)
return Response(json.dumps(data), mimetype='application/json')
def digest(data):
to_return = ''
all_mails = set()
for entry in data:
for url, info in list(entry.items()):
to_return += '\n{}\n'.format(url)
if info.get('whois'):
all_mails.update(info.get('whois'))
to_return += '\tContacts: {}\n'.format(', '.join(info.get('whois')))
if info.get('vt') and len(info.get('vt')) == 4:
vtstuff = info.get('vt')
to_return += '\t{} out of {} positive detections in VT - {}\n'.format(
vtstuff[2], vtstuff[3], vtstuff[1])
if info.get('gsb'):
to_return += '\tKnown as malicious on Google Safe Browsing: {}\n'.format(info.get('gsb'))
if info.get('phishtank'):
to_return += '\tKnown as malicious on PhishTank\n'
if info.get('dns'):
ipv4, ipv6 = info.get('dns')
if ipv4 is not None:
for ip in ipv4:
to_return += '\t' + ip + '\n'
data = info[ip]
if data.get('bgpranking'):
to_return += '\t\t(PTR: {}) is announced by {} ({}).\n'.format(*(data.get('bgp')[:3]))
if data.get('whois'):
all_mails.update(data.get('whois'))
to_return += '\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
if ipv6 is not None:
for ip in ipv6:
to_return += '\t' + ip + '\n'
data = info[ip]
if data.get('bgpranking'):
to_return += '\t\t(PTR: {}) is announced by {} ({}).\n'.format(*(data.get('bgp')[:3]))
if data.get('whois'):
all_mails.update(data.get('whois'))
to_return += '\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
to_return += '\tAll contacts: {}\n'.format(', '.join(all_mails))
return to_return
def send(url, ip='', autosend=False):
if not urlabuse_query.get_mail_sent(url):
urlabuse_query.set_mail_sent(url)
data = urlabuse_query.cached(url)
if not autosend:
subject = 'URL Abuse report from ' + ip
else:
subject = 'URL Abuse report sent automatically'
msg = Message(subject, sender='urlabuse@circl.lu', recipients=["info@circl.lu"])
msg.body = digest(data)
msg.body = urlabuse_query.digest(data['result'])[0]
msg.body += '\n\n'
msg.body += json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))
msg.body += json.dumps(data, sort_keys=True, indent=2)
mail.send(msg)
urlabuse_query.set_mail_sent(url)
@app.route('/submit', methods=['POST'])
def send_mail():
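
Putting the pieces together, the new digest flow can be exercised end to end against a running instance; a sketch, assuming the web app listens locally on port 5000 and serves the get_cache route relative to its root:

    import json
    import requests

    base = 'http://127.0.0.1:5000/'   # assumed local development instance
    payload = {'query': 'http://example.com', 'digest': True}
    data = requests.post(base + 'get_cache', data=json.dumps(payload)).json()
    print(data['digest'][0])                                     # human-readable digest
    print(json.dumps(data['result'], sort_keys=True, indent=2))  # full cached details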