url-abuse/client/pyurlabuse/api.py

173 lines
5.3 KiB
Python

#!/bin/python
# -*- coding: utf-8 -*-
import json
import requests
import time
from urllib.parse import urljoin
class PyURLAbuse(object):
    """Small HTTP client for a URL Abuse web service.

    Every lookup method POSTs a JSON document to one endpoint below the
    base URL and returns the raw response body.  ``run_query`` feeds the
    bodies returned by ``urls`` and ``resolve`` to ``get_result`` as job
    identifiers.
    """

    def __init__(self, url='https://www.circl.lu/urlabuse/'):
        """Create a client.

        :param url: base URL of the service; endpoint paths are joined to it.
        """
        self.url = url
        self.session = requests.Session()
        self.session.headers.update({'content-type': 'application/json'})

    def _endpoint(self, path):
        """Return the absolute URL of *path* below the base URL."""
        # urljoin() replaces the last path segment of a base that lacks a
        # trailing slash ('.../urlabuse' + 'start' -> '.../start'), so make
        # sure the base always ends with one.
        base = self.url if self.url.endswith('/') else self.url + '/'
        return urljoin(base, path)

    @property
    def is_up(self):
        """True when a HEAD request on the base URL answers with HTTP 200."""
        r = self.session.head(self.url)
        return r.status_code == 200

    def get_result(self, job_id):
        """Fetch the result of a previously submitted job.

        :param job_id: identifier used to build the ``_result/<job_id>`` path.
        :return: ``None`` when the server answers HTTP 202, otherwise the
            decoded JSON body of the response.
        """
        response = self.session.get(self._endpoint('_result/{}'.format(job_id)))
        if response.status_code == 202:
            return None
        return response.json()

    def _async(self, path, query):
        """POST *query* as JSON to *path* and return the response body text."""
        response = self.session.post(self._endpoint(path), data=json.dumps(query))
        return response.text

    def start(self, q):
        """POST ``{'url': q}`` to the ``start`` endpoint."""
        return self._async('start', {'url': q})

    def urls(self, q):
        """POST ``{'url': q}`` to the ``urls`` endpoint."""
        return self._async('urls', {'url': q})

    def resolve(self, q):
        """POST ``{'url': q}`` to the ``resolve`` endpoint."""
        return self._async('resolve', {'url': q})

    def phishtank(self, q):
        """POST ``{'query': q}`` to the ``phishtank`` endpoint."""
        return self._async('phishtank', {'query': q})

    def virustotal(self, q):
        """POST ``{'query': q}`` to the ``virustotal_report`` endpoint."""
        return self._async('virustotal_report', {'query': q})

    def googlesafebrowsing(self, q):
        """POST ``{'query': q}`` to the ``googlesafebrowsing`` endpoint."""
        return self._async('googlesafebrowsing', {'query': q})

    def urlquery(self, q):
        """POST ``{'query': q}`` to the ``urlquery`` endpoint."""
        return self._async('urlquery', {'query': q})

    def ticket(self, q):
        """POST ``{'query': q}`` to the ``ticket`` endpoint."""
        return self._async('ticket', {'query': q})

    def whoismail(self, q):
        """POST ``{'query': q}`` to the ``whois`` endpoint."""
        return self._async('whois', {'query': q})

    def pdnscircl(self, q):
        """POST ``{'query': q}`` to the ``pdnscircl`` endpoint."""
        return self._async('pdnscircl', {'query': q})

    def bgpr(self, q):
        """POST ``{'query': q}`` to the ``bgpranking`` endpoint."""
        return self._async('bgpranking', {'query': q})

    def sslcircl(self, q):
        """POST ``{'query': q}`` to the ``psslcircl`` endpoint."""
        return self._async('psslcircl', {'query': q})

    def _submit_url_lookups(self, url):
        """Submit every per-URL lookup except ``resolve`` for *url*."""
        self.phishtank(url)
        self.virustotal(url)
        self.googlesafebrowsing(url)
        self.urlquery(url)
        self.ticket(url)
        self.whoismail(url)

    def _submit_ip_lookups(self, ip, with_ssl=True):
        """Submit the per-IP lookups for *ip*; ``psslcircl`` only if *with_ssl*."""
        self.phishtank(ip)
        self.bgpr(ip)
        self.urlquery(ip)
        self.pdnscircl(ip)
        if with_ssl:
            self.sslcircl(ip)
        self.whoismail(ip)

    def _update_cache(self, cached):
        """Re-submit all lookups for the URLs and IPs of a cached answer.

        :param cached: mapping with a ``'result'`` list of
            ``{url: details}`` dicts, as returned by ``get_cache``.
        """
        for result in cached['result']:
            for url, items in result.items():
                self.resolve(url)
                self._submit_url_lookups(url)
                # ``items`` may be empty/None (run_query checks for that after
                # calling this method), so test it before the 'dns' lookup.
                if not items or 'dns' not in items:
                    continue
                for entry in items['dns']:
                    if entry is None:
                        continue
                    for ip in entry:
                        self._submit_ip_lookups(ip)

    def run_query(self, q, with_digest=False):
        """Run a full query for *q* and return the cache content for it.

        When the cache already has content for every URL of the answer, the
        lookups are re-submitted and the cached answer is returned right
        away.  Otherwise the URL list is fetched, every URL and every
        resolved IP is submitted to the lookups, and the cache is read again.

        NOTE: both polling loops wait without a timeout for the server to
        stop answering HTTP 202.

        :param q: the URL to investigate.
        :param with_digest: forwarded to ``get_cache`` as its ``digest`` flag.
        :return: the ``get_cache`` answer with an extra ``'info'`` key.
        """
        cached = self.get_cache(q, with_digest)
        if cached['result']:
            self._update_cache(cached)
            has_cached_content = all(
                content
                for r in cached['result']
                for content in r.values()
            )
            if has_cached_content:
                cached['info'] = 'Used cached content'
                return cached

        # Wait for the list of URLs belonging to the query.
        job_id = self.urls(q)
        all_urls = self.get_result(job_id)
        while all_urls is None:
            time.sleep(.5)
            all_urls = self.get_result(job_id)

        # Submit the per-URL lookups, remembering each resolve job.
        res = {}
        for u in all_urls:
            res[u] = self.resolve(u)
            self._submit_url_lookups(u)

        # Poll the resolve jobs.  A polled job always schedules another pass,
        # so the loop ends on the first pass where every job is already done.
        done = set()
        waiting = True
        while waiting:
            waiting = False
            for job_id in res.values():
                if job_id in done:
                    continue
                ips = self.get_result(job_id)
                if ips is not None:
                    done.add(job_id)
                    v4, v6 = ips
                    if v4 is not None:
                        for ip in v4:
                            self._submit_ip_lookups(ip)
                    if v6 is not None:
                        for ip in v6:
                            # No psslcircl lookup is submitted for v6 here.
                            self._submit_ip_lookups(ip, with_ssl=False)
                waiting = True
                time.sleep(.5)
        time.sleep(1)
        cached = self.get_cache(q, with_digest)
        cached['info'] = 'New query, all the details may not be available.'
        return cached

    def get_cache(self, q, digest=False):
        """POST ``{'query': q, 'digest': digest}`` to ``get_cache``.

        :return: the decoded JSON body of the response.
        """
        query = {'query': q, 'digest': digest}
        response = self.session.post(self._endpoint('get_cache'), data=json.dumps(query))
        return response.json()