mirror of https://github.com/CIRCL/url-abuse
575 lines
20 KiB
Python
575 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
#
|
|
# Copyright (C) 2014 Sascha Rommelfangen, Raphael Vinot
|
|
# Copyright (C) 2014 CIRCL Computer Incident Response Center Luxembourg (SMILE gie)
|
|
#
|
|
|
|
from datetime import date, timedelta
|
|
import json
|
|
from redis import Redis
|
|
from urllib.parse import quote
|
|
from .helpers import get_socket_path
|
|
import ipaddress
|
|
|
|
|
|
from pyfaup.faup import Faup
|
|
import socket
|
|
import dns.resolver
|
|
import re
|
|
import logging
|
|
from pypdns import PyPDNS
|
|
from pyipasnhistory import IPASNHistory
|
|
from pybgpranking import BGPRanking
|
|
from pylookyloo import Lookyloo
|
|
|
|
from pypssl import PyPSSL
|
|
from pyeupi import PyEUPI
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Availability flag for the optional ``sphinxapi`` ticket search.
# The ``import sphinxapi`` that used to sit inside a try/except here is
# commented out, so nothing could raise and the ``except`` branch was dead
# code: the flag was unconditionally True. It is kept, with the same value,
# only so that code reading ``sphinx`` keeps working; Query.sphinxsearch is
# disabled and returns an empty string regardless of this flag.
sphinx = True
|
|
|
|
class Query():
    """Look up reputation and infrastructure data for URLs, cached in Redis.

    Results are stored JSON-encoded, as fields of a Redis hash keyed by the
    queried URL, IP address or domain, and expire after ``CACHE_TTL`` seconds.
    Because of the JSON round trip, cached results come back as lists where
    the fresh result was a tuple.
    """

    # Lifetime, in seconds, of every cache entry written by _cache_set.
    CACHE_TTL = 3600
    # Timeout, in seconds, applied to outgoing HTTP requests and WHOIS sockets.
    NETWORK_TIMEOUT = 15

    def __init__(self, loglevel: int=logging.DEBUG):
        """Set up the logger, the Faup URL parser and the Redis cache client.

        :param loglevel: level applied to this class' logger.
        """
        self.__init_logger(loglevel)
        self.fex = Faup()
        self.cache = Redis(unix_socket_path=get_socket_path('cache'), db=1,
                           decode_responses=True)

    def __init_logger(self, loglevel) -> None:
        """Create ``self.logger``, named after the class, at *loglevel*."""
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(loglevel)

    @staticmethod
    def _day_key(day=None):
        """Return *day* (a ``date``; today when None) as an ISO formatted string."""
        return (date.today() if day is None else day).isoformat()

    def _cache_set(self, key, value, field=None):
        """Store *value* JSON-encoded with a ``CACHE_TTL`` expiry.

        :param key: Redis key.
        :param value: any JSON-serialisable object.
        :param field: when given, *key* is a hash and *value* is stored in
                      that field; otherwise *key* is a plain string key.
        """
        payload = json.dumps(value)
        if field is None:
            # redis-py >= 3.0 expects (name, time, value). The rest of this
            # class already uses the 3.x API (see zincrby in is_valid_url).
            self.cache.setex(key, self.CACHE_TTL, payload)
        else:
            self.cache.hset(key, field, payload)
            self.cache.expire(key, self.CACHE_TTL)

    def _cache_get(self, key, field=None):
        """Return the decoded value stored under *key* (or its hash *field*).

        :return: the JSON-decoded value, or None when nothing is stored.
        """
        if field is None:
            raw = self.cache.get(key)
        else:
            raw = self.cache.hget(key, field)
        return None if raw is None else json.loads(raw)

    def to_bool(self, s):
        """
        Converts the given string to a boolean.

        True for '1', 'true', 'yes' and 'on' (case-insensitive), else False.
        """
        return s.lower() in ('1', 'true', 'yes', 'on')

    def get_submissions(self, url, day=None):
        """Return the submission count recorded for *url* on *day* (default: today).

        :return: the score stored in the day's ``<day>_submissions`` sorted set.
        """
        return self.cache.zscore(f'{self._day_key(day)}_submissions', url)

    def get_mail_sent(self, url, day=None):
        """Tell whether the host of *url* is in the ``<day>_mails`` set (default: today)."""
        self.fex.decode(url)
        host = self.fex.get_host()
        return self.cache.sismember(f'{self._day_key(day)}_mails', host)

    def set_mail_sent(self, url, day=None):
        """Add the host of *url* to the ``<day>_mails`` set (default: today)."""
        self.fex.decode(url)
        host = self.fex.get_host()
        return self.cache.sadd(f'{self._day_key(day)}_mails', host)

    def is_valid_url(self, url):
        """Normalise *url* and check that it parses into a scheme and a host.

        ``hxxp`` is rewritten to ``http`` and ``http://`` is prepended when the
        URL does not start with ``http``. Every call increments today's
        submission counter for the URL exactly as it was passed in.

        :return: ``(valid, normalised_url, reason)``; *reason* is None when valid.
        """
        self.cache.zincrby(f'{self._day_key()}_submissions', 1, url)
        if url.startswith('hxxp'):
            url = 'http' + url[4:]
        elif not url.startswith('http'):
            url = 'http://' + url
        # Look the result up under the normalised URL: that is the key it is
        # stored under below, so the raw form could never produce a hit.
        cached = self._cache_get(url, 'valid')
        if cached is not None:
            return tuple(cached)
        self.logger.debug("Checking validity of URL: %s", url)
        self.fex.decode(url)
        scheme = self.fex.get_scheme()
        host = self.fex.get_host()
        if scheme is None or host is None:
            reason = "Not a valid http/https URL/URI"
            return False, url, reason
        self._cache_set(url, (True, url, None), 'valid')
        return True, url, None

    def is_ip(self, host):
        """Return True when *host* is a literal IPv4 or IPv6 address."""
        try:
            ipaddress.ip_address(host)
        except ValueError:
            return False
        return True

    def try_resolve(self, url):
        """Check that the host of *url* is an IP address or has an A record.

        :return: ``(True, None)`` on success, ``(False, reason)`` otherwise.
        """
        self.fex.decode(url)
        host = self.fex.get_host()
        if host is None:
            # The parser found no host at all, so there is nothing to resolve.
            return False, "Not a valid http/https URL/URI"
        host = host.lower()
        if self.is_ip(host):
            return True, None
        try:
            ipaddr = dns.resolver.query(host, 'A')
        except dns.resolver.NXDOMAIN:
            # The resolver raises rather than returning an empty answer.
            return False, "Host " + host + " does not exist."
        except Exception:
            return False, "DNS server problem. Check resolver settings."
        if not ipaddr:
            return False, "Host " + host + " does not exist."
        return True, None

    @staticmethod
    def _meta_redirect(content):
        """Return the target of the first usable ``<meta http-equiv="refresh">`` tag.

        :param content: raw HTML of the fetched page.
        :return: the redirect target, stripped of surrounding whitespace and
                 quotes, or None when the page has no usable refresh tag.
        """
        soup = BeautifulSoup(content, "html.parser")
        # Match the attribute value case-insensitively instead of lowercasing
        # the whole document, which would also lowercase the target URL.
        refresh = re.compile(r'^\s*refresh\s*$', re.I)
        for tag in soup.find_all(attrs={'http-equiv': refresh}):
            _delay, separator, directive = tag.get('content', '').partition(';')
            if not separator:
                continue
            _name, equals, target = directive.partition('=')
            target = target.strip().strip('\'"')
            if equals and target:
                return target
        return None

    def get_urls(self, url, depth=1):
        """Yield the URLs reached from *url*, in the order they are found.

        HTTP redirects followed by ``requests`` are yielded first, then the
        final URL, then the chain behind a meta-refresh redirect (followed
        recursively). When the host does not resolve or the request fails,
        only *url* itself is yielded.

        :param depth: current meta-refresh depth; nothing is yielded above 5.
        """
        from urllib.parse import urljoin

        if depth > 5:
            self.logger.warning('Too many redirects.')
            return

        resolve, _reason = self.try_resolve(url)
        if not resolve:
            # FIXME: inform that the domain does not resolve
            yield url
            return

        self.logger.debug("Making HTTP connection to %s", url)

        headers = {'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0'}
        try:
            response = requests.get(url, allow_redirects=True, headers=headers,
                                    timeout=self.NETWORK_TIMEOUT, verify=False)
        except Exception:
            # Deliberately broad: any failure to fetch (DNS for example)
            # simply ends the chain here.
            # FIXME: inform that the get failed
            yield url
            return

        # Yield the urls in the order we find them
        for hop in response.history:
            yield hop.url
        yield response.url

        meta_redir_url = self._meta_redirect(response.content)
        if meta_redir_url is None:
            return
        # Resolve relative targets against the document that contained the
        # tag; absolute targets pass through urljoin unchanged.
        yield from self.get_urls(urljoin(response.url, meta_redir_url), depth + 1)

    def url_list(self, url):
        """Return the de-duplicated list of URLs produced by get_urls for *url*.

        The list keeps first-seen order and is cached in the ``list`` field.
        """
        cached = self._cache_get(url, 'list')
        if cached is not None:
            return cached
        list_urls = list(dict.fromkeys(
            u for u in self.get_urls(url) if u is not None))
        self._cache_set(url, list_urls, 'list')
        return list_urls

    def _resolve_records(self, host, rdtype, label):
        """Return the *rdtype* records of *host* as strings, or None on any failure."""
        try:
            return [str(ip) for ip in dns.resolver.query(host, rdtype)]
        except Exception:
            self.logger.debug("No %s address assigned to: %s", label, host)
            return None

    def dns_resolve(self, url):
        """Return the IPv4 and IPv6 addresses of the host of *url*.

        A literal IP host is returned as-is in the matching slot; otherwise
        the A and AAAA records are queried.

        :return: ``(ipv4, ipv6)``, each a list of strings or None.
        """
        cached = self._cache_get(url, 'dns')
        if cached is not None:
            return tuple(cached)
        self.fex.decode(url)
        host = self.fex.get_host()
        if host is None:
            # Nothing to resolve; do not cache a result for an unparsable URL.
            return None, None
        host = host.lower()
        ipv4 = None
        ipv6 = None
        if self.is_ip(host):
            if ipaddress.ip_address(host).version == 6:
                ipv6 = [host]
            else:
                ipv4 = [host]
        else:
            ipv4 = self._resolve_records(host, 'A', 'IPv4')
            ipv6 = self._resolve_records(host, 'AAAA', 'IPv6')
        self._cache_set(url, (ipv4, ipv6), 'dns')
        return ipv4, ipv6

    def phish_query(self, url, key, query):
        """Ask the PhishTank-style API at *url* whether *query* is a known phish.

        :param url: API endpoint.
        :param key: application key sent as ``app_key``.
        :param query: URL to check.
        :return: the ``phish_detail_page`` value when the URL is in the
                 database, else None.
        """
        cached = self._cache_get(query, 'phishtank')
        if cached is not None:
            return cached
        postfields = {'url': quote(query), 'format': 'json', 'app_key': key}
        response = requests.post(url, data=postfields, timeout=self.NETWORK_TIMEOUT)
        res = response.json()
        # An 'error' status or an unknown URL both mean "no information".
        if res["meta"]["status"] == "success" and res["results"]["in_database"]:
            detail_page = res["results"]["phish_detail_page"]
            self._cache_set(query, detail_page, 'phishtank')
            return detail_page
        return None

    @staticmethod
    def sphinxsearch(server, port, url, query):
        """Disabled ticket search; always returns an empty string.

        Declared static because the signature has no ``self``: as a plain
        method, calling it on an instance with four arguments raised
        TypeError. The sphinxapi-based body that followed the ``return`` was
        an unreachable string literal and has been removed.
        """
        # WARNING: too dangerous to have on the public interface
        return ''

    def vt_query_url(self, url, url_up, key, query, upload=True):
        """Query the VirusTotal-style API at *url* for *query*.

        :param url: report endpoint.
        :param url_up: unused; kept for interface compatibility.
        :param key: API key sent as ``apikey``.
        :param query: resource (URL) to look up.
        :param upload: when true, ``scan=1`` is added to the request.
        :return: ``(verbose_msg, permalink, positives, total)``, or None when
                 the response body is empty.
        """
        cached = self._cache_get(query, 'vt')
        # Entries without a detection count are not reused; query again.
        if cached is not None and cached[2] is not None:
            return cached
        parameters = {"resource": query, "apikey": key}
        if upload:
            parameters['scan'] = 1
        response = requests.post(url, data=parameters, timeout=self.NETWORK_TIMEOUT)
        if not response.text:
            return None
        res = response.json()
        msg = res["verbose_msg"]
        link = res.get("permalink")
        positives = res.get("positives")
        total = res.get("total")
        self._cache_set(query, (msg, link, positives, total), 'vt')
        return msg, link, positives, total

    def gsb_query(self, url, query):
        """Post *query* to the Safe Browsing style endpoint at *url*.

        :return: the response body when the status code is 200, else None.
        """
        cached = self._cache_get(query, 'gsb')
        if cached is not None:
            return cached
        param = '1\n' + query
        response = requests.post(url, data=param, timeout=self.NETWORK_TIMEOUT)
        if response.status_code == 200:
            self._cache_set(query, response.text, 'gsb')
            return response.text
        return None

    # NOTE: the urlquery lookup that used to live here was already disabled
    # (it only existed inside a string literal) and has been removed.

    def process_emails(self, emails, ignorelist, replacelist):
        """De-duplicate *emails*, drop ignored ones and apply replacements.

        :param emails: iterable of e-mail addresses.
        :param ignorelist: regular expressions; an address matching any of
                           them (case-insensitively) is removed.
        :param replacelist: mapping of pattern to list of addresses. When an
                            address matches the pattern and the pattern itself
                            is an entry of the result, that entry is replaced
                            by the mapped addresses.
        :return: list of addresses, in no particular order.
        """
        unique = list(set(emails))
        to_return = list(unique)
        # Iterate over a snapshot: mutating the list being iterated could skip
        # or revisit entries.
        for mail in reversed(unique):
            ignored = any(re.search(entry, mail, re.I) for entry in ignorelist)
            if ignored and mail in to_return:
                to_return.remove(mail)
            for k, v in replacelist.items():
                if re.search(k, mail, re.I) and k in to_return:
                    to_return.remove(k)
                    to_return += v
        return to_return

    def whois(self, server, port, domain, ignorelist, replacelist):
        """Return the contact e-mail addresses found in the WHOIS record of *domain*.

        :param server: WHOIS server host.
        :param port: WHOIS server port.
        :param domain: domain name, or a URL (starting with ``http``) whose
                       domain is extracted first.
        :param ignorelist: passed to process_emails.
        :param replacelist: passed to process_emails.
        :return: list of unique addresses, or None when the server cannot be
                 reached or no address is left after filtering.
        """
        cached = self._cache_get(domain, 'whois')
        if cached is not None:
            return cached
        if domain.startswith('http'):
            self.fex.decode(domain)
            lookup = self.fex.get_domain().lower()
        else:
            lookup = domain
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(self.NETWORK_TIMEOUT)
            try:
                s.connect((server, port))
            except OSError:
                self.logger.warning(
                    "Connection problems - check WHOIS server %s:%s "
                    "(WHOIS request while problem occurred: %s)",
                    server, port, domain)
                return None
            s.sendall("{}\r\n".format(lookup).encode())
            chunks = []
            while True:
                chunk = s.recv(4096)
                if chunk == b'':
                    break
                chunks.append(chunk)
        # WHOIS replies are not guaranteed to be UTF-8; never fail on decoding.
        response = b''.join(chunks).decode(errors='replace')
        match = re.findall(r'[\w\.-]+@[\w\.-]+', response)
        emails = self.process_emails(match, ignorelist, replacelist)
        if not emails:
            return None
        list_mail = list(set(emails))
        self._cache_set(domain, list_mail, 'whois')
        return list_mail

    def pdnscircl(self, url, user, passwd, q):
        """Query the passive DNS service at *url* for *q*.

        :return: ``(count, hosts)``: the number of distinct lower-cased
                 ``rrname`` values and the first five of them, taken from the
                 response in reverse order.
        """
        cached = self._cache_get(q, 'pdns')
        if cached is not None:
            return cached
        pdns = PyPDNS(url, basic_auth=(user, passwd))
        response = pdns.query(q)
        # dict.fromkeys de-duplicates while keeping first-seen order.
        all_uniq = list(dict.fromkeys(
            e['rrname'].lower() for e in reversed(response)))
        result = (len(all_uniq), all_uniq[:5])
        self._cache_set(q, result, 'pdns')
        return result

    def psslcircl(self, url, user, passwd, q):
        """Query the passive SSL service at *url* for *q*.

        :return: mapping of certificate SHA1 to its list of subject values,
                 or None when the response has no entry for *q*.
        """
        cached = self._cache_get(q, 'pssl')
        if cached is not None:
            return cached
        pssl = PyPSSL(url, basic_auth=(user, passwd))
        response = pssl.query(q)
        certinfo = response.get(q)
        if certinfo is None:
            return None
        entries = {}
        for sha1 in certinfo['certificates']:
            entries[sha1] = []
            if certinfo['subjects'].get(sha1):
                entries[sha1].extend(certinfo['subjects'][sha1]['values'])
        self._cache_set(q, entries, 'pssl')
        return entries

    def eupi(self, url, key, q):
        """Look *q* up on the EUPI service at *url*.

        :return: the ``tag_label`` of the first result; when there is no
                 result, *q* is submitted to the service and None is returned.
        """
        cached = self._cache_get(q, 'eupi')
        if cached is not None:
            return cached
        eu = PyEUPI(key, url)
        response = eu.search_url(url=q)
        if response.get('results'):
            r = response.get('results')[0]['tag_label']
            self._cache_set(q, r, 'eupi')
            return r
        eu.post_submission(q)
        return None

    def bgpranking(self, ip):
        """Return the announcing ASN of *ip* and yesterday's ranking of that ASN.

        :return: ``(asn, prefix, asn_description, rank, position,
                 total_known_asns)``; six None values when the ASN, the
                 prefix or the ranking is unavailable.
        """
        unknown = (None, None, None, None, None, None)

        entry = self._cache_get(ip, 'ipasn')
        if entry is None:
            response = IPASNHistory().query(ip)
            # A reply without 'response', or with an empty one, means "no data".
            records = response.get('response') or {}
            entry = next(iter(records.values()), None)
            if entry:
                self._cache_set(ip, entry, 'ipasn')
        asn = entry.get('asn') if entry else None
        prefix = entry.get('prefix') if entry else None

        if not asn or not prefix:
            return unknown

        cached = self._cache_get(ip, 'bgpranking')
        if cached is not None:
            return cached
        yesterday = (date.today() - timedelta(1)).isoformat()
        response = BGPRanking().query(asn, date=yesterday)
        if 'response' not in response or not response['response']:
            return unknown
        details = response['response']
        ranking = details['ranking']
        to_return = (asn, prefix, details['asn_description'], ranking['rank'],
                     ranking['position'], ranking['total_known_asns'])
        self._cache_set(ip, to_return, 'bgpranking')
        return to_return

    def lookyloo(self, url):
        """Enqueue *url* on Lookyloo and return what ``enqueue`` gives back.

        :return: the (truthy) value returned by ``Lookyloo.enqueue``, else None.
        """
        cached = self._cache_get(url, 'lookyloo')
        if cached is not None:
            return cached
        lookyloo_perma_url = Lookyloo().enqueue(url)
        if lookyloo_perma_url:
            self._cache_set(url, lookyloo_perma_url, 'lookyloo')
            return lookyloo_perma_url
        return None

    def _deserialize_cached(self, entry):
        """Decode every field of the cache hash stored under *entry*.

        :return: ``(data, redirects)``: the decoded fields without ``list``,
                 and the content of the ``list`` field (``[]`` when absent).
        """
        to_return = {}
        redirects = []
        for key, value in self.cache.hgetall(entry).items():
            decoded = json.loads(value)
            if key == 'list':
                redirects = decoded
            else:
                to_return[key] = decoded
        return to_return, redirects

    def get_url_data(self, url):
        """Return everything cached for *url*, plus the data of its IP addresses.

        :return: ``({url: data}, redirects)``; *data* additionally maps every
                 IP address from the ``dns`` field to its own cached fields.
        """
        data, redirects = self._deserialize_cached(url)
        if data.get('dns') is not None:
            ipv4, ipv6 = data['dns']
            ip_data = {}
            for ip in (ipv4 or []) + (ipv6 or []):
                info, _ = self._deserialize_cached(ip)
                ip_data[ip] = info
            data.update(ip_data)
        return {url: data}, redirects

    def cached(self, url, digest=False):
        """Return the cached data of *url* and of every URL it redirects to.

        :param digest: when true, the output of ``digest`` is added under the
                       ``digest`` key.
        :return: ``{'result': [...]}``, one ``{url: data}`` mapping per URL.
        """
        url_data, redirects = self.get_url_data(url)
        to_return = [url_data]
        to_return.extend(self.get_url_data(u)[0] for u in redirects if u != url)
        if digest:
            return {'result': to_return, 'digest': self.digest(to_return)}
        return {'result': to_return}

    def ip_details_digest(self, ips, all_info, all_asns, all_mails):
        """Build the text section describing *ips*.

        :param ips: IP addresses to describe.
        :param all_info: mapping holding one entry per IP address.
        :param all_asns: set, updated in place with ``"<description> (<asn>)"``.
        :param all_mails: set, updated in place with the WHOIS addresses.
        :return: the text for these addresses.
        """
        parts = []
        for ip in ips:
            parts.append('\t' + ip + '\n')
            data = all_info[ip]
            ranking = data.get('bgpranking')
            if ranking:
                parts.append('\t\tis announced by {} ({}). Position {}/{}.\n'.format(
                    ranking[2], ranking[0], ranking[4], ranking[5]))
                all_asns.add('{} ({})'.format(ranking[2], ranking[0]))
            if data.get('whois'):
                all_mails.update(data.get('whois'))
        return ''.join(parts)

    def digest(self, data):
        """Render *data* (the ``result`` list built by ``cached``) as text.

        :return: ``(text, mails, asns)``: the report, and the lists of all
                 WHOIS addresses and ASN descriptions met while building it.
        """
        parts = []
        all_mails = set()
        all_asns = set()
        for entry in data:
            # Each URL we're redirected to
            for url, info in entry.items():
                # info contains the information we got for the URL.
                parts.append('\n{}\n'.format(url))
                if 'whois' in info:
                    all_mails.update(info['whois'])
                if 'lookyloo' in info:
                    parts.append('\tLookyloo permanent URL: {}\n'.format(info['lookyloo']))
                if 'vt' in info and len(info['vt']) == 4:
                    msg, link, positives, total = info['vt']
                    if positives is not None:
                        parts.append('\t{} out of {} positive detections in VT - {}\n'.format(
                            positives, total, link))
                    else:
                        parts.append('\t{} - {}\n'.format(msg, link))
                if 'gsb' in info:
                    parts.append('\tKnown as malicious on Google Safe Browsing: {}\n'.format(info['gsb']))
                if 'phishtank' in info:
                    parts.append('\tKnown on PhishTank: {}\n'.format(info['phishtank']))

                if 'dns' in info:
                    ipv4, ipv6 = info['dns']
                    if ipv4 is not None:
                        parts.append(self.ip_details_digest(ipv4, info, all_asns, all_mails))
                    if ipv6 is not None:
                        parts.append(self.ip_details_digest(ipv6, info, all_asns, all_mails))
        return ''.join(parts), list(all_mails), list(all_asns)