mirror of https://github.com/CIRCL/url-abuse
chg: refactoring, cleanup, enable ipv6 lookup
parent
b9c0b0c690
commit
cfdf8d7c0e
|
@ -12,6 +12,7 @@ if __name__ == '__main__':
|
|||
parser.add_argument('--url', type=str, help='URL of the instance.')
|
||||
|
||||
parser.add_argument('--query', help='URL to lookup')
|
||||
parser.add_argument('--email', action='store_true', help='Return the email template')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
@ -20,5 +21,8 @@ if __name__ == '__main__':
|
|||
else:
|
||||
urlabuse = PyURLAbuse()
|
||||
|
||||
response = urlabuse.run_query(args.query)
|
||||
print(json.dumps(response, indent=2))
|
||||
response = urlabuse.run_query(args.query, args.email)
|
||||
if args.email:
|
||||
print(response['mail'])
|
||||
else:
|
||||
print(json.dumps(response, indent=2))
|
||||
|
|
|
@ -79,10 +79,51 @@ class PyURLAbuse(object):
|
|||
query = {'query': q}
|
||||
return self._async('psslcircl', query)
|
||||
|
||||
def run_query(self, q):
|
||||
def make_mail_template(self, results):
|
||||
content = []
|
||||
|
||||
for result in results:
|
||||
url = list(result.keys())[0]
|
||||
details = list(result.values())[0]
|
||||
content.append(url)
|
||||
if 'googlesafebrowsing' in details:
|
||||
content.append('\tKnown as malicious on Google Safe Browsing: {}'.format(details.get('googlesafebrowsing')))
|
||||
|
||||
if 'phishtank' in details:
|
||||
content.append('\tKnown as on PhishTank: {}'.format(details.get('phishtank')))
|
||||
|
||||
if 'vt' in details and details.get('vt'):
|
||||
vt_res = details.get('vt')
|
||||
if int(vt_res[2]) != 0:
|
||||
content.append('\tVirusTotal positive detections: {} out of {}'.format(vt_res[2], vt_res[3]))
|
||||
|
||||
# IPs
|
||||
if 'dns' not in details:
|
||||
content.append('No DNS resolutions.')
|
||||
continue
|
||||
for ip_list in details['dns']:
|
||||
if not ip_list:
|
||||
continue
|
||||
for ip in ip_list:
|
||||
ip_details = details[ip]
|
||||
content.append('\t' + ip)
|
||||
if 'bgpranking' in ip_details:
|
||||
content.append('\t\t is announced by {} ({}). Position {}/{}.'.format(
|
||||
ip_details['bgpranking'][2], ip_details['bgpranking'][0], ip_details['bgpranking'][4],
|
||||
ip_details['bgpranking'][5]))
|
||||
if ip_details.get('virustotal'):
|
||||
res = ip_details.get('virustotal')
|
||||
if res[0] == 1 and int(res[1]) != 0:
|
||||
content.append('\t\tVirusTotal positive detections: {} out of {}'.format(res[1], res[2]))
|
||||
return '\n\n '.join(content)
|
||||
|
||||
def run_query(self, q, return_mail_template=False):
|
||||
cached = self.get_cache(q)
|
||||
if len(cached[0][q]) > 0:
|
||||
return {'info': 'Used cached content'}, cached
|
||||
to_return = {'info': 'Used cached content', 'result': cached}
|
||||
if return_mail_template:
|
||||
to_return['mail'] = self.make_mail_template(cached)
|
||||
return to_return
|
||||
job_id = self.urls(q)
|
||||
all_urls = None
|
||||
while True:
|
||||
|
@ -125,6 +166,7 @@ class PyURLAbuse(object):
|
|||
if v6 is not None:
|
||||
for ip in v6:
|
||||
self.phishtank(ip)
|
||||
self.bgpr(ip)
|
||||
self.urlquery(ip)
|
||||
self.pdnscircl(ip)
|
||||
self.ticket(ip)
|
||||
|
@ -132,7 +174,11 @@ class PyURLAbuse(object):
|
|||
waiting = True
|
||||
time.sleep(.5)
|
||||
time.sleep(1)
|
||||
return {'info': 'New query, all the details may not be available.'}, self.get_cache(q)
|
||||
cached = self.get_cache(q)
|
||||
to_return = {'info': 'New query, all the details may not be available.', 'result': cached}
|
||||
if return_mail_template:
|
||||
to_return['mail'] = self.make_mail_template(cached)
|
||||
return to_return
|
||||
|
||||
def get_cache(self, q):
|
||||
query = {'query': q}
|
||||
|
|
|
@ -460,7 +460,7 @@ class Query():
|
|||
# asn, prefix, asn_descr, rank, position, known_asns
|
||||
return None, None, None, None, None, None
|
||||
|
||||
cached = self._cache_get(asn, 'bgp')
|
||||
cached = self._cache_get(ip, 'bgpranking')
|
||||
if cached is not None:
|
||||
return cached
|
||||
bgpranking = BGPRanking()
|
||||
|
@ -469,38 +469,45 @@ class Query():
|
|||
return None, None, None, None, None, None
|
||||
to_return = (asn, prefix, response['response']['asn_description'], response['response']['ranking']['rank'],
|
||||
response['response']['ranking']['position'], response['response']['ranking']['total_known_asns'])
|
||||
self._cache_set(asn, to_return, 'bgp')
|
||||
self._cache_set(ip, to_return, 'bgpranking')
|
||||
return to_return
|
||||
|
||||
def _deserialize_cached(self, entry):
|
||||
to_return = {}
|
||||
redirects = []
|
||||
h = self.cache.hgetall(entry)
|
||||
for key, value in list(h.items()):
|
||||
to_return[key] = json.loads(value)
|
||||
return to_return
|
||||
for key, value in h.items():
|
||||
v = json.loads(value)
|
||||
if key == 'list':
|
||||
redirects = v
|
||||
continue
|
||||
to_return[key] = v
|
||||
return to_return, redirects
|
||||
|
||||
def get_url_data(self, url):
|
||||
data = self._deserialize_cached(url)
|
||||
data, redirects = self._deserialize_cached(url)
|
||||
if data.get('dns') is not None:
|
||||
ipv4, ipv6 = data['dns']
|
||||
ip_data = {}
|
||||
if ipv4 is not None:
|
||||
for ip in ipv4:
|
||||
ip_data[ip] = self._deserialize_cached(ip)
|
||||
info, _ = self._deserialize_cached(ip)
|
||||
ip_data[ip] = info
|
||||
if ipv6 is not None:
|
||||
for ip in ipv6:
|
||||
ip_data[ip] = self._deserialize_cached(ip)
|
||||
info, _ = self._deserialize_cached(ip)
|
||||
ip_data[ip] = info
|
||||
if len(ip_data) > 0:
|
||||
data.update(ip_data)
|
||||
return {url: data}
|
||||
return {url: data}, redirects
|
||||
|
||||
def cached(self, url):
|
||||
url_data = self.get_url_data(url)
|
||||
url_data, redirects = self.get_url_data(url)
|
||||
to_return = [url_data]
|
||||
if url_data[url].get('list') is not None:
|
||||
url_redirs = url_data[url]['list']
|
||||
for u in url_redirs:
|
||||
if redirects:
|
||||
for u in redirects:
|
||||
if u == url:
|
||||
continue
|
||||
to_return.append(self.get_url_data(u))
|
||||
data, redir = self.get_url_data(u)
|
||||
to_return.append(data)
|
||||
return to_return
|
||||
|
|
|
@ -322,7 +322,7 @@ def create_app(configfile=None):
|
|||
for ip in ipv4:
|
||||
to_return += '\t' + ip + '\n'
|
||||
data = info[ip]
|
||||
if data.get('bgp'):
|
||||
if data.get('bgpranking'):
|
||||
to_return += '\t\t(PTR: {}) is announced by {} ({}).\n'.format(*(data.get('bgp')[:3]))
|
||||
if data.get('whois'):
|
||||
all_mails.update(data.get('whois'))
|
||||
|
@ -331,6 +331,8 @@ def create_app(configfile=None):
|
|||
for ip in ipv6:
|
||||
to_return += '\t' + ip + '\n'
|
||||
data = info[ip]
|
||||
if data.get('bgpranking'):
|
||||
to_return += '\t\t(PTR: {}) is announced by {} ({}).\n'.format(*(data.get('bgp')[:3]))
|
||||
if data.get('whois'):
|
||||
all_mails.update(data.get('whois'))
|
||||
to_return += '\t\tContacts: {}\n'.format(', '.join(data.get('whois')))
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
<uib-accordion-heading>{{ip}}</uib-accordion-heading>
|
||||
<uq-phishtank data="ip"></uq-phishtank>
|
||||
<!-- <li><uq-virustotal data="ip"></uq-virustotal></li> -->
|
||||
<!--<li><uq-bgpranking data="ip"></uq-bgpranking></li>-->
|
||||
<uq-bgpranking data="ip"></uq-bgpranking>
|
||||
<uq-urlquery data="ip"></uq-urlquery>
|
||||
<uq-pdnscircl data="ip"></uq-pdnscircl>
|
||||
<uq-ticket data="ip"></uq-ticket>
|
||||
|
|
Loading…
Reference in New Issue