diff --git a/bin/run_workers.py b/bin/run_workers.py index aae1d7a..4111322 100755 --- a/bin/run_workers.py +++ b/bin/run_workers.py @@ -2,17 +2,29 @@ # -*- coding: utf-8 -*- import argparse from multiprocessing import Pool -from rq import Worker, Queue, Connection from redis import Redis from urlabuse.helpers import get_socket_path +from urlabuse.urlabuse import Query +import json +import time def worker(process_id: int): - listen = ['default'] - cache_socket = get_socket_path('cache') - with Connection(Redis(unix_socket_path=cache_socket)): - worker = Worker(list(map(Queue, listen))) - worker.work() + urlabuse_query = Query() + queue = Redis(unix_socket_path=get_socket_path('cache'), db=0, + decode_responses=True) + while True: + jobid = queue.spop('to_process') + if not jobid: + time.sleep(.1) + continue + to_process = queue.hgetall(jobid) + parameters = json.loads(to_process['data']) + try: + result = getattr(urlabuse_query, to_process['method'])(**parameters) + queue.hset(jobid, 'result', json.dumps(result)) + except Exception as e: + print(e, to_process) if __name__ == '__main__': diff --git a/client/bin/urlabuse b/client/bin/urlabuse new file mode 100755 index 0000000..0ae64d1 --- /dev/null +++ b/client/bin/urlabuse @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import argparse + +from pyurlabuse import PyURLAbuse +import json + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Run a query against URL Abuse') + parser.add_argument('--url', type=str, help='URL of the instance.') + + parser.add_argument('--query', help='URL to lookup') + + args = parser.parse_args() + + if args.url: + urlabuse = PyURLAbuse(args.url) + else: + urlabuse = PyURLAbuse() + + response = urlabuse.run_query(args.query) + print(json.dumps(response, indent=2)) diff --git a/client/pyurlabuse/__init__.py b/client/pyurlabuse/__init__.py index dd3c1ba..f258653 100644 --- a/client/pyurlabuse/__init__.py +++ b/client/pyurlabuse/__init__.py @@ -1 +1 @@ -from api import PyURLAbuse +from .api import PyURLAbuse diff --git a/client/pyurlabuse/api.py b/client/pyurlabuse/api.py index cc477c6..12b43f3 100644 --- a/client/pyurlabuse/api.py +++ b/client/pyurlabuse/api.py @@ -4,27 +4,31 @@ import json import requests import time +from urllib.parse import urljoin class PyURLAbuse(object): - # def __init__(self, url='https://www.circl.lu/urlabuse/'): - def __init__(self, url='http://0.0.0.0:5100/'): + def __init__(self, url='https://www.circl.lu/urlabuse/'): self.url = url self.session = requests.Session() self.session.headers.update({'content-type': 'application/json'}) + @property + def is_up(self): + r = self.session.head(self.root_url) + return r.status_code == 200 + def get_result(self, job_id): - response = self.session.get('{}_result/{}' .format(self.url, job_id)) + response = self.session.get(urljoin(self.url, f'_result/{job_id}')) if response.status_code == 202: return None else: return response.json() def _async(self, path, query): - response = self.session.post('{}{}' .format(self.url, path), - data=json.dumps(query)) + response = self.session.post(urljoin(self.url, path), data=json.dumps(query)) return response.text def start(self, q): @@ -102,7 +106,7 @@ class PyURLAbuse(object): done = [] while waiting: waiting = False - for u, job_id in res.iteritems(): + for u, job_id in res.items(): if job_id in done: continue ips = self.get_result(job_id) @@ -132,5 +136,5 @@ class PyURLAbuse(object): def get_cache(self, q): query = {'query': q} - response = 
self.session.post('{}get_cache' .format(self.url), data=json.dumps(query)) + response = self.session.post(urljoin(self.url, 'get_cache'), data=json.dumps(query)) return response.json() diff --git a/client/setup.py b/client/setup.py index 9ce95f6..f43c573 100644 --- a/client/setup.py +++ b/client/setup.py @@ -12,6 +12,7 @@ setup( description='Python API for URL Abuse.', long_description=open('README.md').read(), packages=['pyurlabuse'], + scripts=['bin/urlabuse'], classifiers=[ 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', 'Development Status :: 3 - Alpha', diff --git a/requirements.txt b/requirements.txt index e3bcf14..114a8d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -rq redis>=3 pypssl pypdns diff --git a/urlabuse/urlabuse.py b/urlabuse/urlabuse.py index 6d69fa7..d474627 100644 --- a/urlabuse/urlabuse.py +++ b/urlabuse/urlabuse.py @@ -7,7 +7,7 @@ from datetime import date, timedelta import json -import redis +from redis import Redis from urllib.parse import quote from .helpers import get_socket_path import ipaddress @@ -17,7 +17,6 @@ from pyfaup.faup import Faup import socket import dns.resolver import re -import sys import logging from pypdns import PyPDNS from pyipasnhistory import IPASNHistory @@ -35,503 +34,473 @@ try: except Exception: sphinx = False -r_cache = None +class Query(): -def _cache_init(): - global r_cache - if r_cache is None: - r_cache = redis.Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) + def __init__(self, loglevel: int=logging.DEBUG): + self.__init_logger(loglevel) + self.fex = Faup() + self.cache = Redis(unix_socket_path=get_socket_path('cache'), db=1, + decode_responses=True) + def __init_logger(self, loglevel) -> None: + self.logger = logging.getLogger(f'{self.__class__.__name__}') + self.logger.setLevel(loglevel) -def _cache_set(key, value, field=None): - _cache_init() - if field is None: - r_cache.setex(key, json.dumps(value), 3600) - else: - r_cache.hset(key, field, json.dumps(value)) - r_cache.expire(key, 3600) + def _cache_set(self, key, value, field=None): + if field is None: + self.cache.setex(key, json.dumps(value), 3600) + else: + self.cache.hset(key, field, json.dumps(value)) + self.cache.expire(key, 3600) + def _cache_get(self, key, field=None): + if field is None: + value_json = self.cache.get(key) + else: + value_json = self.cache.hget(key, field) + if value_json is not None: + return json.loads(value_json) + return None -def _cache_get(key, field=None): - _cache_init() - if field is None: - value_json = r_cache.get(key) - else: - value_json = r_cache.hget(key, field) - if value_json is not None: - return json.loads(value_json) - return None + def to_bool(self, s): + """ + Converts the given string to a boolean. + """ + return s.lower() in ('1', 'true', 'yes', 'on') + def get_submissions(self, url, day=None): + if day is None: + day = date.today().isoformat() + else: + day = day.isoformat() + return self.cache.zscore(f'{day}_submissions', url) -def to_bool(s): - """ - Converts the given string to a boolean. 
- """ - return s.lower() in ('1', 'true', 'yes', 'on') + def get_mail_sent(self, url, day=None): + if day is None: + day = date.today().isoformat() + else: + day = day.isoformat() + return self.cache.sismember(f'{day}_mails', url) + def set_mail_sent(self, url, day=None): + if day is None: + day = date.today().isoformat() + else: + day = day.isoformat() + return self.cache.sadd(f'{day}_mails', url) -def get_submissions(url, day=None): - _cache_init() - if day is None: - day = date.today().isoformat() - else: - day = day.isoformat() - key = date.today().isoformat() + '_submissions' - return r_cache.zscore(key, url) + def is_valid_url(self, url): + cached = self._cache_get(url, 'valid') + key = f'{date.today().isoformat()}_submissions' + self.cache.zincrby(key, 1, url) + if cached is not None: + return cached + if url.startswith('hxxp'): + url = 'http' + url[4:] + elif not url.startswith('http'): + url = 'http://' + url + logging.debug("Checking validity of URL: " + url) + self.fex.decode(url) + scheme = self.fex.get_scheme() + host = self.fex.get_host() + if scheme is None or host is None: + reason = "Not a valid http/https URL/URI" + return False, url, reason + self._cache_set(url, (True, url, None), 'valid') + return True, url, None + def is_ip(self, host): + try: + ipaddress.ip_address(host) + return True + except ValueError: + return False -def get_mail_sent(url, day=None): - _cache_init() - if day is None: - day = date.today().isoformat() - else: - day = day.isoformat() - key = date.today().isoformat() + '_mails' - return r_cache.sismember(key, url) - - -def set_mail_sent(url, day=None): - _cache_init() - if day is None: - day = date.today().isoformat() - else: - day = day.isoformat() - key = date.today().isoformat() + '_mails' - return r_cache.sadd(key, url) - - -def is_valid_url(url): - cached = _cache_get(url, 'valid') - key = date.today().isoformat() + '_submissions' - r_cache.zincrby(key, 1, url) - if cached is not None: - return cached - fex = Faup() - if url.startswith('hxxp'): - url = 'http' + url[4:] - elif not url.startswith('http'): - url = 'http://' + url - logging.debug("Checking validity of URL: " + url) - fex.decode(url) - scheme = fex.get_scheme() - host = fex.get_host() - if scheme is None or host is None: - reason = "Not a valid http/https URL/URI" - return False, url, reason - _cache_set(url, (True, url, None), 'valid') - return True, url, None - - -def is_ip(host): - try: - ipaddress.ip_address(host) - return True - except ValueError: - return False - - -def try_resolve(fex, url): - fex.decode(url) - host = fex.get_host().lower() - if is_ip(host): + def try_resolve(self, url): + self.fex.decode(url) + host = self.fex.get_host().lower() + if self.is_ip(host): + return True, None + try: + ipaddr = dns.resolver.query(host, 'A') + except Exception: + reason = "DNS server problem. Check resolver settings." + return False, reason + if not ipaddr: + reason = "Host " + host + " does not exist." + return False, reason return True, None - try: - ipaddr = dns.resolver.query(host, 'A') - except Exception: - reason = "DNS server problem. Check resolver settings." - return False, reason - if not ipaddr: - reason = "Host " + host + " does not exist." 
- return False, reason - return True, None + def get_urls(self, url, depth=1): + if depth > 5: + print('Too many redirects.') + return -def get_urls(url, depth=1): - if depth > 5: - print('Too many redirects.') - return - fex = Faup() - - def meta_redirect(content): - c = content.lower() - soup = BeautifulSoup(c, "html.parser") - for result in soup.find_all(attrs={'http-equiv': 'refresh'}): - if result: - out = result["content"].split(";") - if len(out) == 2: - wait, text = out - try: - a, url = text.split('=', 1) - return url.strip() - except Exception: - print(text) - return None - - resolve, reason = try_resolve(fex, url) - if not resolve: - # FIXME: inform that the domain does not resolve - yield url - return - - logging.debug("Making HTTP connection to " + url) - - headers = {'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0'} - try: - response = requests.get(url, allow_redirects=True, headers=headers, - timeout=15, verify=False) - except Exception: - # That one can fail (DNS for example) - # FIXME: inform that the get failed - yield url - return - if response.history is not None: - for h in response.history: - # Yeld the urls in the order we find them - yield h.url - - yield response.url - - meta_redir_url = meta_redirect(response.content) - if meta_redir_url is not None: - depth += 1 - if not meta_redir_url.startswith('http'): - fex.decode(url) - base = '{}://{}'.format(fex.get_scheme(), fex.get_host()) - port = fex.get_port() - if port is not None: - base += ':{}'.format(port) - if not meta_redir_url.startswith('/'): - # relative redirect. resource_path has the initial '/' - if fex.get_resource_path() is not None: - base += fex.get_resource_path() - if not base.endswith('/'): - base += '/' - meta_redir_url = base + meta_redir_url - for url in get_urls(meta_redir_url, depth): - yield url - - -def url_list(url): - cached = _cache_get(url, 'list') - if cached is not None: - return cached - list_urls = [] - for u in get_urls(url): - if u is None or u in list_urls: - continue - list_urls.append(u) - _cache_set(url, list_urls, 'list') - return list_urls - - -def dns_resolve(url): - cached = _cache_get(url, 'dns') - if cached is not None: - return cached - fex = Faup() - fex.decode(url) - host = fex.get_host().lower() - ipv4 = None - ipv6 = None - if is_ip(host): - if ':' in host: - try: - socket.inet_pton(socket.AF_INET6, host) - ipv6 = [host] - except Exception: - pass - else: - try: - socket.inet_aton(host) - ipv4 = [host] - except Exception: - pass - else: - try: - ipv4 = [str(ip) for ip in dns.resolver.query(host, 'A')] - except Exception: - logging.debug("No IPv4 address assigned to: " + host) - try: - ipv6 = [str(ip) for ip in dns.resolver.query(host, 'AAAA')] - except Exception: - logging.debug("No IPv6 address assigned to: " + host) - _cache_set(url, (ipv4, ipv6), 'dns') - return ipv4, ipv6 - - -def phish_query(url, key, query): - cached = _cache_get(query, 'phishtank') - if cached is not None: - return cached - postfields = {'url': quote(query), 'format': 'json', 'app_key': key} - response = requests.post(url, data=postfields) - res = response.json() - if res["meta"]["status"] == "success": - if res["results"]["in_database"]: - _cache_set(query, res["results"]["phish_detail_page"], 'phishtank') - return res["results"]["phish_detail_page"] - else: - # no information - pass - elif res["meta"]["status"] == 'error': - # Inform the user? 
- # errormsg = res["errortext"] - pass - return None - - -def sphinxsearch(server, port, url, query): - # WARNING: too dangerous to have on the public interface - return '' - """ - if not sphinx: - return None - cached = _cache_get(query, 'sphinx') - if cached is not None: - return cached - client = sphinxapi.SphinxClient() - client.SetServer(server, port) - client.SetMatchMode(2) - client.SetConnectTimeout(5.0) - result = [] - res = client.Query(query) - if res.get("matches") is not None: - for ticket in res["matches"]: - ticket_id = ticket["id"] - ticket_link = url + str(ticket_id) - result.append(ticket_link) - _cache_set(query, result, 'sphinx') - return result - - """ - - -def vt_query_url(url, url_up, key, query, upload=True): - cached = _cache_get(query, 'vt') - if cached is not None: - return cached - parameters = {"resource": query, "apikey": key} - if upload: - parameters['scan'] = 1 - response = requests.post(url, data=parameters) - if response.text is None or len(response.text) == 0: - return None - res = response.json() - msg = res["verbose_msg"] - link = res.get("permalink") - positives = res.get("positives") - total = res.get("total") - if positives is not None: - _cache_set(query, (msg, link, positives, total), 'vt') - return msg, link, positives, total - - -def gsb_query(url, query): - cached = _cache_get(query, 'gsb') - if cached is not None: - return cached - param = '1\n' + query - response = requests.post(url, data=param) - status = response.status_code - if status == 200: - _cache_set(query, response.text, 'gsb') - return response.text - - -''' -def urlquery_query(url, key, query): - return None - cached = _cache_get(query, 'urlquery') - if cached is not None: - return cached - try: - urlquery.url = url - urlquery.key = key - response = urlquery.search(query) - except Exception: - return None - if response['_response_']['status'] == 'ok': - if response.get('reports') is not None: - total_alert_count = 0 - for r in response['reports']: - total_alert_count += r['urlquery_alert_count'] - total_alert_count += r['ids_alert_count'] - total_alert_count += r['blacklist_alert_count'] - _cache_set(query, total_alert_count, 'urlquery') - return total_alert_count - else: + def meta_redirect(content): + c = content.lower() + soup = BeautifulSoup(c, "html.parser") + for result in soup.find_all(attrs={'http-equiv': 'refresh'}): + if result: + out = result["content"].split(";") + if len(out) == 2: + wait, text = out + try: + a, url = text.split('=', 1) + return url.strip() + except Exception: + print(text) return None -''' + resolve, reason = self.try_resolve(url) + if not resolve: + # FIXME: inform that the domain does not resolve + yield url + return -def process_emails(emails, ignorelist, replacelist): - to_return = list(set(emails)) - for mail in reversed(to_return): - for ignorelist_entry in ignorelist: - if re.search(ignorelist_entry, mail, re.I): - if mail in to_return: - to_return.remove(mail) - for k, v in list(replacelist.items()): - if re.search(k, mail, re.I): - if k in to_return: - to_return.remove(k) - to_return += v - return to_return + logging.debug(f"Making HTTP connection to {url}") + headers = {'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0'} + try: + response = requests.get(url, allow_redirects=True, headers=headers, + timeout=15, verify=False) + except Exception: + # That one can fail (DNS for example) + # FIXME: inform that the get failed + yield url + return + if response.history is not None: + for h in response.history: + 
# Yeld the urls in the order we find them + yield h.url -def whois(server, port, domain, ignorelist, replacelist): - cached = _cache_get(domain, 'whois') - if cached is not None: - return cached - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(15) - try: - s.connect((server, port)) - except Exception: - print("Connection problems - check WHOIS server") - print(("WHOIS request while problem occurred: ", domain)) - print(("WHOIS server: {}:{}".format(server, port))) - sys.exit(0) - if domain.startswith('http'): - fex = Faup() - fex.decode(domain) - d = fex.get_domain().lower() - else: - d = domain - s.send(("{}\r\n".format(d)).encode()) - response = b'' - while True: - d = s.recv(4096) - response += d - if d == b'': - break - s.close() - match = re.findall(r'[\w\.-]+@[\w\.-]+', response.decode()) - emails = process_emails(match, ignorelist, replacelist) - if len(emails) == 0: - return None - list_mail = list(set(emails)) - _cache_set(domain, list_mail, 'whois') - return list_mail + yield response.url + meta_redir_url = meta_redirect(response.content) + if meta_redir_url is not None: + depth += 1 + if not meta_redir_url.startswith('http'): + self.fex.decode(url) + base = '{}://{}'.format(self.fex.get_scheme(), self.fex.get_host()) + port = self.fex.get_port() + if port is not None: + base += f':{port}' + if not meta_redir_url.startswith('/'): + # relative redirect. resource_path has the initial '/' + if self.fex.get_resource_path() is not None: + base += self.fex.get_resource_path() + if not base.endswith('/'): + base += '/' + meta_redir_url = base + meta_redir_url + for url in self.get_urls(meta_redir_url, depth): + yield url -def pdnscircl(url, user, passwd, q): - cached = _cache_get(q, 'pdns') - if cached is not None: - return cached - pdns = PyPDNS(url, basic_auth=(user, passwd)) - response = pdns.query(q) - all_uniq = [] - for e in reversed(response): - host = e['rrname'].lower() - if host in all_uniq: - continue - else: - all_uniq.append(host) - response = (len(all_uniq), all_uniq[:5]) - _cache_set(q, response, 'pdns') - return response - - -def psslcircl(url, user, passwd, q): - cached = _cache_get(q, 'pssl') - if cached is not None: - return cached - pssl = PyPSSL(url, basic_auth=(user, passwd)) - response = pssl.query(q) - if response.get(q) is not None: - certinfo = response.get(q) - entries = {} - for sha1 in certinfo['certificates']: - entries[sha1] = [] - if certinfo['subjects'].get(sha1): - for value in certinfo['subjects'][sha1]['values']: - entries[sha1].append(value) - _cache_set(q, entries, 'pssl') - return entries - return None - - -def eupi(url, key, q): - cached = _cache_get(q, 'eupi') - if cached is not None: - return cached - eu = PyEUPI(key, url) - response = eu.search_url(url=q) - if response.get('results'): - r = response.get('results')[0]['tag_label'] - _cache_set(q, r, 'eupi') - return r - eu.post_submission(q) - return None - - -def bgpranking(ip): - cached = _cache_get(ip, 'ipasn') - if cached is not None: - asn = cached['asn'] - prefix = cached['prefix'] - else: - ipasn = IPASNHistory() - response = ipasn.query(ip) - if 'response' not in response: - asn = None - prefix = None - entry = response['response'][list(response['response'].keys())[0]] - _cache_set(ip, entry, 'ipasn') - asn = entry['asn'] - prefix = entry['prefix'] - - if not asn or not prefix: - # asn, prefix, asn_descr, rank, position, known_asns - return None, None, None, None, None, None - - cached = _cache_get(asn, 'bgp') - if cached is not None: - return cached - bgpranking = 
BGPRanking() - response = bgpranking.query(asn, date=(date.today() - timedelta(1)).isoformat()) - if 'response' not in response: - return None, None, None, None, None, None - to_return = (asn, prefix, response['response']['asn_description'], response['response']['ranking']['rank'], - response['response']['ranking']['position'], response['response']['ranking']['total_known_asns']) - _cache_set(asn, to_return, 'bgp') - return to_return - - -def _deserialize_cached(entry): - to_return = {} - h = r_cache.hgetall(entry) - for key, value in list(h.items()): - to_return[key] = json.loads(value) - return to_return - - -def get_url_data(url): - data = _deserialize_cached(url) - if data.get('dns') is not None: - ipv4, ipv6 = data['dns'] - ip_data = {} - if ipv4 is not None: - for ip in ipv4: - ip_data[ip] = _deserialize_cached(ip) - if ipv6 is not None: - for ip in ipv6: - ip_data[ip] = _deserialize_cached(ip) - if len(ip_data) > 0: - data.update(ip_data) - return {url: data} - - -def cached(url): - _cache_init() - url_data = get_url_data(url) - to_return = [url_data] - if url_data[url].get('list') is not None: - url_redirs = url_data[url]['list'] - for u in url_redirs: - if u == url: + def url_list(self, url): + cached = self._cache_get(url, 'list') + if cached is not None: + return cached + list_urls = [] + for u in self.get_urls(url): + if u is None or u in list_urls: continue - to_return.append(get_url_data(u)) - return to_return + list_urls.append(u) + self._cache_set(url, list_urls, 'list') + return list_urls + + def dns_resolve(self, url): + cached = self._cache_get(url, 'dns') + if cached is not None: + return cached + self.fex.decode(url) + host = self.fex.get_host().lower() + ipv4 = None + ipv6 = None + if self.is_ip(host): + if ':' in host: + try: + socket.inet_pton(socket.AF_INET6, host) + ipv6 = [host] + except Exception: + pass + else: + try: + socket.inet_aton(host) + ipv4 = [host] + except Exception: + pass + else: + try: + ipv4 = [str(ip) for ip in dns.resolver.query(host, 'A')] + except Exception: + logging.debug("No IPv4 address assigned to: " + host) + try: + ipv6 = [str(ip) for ip in dns.resolver.query(host, 'AAAA')] + except Exception: + logging.debug("No IPv6 address assigned to: " + host) + self._cache_set(url, (ipv4, ipv6), 'dns') + return ipv4, ipv6 + + def phish_query(self, url, key, query): + cached = self._cache_get(query, 'phishtank') + if cached is not None: + return cached + postfields = {'url': quote(query), 'format': 'json', 'app_key': key} + response = requests.post(url, data=postfields) + res = response.json() + if res["meta"]["status"] == "success": + if res["results"]["in_database"]: + self._cache_set(query, res["results"]["phish_detail_page"], 'phishtank') + return res["results"]["phish_detail_page"] + else: + # no information + pass + elif res["meta"]["status"] == 'error': + # Inform the user? 
+ # errormsg = res["errortext"] + pass + return None + + def sphinxsearch(server, port, url, query): + # WARNING: too dangerous to have on the public interface + return '' + """ + if not sphinx: + return None + cached = _cache_get(query, 'sphinx') + if cached is not None: + return cached + client = sphinxapi.SphinxClient() + client.SetServer(server, port) + client.SetMatchMode(2) + client.SetConnectTimeout(5.0) + result = [] + res = client.Query(query) + if res.get("matches") is not None: + for ticket in res["matches"]: + ticket_id = ticket["id"] + ticket_link = url + str(ticket_id) + result.append(ticket_link) + _cache_set(query, result, 'sphinx') + return result + + """ + + def vt_query_url(self, url, url_up, key, query, upload=True): + cached = self._cache_get(query, 'vt') + if cached is not None: + return cached + parameters = {"resource": query, "apikey": key} + if upload: + parameters['scan'] = 1 + response = requests.post(url, data=parameters) + if response.text is None or len(response.text) == 0: + return None + res = response.json() + msg = res["verbose_msg"] + link = res.get("permalink") + positives = res.get("positives") + total = res.get("total") + if positives is not None: + self._cache_set(query, (msg, link, positives, total), 'vt') + return msg, link, positives, total + + def gsb_query(self, url, query): + cached = self._cache_get(query, 'gsb') + if cached is not None: + return cached + param = '1\n' + query + response = requests.post(url, data=param) + status = response.status_code + if status == 200: + self._cache_set(query, response.text, 'gsb') + return response.text + + ''' + def urlquery_query(url, key, query): + return None + cached = _cache_get(query, 'urlquery') + if cached is not None: + return cached + try: + urlquery.url = url + urlquery.key = key + response = urlquery.search(query) + except Exception: + return None + if response['_response_']['status'] == 'ok': + if response.get('reports') is not None: + total_alert_count = 0 + for r in response['reports']: + total_alert_count += r['urlquery_alert_count'] + total_alert_count += r['ids_alert_count'] + total_alert_count += r['blacklist_alert_count'] + _cache_set(query, total_alert_count, 'urlquery') + return total_alert_count + else: + return None + ''' + + def process_emails(self, emails, ignorelist, replacelist): + to_return = list(set(emails)) + for mail in reversed(to_return): + for ignorelist_entry in ignorelist: + if re.search(ignorelist_entry, mail, re.I): + if mail in to_return: + to_return.remove(mail) + for k, v in list(replacelist.items()): + if re.search(k, mail, re.I): + if k in to_return: + to_return.remove(k) + to_return += v + return to_return + + def whois(self, server, port, domain, ignorelist, replacelist): + cached = self._cache_get(domain, 'whois') + if cached is not None: + return cached + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(15) + try: + s.connect((server, port)) + except Exception: + print("Connection problems - check WHOIS server") + print(("WHOIS request while problem occurred: ", domain)) + print(("WHOIS server: {}:{}".format(server, port))) + return None + if domain.startswith('http'): + self.fex.decode(domain) + d = self.fex.get_domain().lower() + else: + d = domain + s.send(("{}\r\n".format(d)).encode()) + response = b'' + while True: + d = s.recv(4096) + response += d + if d == b'': + break + s.close() + match = re.findall(r'[\w\.-]+@[\w\.-]+', response.decode()) + emails = self.process_emails(match, ignorelist, replacelist) + if len(emails) == 0: + 
return None + list_mail = list(set(emails)) + self._cache_set(domain, list_mail, 'whois') + return list_mail + + def pdnscircl(self, url, user, passwd, q): + cached = self._cache_get(q, 'pdns') + if cached is not None: + return cached + pdns = PyPDNS(url, basic_auth=(user, passwd)) + response = pdns.query(q) + all_uniq = [] + for e in reversed(response): + host = e['rrname'].lower() + if host in all_uniq: + continue + else: + all_uniq.append(host) + response = (len(all_uniq), all_uniq[:5]) + self._cache_set(q, response, 'pdns') + return response + + def psslcircl(self, url, user, passwd, q): + cached = self._cache_get(q, 'pssl') + if cached is not None: + return cached + pssl = PyPSSL(url, basic_auth=(user, passwd)) + response = pssl.query(q) + if response.get(q) is not None: + certinfo = response.get(q) + entries = {} + for sha1 in certinfo['certificates']: + entries[sha1] = [] + if certinfo['subjects'].get(sha1): + for value in certinfo['subjects'][sha1]['values']: + entries[sha1].append(value) + self._cache_set(q, entries, 'pssl') + return entries + return None + + def eupi(self, url, key, q): + cached = self._cache_get(q, 'eupi') + if cached is not None: + return cached + eu = PyEUPI(key, url) + response = eu.search_url(url=q) + if response.get('results'): + r = response.get('results')[0]['tag_label'] + self._cache_set(q, r, 'eupi') + return r + eu.post_submission(q) + return None + + def bgpranking(self, ip): + cached = self._cache_get(ip, 'ipasn') + if cached is not None: + asn = cached['asn'] + prefix = cached['prefix'] + else: + ipasn = IPASNHistory() + response = ipasn.query(ip) + if 'response' not in response: + asn = None + prefix = None + entry = response['response'][list(response['response'].keys())[0]] + if entry: + self._cache_set(ip, entry, 'ipasn') + asn = entry['asn'] + prefix = entry['prefix'] + else: + asn = None + prefix = None + + if not asn or not prefix: + # asn, prefix, asn_descr, rank, position, known_asns + return None, None, None, None, None, None + + cached = self._cache_get(asn, 'bgp') + if cached is not None: + return cached + bgpranking = BGPRanking() + response = bgpranking.query(asn, date=(date.today() - timedelta(1)).isoformat()) + if 'response' not in response or not response['response']: + return None, None, None, None, None, None + to_return = (asn, prefix, response['response']['asn_description'], response['response']['ranking']['rank'], + response['response']['ranking']['position'], response['response']['ranking']['total_known_asns']) + self._cache_set(asn, to_return, 'bgp') + return to_return + + def _deserialize_cached(self, entry): + to_return = {} + h = self.cache.hgetall(entry) + for key, value in list(h.items()): + to_return[key] = json.loads(value) + return to_return + + def get_url_data(self, url): + data = self._deserialize_cached(url) + if data.get('dns') is not None: + ipv4, ipv6 = data['dns'] + ip_data = {} + if ipv4 is not None: + for ip in ipv4: + ip_data[ip] = self._deserialize_cached(ip) + if ipv6 is not None: + for ip in ipv6: + ip_data[ip] = self._deserialize_cached(ip) + if len(ip_data) > 0: + data.update(ip_data) + return {url: data} + + def cached(self, url): + url_data = self.get_url_data(url) + to_return = [url_data] + if url_data[url].get('list') is not None: + url_redirs = url_data[url]['list'] + for u in url_redirs: + if u == url: + continue + to_return.append(self.get_url_data(u)) + return to_return diff --git a/website/web/__init__.py b/website/web/__init__.py index a28d824..cbc7876 100644 --- a/website/web/__init__.py +++ 
b/website/web/__init__.py @@ -1,6 +1,7 @@ import json import os from pathlib import Path +import uuid from flask import Flask, render_template, request, Response, redirect, url_for from flask_mail import Mail, Message @@ -14,17 +15,13 @@ import logging from logging.handlers import RotatingFileHandler from logging import Formatter -from rq import Queue -from rq.job import Job from redis import Redis from urlabuse.helpers import get_socket_path +from urlabuse.urlabuse import Query import configparser from .proxied import ReverseProxied -from urlabuse.urlabuse import is_valid_url, url_list, dns_resolve, phish_query, psslcircl, \ - vt_query_url, gsb_query, sphinxsearch, whois, pdnscircl, bgpranking, \ - cached, get_mail_sent, set_mail_sent, get_submissions, eupi config_dir = Path('config') @@ -73,7 +70,9 @@ def create_app(configfile=None): app.logger.addHandler(handler) app.logger.setLevel(logging.INFO) Bootstrap(app) - q = Queue(connection=Redis(unix_socket_path=get_socket_path('cache'))) + queue = Redis(unix_socket_path=get_socket_path('cache'), db=0, + decode_responses=True) + urlabuse_query = Query() # Mail Config app.config['MAIL_SERVER'] = 'localhost' @@ -102,6 +101,9 @@ def create_app(configfile=None): @app.route('/', methods=['GET', 'POST']) def index(): + if request.method == 'HEAD': + # Just returns ack if the webserver is running + return 'Ack' form = URLForm() return render_template('index.html', form=form) @@ -143,38 +145,41 @@ def create_app(configfile=None): @app.route("/_result/", methods=['GET']) def check_valid(job_key): - if job_key is None: + if not job_key or not queue.exists(job_key): return json.dumps(None), 200 - job = Job.fetch(job_key, connection=Redis(unix_socket_path=get_socket_path('cache'))) - if job.is_finished: - return json.dumps(job.result), 200 - else: + if not queue.hexists(job_key, 'result'): return json.dumps("Nay!"), 202 + result = queue.hget(job_key, 'result') + queue.delete(job_key) + return Response(result, mimetype='application/json'), 200 + + def enqueue(method, data): + job_id = str(uuid.uuid4()) + p = queue.pipeline() + p.hmset(job_id, {'method': method, 'data': json.dumps(data)}) + p.sadd('to_process', job_id) + p.execute() + return job_id @app.route('/start', methods=['POST']) def run_query(): - data = json.loads(request.data.decode()) + data = request.get_json(force=True) url = data["url"] ip = _get_user_ip(request) - app.logger.info('{} {}'.format(ip, url)) - if get_submissions(url) and get_submissions(url) >= autosend_threshold: + app.logger.info(f'{ip} {url}') + if urlabuse_query.get_submissions(url) and urlabuse_query.get_submissions(url) >= autosend_threshold: send(url, '', True) - is_valid = q.enqueue_call(func=is_valid_url, args=(url,), result_ttl=500) - return is_valid.get_id() + return enqueue('is_valid_url', {'url': url}) @app.route('/urls', methods=['POST']) def urls(): - data = json.loads(request.data.decode()) - url = data["url"] - u = q.enqueue_call(func=url_list, args=(url,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('url_list', {'url': data["url"]}) @app.route('/resolve', methods=['POST']) def resolve(): - data = json.loads(request.data.decode()) - url = data["url"] - u = q.enqueue_call(func=dns_resolve, args=(url,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('dns_resolve', {'url': data["url"]}) def read_auth(name): key = config_dir / f'{name}.key' @@ -191,25 +196,19 @@ def create_app(configfile=None): auth = read_auth('phishtank') if 
not auth: return '' - key = auth[0] - data = json.loads(request.data.decode()) - url = parser.get("PHISHTANK", "url") - query = data["query"] - u = q.enqueue_call(func=phish_query, args=(url, key, query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('phish_query', {'url': parser.get("PHISHTANK", "url"), + 'key': auth[0], 'query': data["query"]}) @app.route('/virustotal_report', methods=['POST']) def vt(): auth = read_auth('virustotal') if not auth: return '' - key = auth[0] - data = json.loads(request.data.decode()) - url = parser.get("VIRUSTOTAL", "url_report") - url_up = parser.get("VIRUSTOTAL", "url_upload") - query = data["query"] - u = q.enqueue_call(func=vt_query_url, args=(url, url_up, key, query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('vt_query_url', {'url': parser.get("VIRUSTOTAL", "url_report"), + 'url_up': parser.get("VIRUSTOTAL", "url_upload"), + 'key': auth[0], 'query': data["query"]}) @app.route('/googlesafebrowsing', methods=['POST']) def gsb(): @@ -217,12 +216,10 @@ def create_app(configfile=None): if not auth: return '' key = auth[0] - data = json.loads(request.data.decode()) - url = parser.get("GOOGLESAFEBROWSING", "url") - url = url.format(key) - query = data["query"] - u = q.enqueue_call(func=gsb_query, args=(url, query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + url = parser.get("GOOGLESAFEBROWSING", "url").format(key) + return enqueue('gsb_query', {'url': url, + 'query': data["query"]}) ''' @app.route('/urlquery', methods=['POST']) @@ -236,7 +233,6 @@ def create_app(configfile=None): query = data["query"] u = q.enqueue_call(func=urlquery_query, args=(url, key, query,), result_ttl=500) return u.get_id() - ''' @app.route('/ticket', methods=['POST']) def ticket(): @@ -250,30 +246,24 @@ def create_app(configfile=None): u = q.enqueue_call(func=sphinxsearch, args=(server, port, url, query,), result_ttl=500) return u.get_id() + ''' @app.route('/whois', methods=['POST']) def whoismail(): - # if not request.authorization: - # return '' - server = parser.get("WHOIS", "server") - port = parser.getint("WHOIS", "port") - data = json.loads(request.data.decode()) - query = data["query"] - u = q.enqueue_call(func=whois, args=(server, port, query, ignorelist, replacelist), - result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('whois', {'server': parser.get("WHOIS", "server"), + 'port': parser.getint("WHOIS", "port"), + 'domain': data["query"], + 'ignorelist': ignorelist, 'replacelist': replacelist}) @app.route('/eupi', methods=['POST']) def eu(): auth = read_auth('eupi') if not auth: return '' - key = auth[0] - data = json.loads(request.data.decode()) - url = parser.get("EUPI", "url") - query = data["query"] - u = q.enqueue_call(func=eupi, args=(url, key, query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('eupi', {'url': parser.get("EUPI", "url"), + 'key': auth[0], 'query': data["query"]}) @app.route('/pdnscircl', methods=['POST']) def dnscircl(): @@ -282,18 +272,14 @@ def create_app(configfile=None): return '' user, password = auth url = parser.get("PDNS_CIRCL", "url") - data = json.loads(request.data.decode()) - query = data["query"] - u = q.enqueue_call(func=pdnscircl, args=(url, user.strip(), password.strip(), - query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('pdnscircl', {'url': url, 'user': user.strip(), + 
'passwd': password.strip(), 'q': data["query"]}) @app.route('/bgpranking', methods=['POST']) def bgpr(): - data = json.loads(request.data.decode()) - query = data["query"] - u = q.enqueue_call(func=bgpranking, args=(query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('bgpranking', {'ip': data["query"]}) @app.route('/psslcircl', methods=['POST']) def sslcircl(): @@ -301,19 +287,16 @@ def create_app(configfile=None): if not auth: return '' user, password = auth - url = parser.get("PDNS_CIRCL", "url") url = parser.get("PSSL_CIRCL", "url") - data = json.loads(request.data.decode()) - query = data["query"] - u = q.enqueue_call(func=psslcircl, args=(url, user.strip(), password.strip(), - query,), result_ttl=500) - return u.get_id() + data = request.get_json(force=True) + return enqueue('psslcircl', {'url': url, 'user': user.strip(), + 'passwd': password.strip(), 'q': data["query"]}) @app.route('/get_cache', methods=['POST']) def get_cache(): data = json.loads(request.data.decode()) url = data["query"] - data = cached(url) + data = urlabuse_query.cached(url) dumped = json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')) return dumped @@ -356,9 +339,10 @@ def create_app(configfile=None): return to_return def send(url, ip='', autosend=False): - if not get_mail_sent(url): - set_mail_sent(url) - data = cached(url) + return + if not urlabuse_query.get_mail_sent(url): + urlabuse_query.set_mail_sent(url) + data = urlabuse_query.cached(url) if not autosend: subject = 'URL Abuse report from ' + ip else: @@ -373,7 +357,7 @@ def create_app(configfile=None): def send_mail(): data = json.loads(request.data.decode()) url = data["url"] - if not get_mail_sent(url): + if not urlabuse_query.get_mail_sent(url): ip = _get_user_ip(request) send(url, ip) return redirect(url_for('index'))
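
Note on the queue change above: the diff drops rq entirely and replaces it with a small Redis-only protocol. enqueue() in website/web/__init__.py stores a hash {method, data} under a fresh UUID and adds that UUID to the 'to_process' set; the loop in bin/run_workers.py pops an ID from the set, calls the named Query method with the JSON-decoded parameters and writes the serialised result back into the same hash; the /_result/<job_id> view (and PyURLAbuse.get_result() on top of it) then polls until the 'result' field exists and deletes the job. The sketch below exercises that round trip directly against the cache Redis. It is illustrative only: it assumes a local checkout where get_socket_path('cache') resolves to the running cache socket, and the helper names submit_job and wait_for_result are invented for the example, not part of the codebase.

import json
import time
import uuid

from redis import Redis
from urlabuse.helpers import get_socket_path


def submit_job(queue: Redis, method: str, data: dict) -> str:
    # Same steps as enqueue() in website/web/__init__.py: one hash per job,
    # plus membership in the 'to_process' set polled by bin/run_workers.py.
    job_id = str(uuid.uuid4())
    p = queue.pipeline()
    p.hmset(job_id, {'method': method, 'data': json.dumps(data)})
    p.sadd('to_process', job_id)
    p.execute()
    return job_id


def wait_for_result(queue: Redis, job_id: str, timeout: float = 30):
    # Same polling as the /_result/<job_id> view and the client: wait for the
    # worker to fill the 'result' field, then drop the job hash.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if queue.hexists(job_id, 'result'):
            result = json.loads(queue.hget(job_id, 'result'))
            queue.delete(job_id)
            return result
        time.sleep(.1)
    return None


if __name__ == '__main__':
    cache = Redis(unix_socket_path=get_socket_path('cache'), db=0, decode_responses=True)
    job = submit_job(cache, 'dns_resolve', {'url': 'http://www.circl.lu'})
    print(wait_for_result(cache, job))

One detail if this pattern is reused elsewhere: redis-py 3.5 and later flag hmset() as deprecated, and hset(job_id, mapping={...}) is the drop-in replacement.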