mirror of https://github.com/CIRCL/url-abuse
				
				
				
			
		
			
				
	
	
		
			137 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
| #!/bin/python
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| import json
 | |
| import requests
 | |
| import time
 | |
| 
 | |
| 
 | |
| class PyURLAbuse(object):
 | |
| 
 | |
|     # def __init__(self, url='https://www.circl.lu/urlabuse/'):
 | |
|     def __init__(self, url='http://0.0.0.0:5100/'):
 | |
|         self.url = url
 | |
| 
 | |
|         self.session = requests.Session()
 | |
|         self.session.headers.update({'content-type': 'application/json'})
 | |
| 
 | |
|     def get_result(self, job_id):
 | |
|         response = self.session.get('{}_result/{}' .format(self.url, job_id))
 | |
|         if response.status_code == 202:
 | |
|             return None
 | |
|         else:
 | |
|             return response.json()
 | |
| 
 | |
|     def _async(self, path, query):
 | |
|         response = self.session.post('{}{}' .format(self.url, path),
 | |
|                                      data=json.dumps(query))
 | |
|         return response.text
 | |
| 
 | |
|     def start(self, q):
 | |
|         query = {'url': q}
 | |
|         return self._async('start', query)
 | |
| 
 | |
|     def urls(self, q):
 | |
|         query = {'url': q}
 | |
|         return self._async('urls', query)
 | |
| 
 | |
|     def resolve(self, q):
 | |
|         query = {'url': q}
 | |
|         return self._async('resolve', query)
 | |
| 
 | |
|     def phishtank(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('phishtank', query)
 | |
| 
 | |
|     def virustotal(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('virustotal_report', query)
 | |
| 
 | |
|     def googlesafebrowsing(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('googlesafebrowsing', query)
 | |
| 
 | |
|     def urlquery(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('urlquery', query)
 | |
| 
 | |
|     def ticket(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('ticket', query)
 | |
| 
 | |
|     def whoismail(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('whois', query)
 | |
| 
 | |
|     def pdnscircl(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('pdnscircl', query)
 | |
| 
 | |
|     def bgpr(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('bgpranking', query)
 | |
| 
 | |
|     def sslcircl(self, q):
 | |
|         query = {'query': q}
 | |
|         return self._async('psslcircl', query)
 | |
| 
 | |
|     def run_query(self, q):
 | |
|         cached = self.get_cache(q)
 | |
|         if len(cached[0][q]) > 0:
 | |
|             return {'info': 'Used cached content'}, cached
 | |
|         job_id = self.urls(q)
 | |
|         all_urls = None
 | |
|         while True:
 | |
|             all_urls = self.get_result(job_id)
 | |
|             if all_urls is None:
 | |
|                 time.sleep(.5)
 | |
|             else:
 | |
|                 break
 | |
| 
 | |
|         res = {}
 | |
|         for u in all_urls:
 | |
|             res[u] = self.resolve(u)
 | |
|             self.phishtank(u)
 | |
|             self.virustotal(u)
 | |
|             self.googlesafebrowsing(u)
 | |
|             self.urlquery(u)
 | |
|             self.ticket(u)
 | |
|             self.whoismail(u)
 | |
| 
 | |
|         waiting = True
 | |
|         done = []
 | |
|         while waiting:
 | |
|             waiting = False
 | |
|             for u, job_id in res.iteritems():
 | |
|                 if job_id in done:
 | |
|                     continue
 | |
|                 ips = self.get_result(job_id)
 | |
|                 if ips is not None:
 | |
|                     done.append(job_id)
 | |
|                     v4, v6 = ips
 | |
|                     if v4 is not None:
 | |
|                         for ip in v4:
 | |
|                             self.phishtank(ip)
 | |
|                             self.bgpr(ip)
 | |
|                             self.urlquery(ip)
 | |
|                             self.pdnscircl(ip)
 | |
|                             self.sslcircl(ip)
 | |
|                             self.ticket(ip)
 | |
|                             self.whoismail(ip)
 | |
|                     if v6 is not None:
 | |
|                         for ip in v6:
 | |
|                             self.phishtank(ip)
 | |
|                             self.urlquery(ip)
 | |
|                             self.pdnscircl(ip)
 | |
|                             self.ticket(ip)
 | |
|                             self.whoismail(ip)
 | |
|                 waiting = True
 | |
|                 time.sleep(.5)
 | |
|         time.sleep(1)
 | |
|         return {'info': 'New query, all the details may not be available.'}, self.get_cache(q)
 | |
| 
 | |
|     def get_cache(self, q):
 | |
|         query = {'query': q}
 | |
|         response = self.session.post('{}get_cache' .format(self.url), data=json.dumps(query))
 | |
|         return response.json()
 |