chg: Remove RQ dependency, big cleanup.

pull/14/head
Raphaël Vinot 2019-01-11 14:00:25 +01:00
parent 8630ba8a98
commit 3499911435
8 changed files with 566 additions and 573 deletions

View File

@@ -2,17 +2,29 @@
 # -*- coding: utf-8 -*-
 import argparse
 from multiprocessing import Pool
-from rq import Worker, Queue, Connection
 from redis import Redis
 from urlabuse.helpers import get_socket_path
+from urlabuse.urlabuse import Query
+import json
+import time


 def worker(process_id: int):
-    listen = ['default']
-    cache_socket = get_socket_path('cache')
-    with Connection(Redis(unix_socket_path=cache_socket)):
-        worker = Worker(list(map(Queue, listen)))
-        worker.work()
+    urlabuse_query = Query()
+    queue = Redis(unix_socket_path=get_socket_path('cache'), db=0,
+                  decode_responses=True)
+    while True:
+        jobid = queue.spop('to_process')
+        if not jobid:
+            time.sleep(.1)
+            continue
+        to_process = queue.hgetall(jobid)
+        parameters = json.loads(to_process['data'])
+        try:
+            result = getattr(urlabuse_query, to_process['method'])(**parameters)
+            queue.hset(jobid, 'result', json.dumps(result))
+        except Exception as e:
+            print(e, to_process)


 if __name__ == '__main__':
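The new worker polls Redis directly instead of registering an RQ consumer. For orientation, a minimal sketch of the job format this loop consumes; it mirrors the enqueue helper added to the web application later in this commit, and the method name and arguments are only illustrative:

import json
import time
import uuid

from redis import Redis
from urlabuse.helpers import get_socket_path

# Same database the worker polls: db=0 on the 'cache' unix socket.
queue = Redis(unix_socket_path=get_socket_path('cache'), db=0, decode_responses=True)

# A job is a hash holding the Query method name and its JSON-encoded kwargs;
# the job id is added to the 'to_process' set the worker spop()s from.
# hmset() mirrors the code in this commit; newer redis-py prefers hset(mapping=...).
job_id = str(uuid.uuid4())
queue.hmset(job_id, {'method': 'dns_resolve', 'data': json.dumps({'url': 'http://example.com'})})
queue.sadd('to_process', job_id)

# The worker writes the JSON-encoded return value into the 'result' field.
while not queue.hexists(job_id, 'result'):
    time.sleep(0.1)
print(json.loads(queue.hget(job_id, 'result')))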

client/bin/urlabuse (new executable file, +24 lines)
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
from pyurlabuse import PyURLAbuse
import json
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run a query against URL Abuse')
parser.add_argument('--url', type=str, help='URL of the instance.')
parser.add_argument('--query', help='URL to lookup')
args = parser.parse_args()
if args.url:
urlabuse = PyURLAbuse(args.url)
else:
urlabuse = PyURLAbuse()
response = urlabuse.run_query(args.query)
print(json.dumps(response, indent=2))

View File

@@ -1 +1 @@
-from api import PyURLAbuse
+from .api import PyURLAbuse

View File

@@ -4,27 +4,31 @@
 import json
 import requests
 import time
+from urllib.parse import urljoin


 class PyURLAbuse(object):

-    # def __init__(self, url='https://www.circl.lu/urlabuse/'):
-    def __init__(self, url='http://0.0.0.0:5100/'):
+    def __init__(self, url='https://www.circl.lu/urlabuse/'):
         self.url = url

         self.session = requests.Session()
         self.session.headers.update({'content-type': 'application/json'})

+    @property
+    def is_up(self):
+        r = self.session.head(self.root_url)
+        return r.status_code == 200
+
     def get_result(self, job_id):
-        response = self.session.get('{}_result/{}' .format(self.url, job_id))
+        response = self.session.get(urljoin(self.url, f'_result/{job_id}'))
         if response.status_code == 202:
             return None
         else:
             return response.json()

     def _async(self, path, query):
-        response = self.session.post('{}{}' .format(self.url, path),
-                                     data=json.dumps(query))
+        response = self.session.post(urljoin(self.url, path), data=json.dumps(query))
         return response.text

     def start(self, q):

@@ -102,7 +106,7 @@ class PyURLAbuse(object):
         done = []
         while waiting:
             waiting = False
-            for u, job_id in res.iteritems():
+            for u, job_id in res.items():
                 if job_id in done:
                     continue
                 ips = self.get_result(job_id)

@@ -132,5 +136,5 @@ class PyURLAbuse(object):

     def get_cache(self, q):
         query = {'query': q}
-        response = self.session.post('{}get_cache' .format(self.url), data=json.dumps(query))
+        response = self.session.post(urljoin(self.url, 'get_cache'), data=json.dumps(query))
         return response.json()
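One behavioural note on the urljoin-based URL building above: urljoin only appends the relative path when the base ends with a slash, which the default 'https://www.circl.lu/urlabuse/' does; an instance URL passed without a trailing slash would drop its last path segment. Also, the new is_up property reads self.root_url, an attribute that does not appear in the lines shown here. A quick illustration of the urljoin behaviour (the URLs are examples):

from urllib.parse import urljoin

# Base with a trailing slash: the relative path is appended under it.
print(urljoin('https://www.circl.lu/urlabuse/', '_result/42'))
# -> https://www.circl.lu/urlabuse/_result/42

# Base without a trailing slash: the last segment is replaced instead.
print(urljoin('https://www.circl.lu/urlabuse', '_result/42'))
# -> https://www.circl.lu/_result/42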

View File

@@ -12,6 +12,7 @@ setup(
     description='Python API for URL Abuse.',
     long_description=open('README.md').read(),
     packages=['pyurlabuse'],
+    scripts=['bin/urlabuse'],
     classifiers=[
         'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
         'Development Status :: 3 - Alpha',

View File

@@ -1,4 +1,3 @@
-rq
 redis>=3
 pypssl
 pypdns

View File

@@ -7,7 +7,7 @@
 from datetime import date, timedelta
 import json
-import redis
+from redis import Redis
 from urllib.parse import quote
 from .helpers import get_socket_path
 import ipaddress
@@ -17,7 +17,6 @@ from pyfaup.faup import Faup
 import socket
 import dns.resolver
 import re
-import sys
 import logging
 from pypdns import PyPDNS
 from pyipasnhistory import IPASNHistory
@@ -35,503 +34,473 @@ try:
 except Exception:
     sphinx = False
r_cache = None
class Query():
def _cache_init(): def __init__(self, loglevel: int=logging.DEBUG):
global r_cache self.__init_logger(loglevel)
if r_cache is None: self.fex = Faup()
r_cache = redis.Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) self.cache = Redis(unix_socket_path=get_socket_path('cache'), db=1,
decode_responses=True)
def __init_logger(self, loglevel) -> None:
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(loglevel)
def _cache_set(key, value, field=None): def _cache_set(self, key, value, field=None):
_cache_init() if field is None:
if field is None: self.cache.setex(key, json.dumps(value), 3600)
r_cache.setex(key, json.dumps(value), 3600) else:
else: self.cache.hset(key, field, json.dumps(value))
r_cache.hset(key, field, json.dumps(value)) self.cache.expire(key, 3600)
r_cache.expire(key, 3600)
def _cache_get(self, key, field=None):
if field is None:
value_json = self.cache.get(key)
else:
value_json = self.cache.hget(key, field)
if value_json is not None:
return json.loads(value_json)
return None
def _cache_get(key, field=None): def to_bool(self, s):
_cache_init() """
if field is None: Converts the given string to a boolean.
value_json = r_cache.get(key) """
else: return s.lower() in ('1', 'true', 'yes', 'on')
value_json = r_cache.hget(key, field)
if value_json is not None:
return json.loads(value_json)
return None
def get_submissions(self, url, day=None):
if day is None:
day = date.today().isoformat()
else:
day = day.isoformat()
return self.cache.zscore(f'{day}_submissions', url)
def to_bool(s): def get_mail_sent(self, url, day=None):
""" if day is None:
Converts the given string to a boolean. day = date.today().isoformat()
""" else:
return s.lower() in ('1', 'true', 'yes', 'on') day = day.isoformat()
return self.cache.sismember(f'{day}_mails', url)
def set_mail_sent(self, url, day=None):
if day is None:
day = date.today().isoformat()
else:
day = day.isoformat()
return self.cache.sadd(f'{day}_mails', url)
def get_submissions(url, day=None): def is_valid_url(self, url):
_cache_init() cached = self._cache_get(url, 'valid')
if day is None: key = f'{date.today().isoformat()}_submissions'
day = date.today().isoformat() self.cache.zincrby(key, 1, url)
else: if cached is not None:
day = day.isoformat() return cached
key = date.today().isoformat() + '_submissions' if url.startswith('hxxp'):
return r_cache.zscore(key, url) url = 'http' + url[4:]
elif not url.startswith('http'):
url = 'http://' + url
logging.debug("Checking validity of URL: " + url)
self.fex.decode(url)
scheme = self.fex.get_scheme()
host = self.fex.get_host()
if scheme is None or host is None:
reason = "Not a valid http/https URL/URI"
return False, url, reason
self._cache_set(url, (True, url, None), 'valid')
return True, url, None
def is_ip(self, host):
try:
ipaddress.ip_address(host)
return True
except ValueError:
return False
def get_mail_sent(url, day=None): def try_resolve(self, url):
_cache_init() self.fex.decode(url)
if day is None: host = self.fex.get_host().lower()
day = date.today().isoformat() if self.is_ip(host):
else: return True, None
day = day.isoformat() try:
key = date.today().isoformat() + '_mails' ipaddr = dns.resolver.query(host, 'A')
return r_cache.sismember(key, url) except Exception:
reason = "DNS server problem. Check resolver settings."
return False, reason
def set_mail_sent(url, day=None): if not ipaddr:
_cache_init() reason = "Host " + host + " does not exist."
if day is None: return False, reason
day = date.today().isoformat()
else:
day = day.isoformat()
key = date.today().isoformat() + '_mails'
return r_cache.sadd(key, url)
def is_valid_url(url):
cached = _cache_get(url, 'valid')
key = date.today().isoformat() + '_submissions'
r_cache.zincrby(key, 1, url)
if cached is not None:
return cached
fex = Faup()
if url.startswith('hxxp'):
url = 'http' + url[4:]
elif not url.startswith('http'):
url = 'http://' + url
logging.debug("Checking validity of URL: " + url)
fex.decode(url)
scheme = fex.get_scheme()
host = fex.get_host()
if scheme is None or host is None:
reason = "Not a valid http/https URL/URI"
return False, url, reason
_cache_set(url, (True, url, None), 'valid')
return True, url, None
def is_ip(host):
try:
ipaddress.ip_address(host)
return True
except ValueError:
return False
def try_resolve(fex, url):
fex.decode(url)
host = fex.get_host().lower()
if is_ip(host):
return True, None return True, None
try:
ipaddr = dns.resolver.query(host, 'A')
except Exception:
reason = "DNS server problem. Check resolver settings."
return False, reason
if not ipaddr:
reason = "Host " + host + " does not exist."
return False, reason
return True, None
def get_urls(self, url, depth=1):
if depth > 5:
print('Too many redirects.')
return
def get_urls(url, depth=1): def meta_redirect(content):
if depth > 5: c = content.lower()
print('Too many redirects.') soup = BeautifulSoup(c, "html.parser")
return for result in soup.find_all(attrs={'http-equiv': 'refresh'}):
fex = Faup() if result:
out = result["content"].split(";")
def meta_redirect(content): if len(out) == 2:
c = content.lower() wait, text = out
soup = BeautifulSoup(c, "html.parser") try:
for result in soup.find_all(attrs={'http-equiv': 'refresh'}): a, url = text.split('=', 1)
if result: return url.strip()
out = result["content"].split(";") except Exception:
if len(out) == 2: print(text)
wait, text = out
try:
a, url = text.split('=', 1)
return url.strip()
except Exception:
print(text)
return None
resolve, reason = try_resolve(fex, url)
if not resolve:
# FIXME: inform that the domain does not resolve
yield url
return
logging.debug("Making HTTP connection to " + url)
headers = {'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0'}
try:
response = requests.get(url, allow_redirects=True, headers=headers,
timeout=15, verify=False)
except Exception:
# That one can fail (DNS for example)
# FIXME: inform that the get failed
yield url
return
if response.history is not None:
for h in response.history:
# Yeld the urls in the order we find them
yield h.url
yield response.url
meta_redir_url = meta_redirect(response.content)
if meta_redir_url is not None:
depth += 1
if not meta_redir_url.startswith('http'):
fex.decode(url)
base = '{}://{}'.format(fex.get_scheme(), fex.get_host())
port = fex.get_port()
if port is not None:
base += ':{}'.format(port)
if not meta_redir_url.startswith('/'):
# relative redirect. resource_path has the initial '/'
if fex.get_resource_path() is not None:
base += fex.get_resource_path()
if not base.endswith('/'):
base += '/'
meta_redir_url = base + meta_redir_url
for url in get_urls(meta_redir_url, depth):
yield url
def url_list(url):
cached = _cache_get(url, 'list')
if cached is not None:
return cached
list_urls = []
for u in get_urls(url):
if u is None or u in list_urls:
continue
list_urls.append(u)
_cache_set(url, list_urls, 'list')
return list_urls
def dns_resolve(url):
cached = _cache_get(url, 'dns')
if cached is not None:
return cached
fex = Faup()
fex.decode(url)
host = fex.get_host().lower()
ipv4 = None
ipv6 = None
if is_ip(host):
if ':' in host:
try:
socket.inet_pton(socket.AF_INET6, host)
ipv6 = [host]
except Exception:
pass
else:
try:
socket.inet_aton(host)
ipv4 = [host]
except Exception:
pass
else:
try:
ipv4 = [str(ip) for ip in dns.resolver.query(host, 'A')]
except Exception:
logging.debug("No IPv4 address assigned to: " + host)
try:
ipv6 = [str(ip) for ip in dns.resolver.query(host, 'AAAA')]
except Exception:
logging.debug("No IPv6 address assigned to: " + host)
_cache_set(url, (ipv4, ipv6), 'dns')
return ipv4, ipv6
def phish_query(url, key, query):
cached = _cache_get(query, 'phishtank')
if cached is not None:
return cached
postfields = {'url': quote(query), 'format': 'json', 'app_key': key}
response = requests.post(url, data=postfields)
res = response.json()
if res["meta"]["status"] == "success":
if res["results"]["in_database"]:
_cache_set(query, res["results"]["phish_detail_page"], 'phishtank')
return res["results"]["phish_detail_page"]
else:
# no information
pass
elif res["meta"]["status"] == 'error':
# Inform the user?
# errormsg = res["errortext"]
pass
return None
def sphinxsearch(server, port, url, query):
# WARNING: too dangerous to have on the public interface
return ''
"""
if not sphinx:
return None
cached = _cache_get(query, 'sphinx')
if cached is not None:
return cached
client = sphinxapi.SphinxClient()
client.SetServer(server, port)
client.SetMatchMode(2)
client.SetConnectTimeout(5.0)
result = []
res = client.Query(query)
if res.get("matches") is not None:
for ticket in res["matches"]:
ticket_id = ticket["id"]
ticket_link = url + str(ticket_id)
result.append(ticket_link)
_cache_set(query, result, 'sphinx')
return result
"""
def vt_query_url(url, url_up, key, query, upload=True):
cached = _cache_get(query, 'vt')
if cached is not None:
return cached
parameters = {"resource": query, "apikey": key}
if upload:
parameters['scan'] = 1
response = requests.post(url, data=parameters)
if response.text is None or len(response.text) == 0:
return None
res = response.json()
msg = res["verbose_msg"]
link = res.get("permalink")
positives = res.get("positives")
total = res.get("total")
if positives is not None:
_cache_set(query, (msg, link, positives, total), 'vt')
return msg, link, positives, total
def gsb_query(url, query):
cached = _cache_get(query, 'gsb')
if cached is not None:
return cached
param = '1\n' + query
response = requests.post(url, data=param)
status = response.status_code
if status == 200:
_cache_set(query, response.text, 'gsb')
return response.text
'''
def urlquery_query(url, key, query):
return None
cached = _cache_get(query, 'urlquery')
if cached is not None:
return cached
try:
urlquery.url = url
urlquery.key = key
response = urlquery.search(query)
except Exception:
return None
if response['_response_']['status'] == 'ok':
if response.get('reports') is not None:
total_alert_count = 0
for r in response['reports']:
total_alert_count += r['urlquery_alert_count']
total_alert_count += r['ids_alert_count']
total_alert_count += r['blacklist_alert_count']
_cache_set(query, total_alert_count, 'urlquery')
return total_alert_count
else:
return None return None
'''
resolve, reason = self.try_resolve(url)
if not resolve:
# FIXME: inform that the domain does not resolve
yield url
return
def process_emails(emails, ignorelist, replacelist): logging.debug(f"Making HTTP connection to {url}")
to_return = list(set(emails))
for mail in reversed(to_return):
for ignorelist_entry in ignorelist:
if re.search(ignorelist_entry, mail, re.I):
if mail in to_return:
to_return.remove(mail)
for k, v in list(replacelist.items()):
if re.search(k, mail, re.I):
if k in to_return:
to_return.remove(k)
to_return += v
return to_return
headers = {'User-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0'}
try:
response = requests.get(url, allow_redirects=True, headers=headers,
timeout=15, verify=False)
except Exception:
# That one can fail (DNS for example)
# FIXME: inform that the get failed
yield url
return
if response.history is not None:
for h in response.history:
# Yeld the urls in the order we find them
yield h.url
def whois(server, port, domain, ignorelist, replacelist): yield response.url
cached = _cache_get(domain, 'whois')
if cached is not None:
return cached
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(15)
try:
s.connect((server, port))
except Exception:
print("Connection problems - check WHOIS server")
print(("WHOIS request while problem occurred: ", domain))
print(("WHOIS server: {}:{}".format(server, port)))
sys.exit(0)
if domain.startswith('http'):
fex = Faup()
fex.decode(domain)
d = fex.get_domain().lower()
else:
d = domain
s.send(("{}\r\n".format(d)).encode())
response = b''
while True:
d = s.recv(4096)
response += d
if d == b'':
break
s.close()
match = re.findall(r'[\w\.-]+@[\w\.-]+', response.decode())
emails = process_emails(match, ignorelist, replacelist)
if len(emails) == 0:
return None
list_mail = list(set(emails))
_cache_set(domain, list_mail, 'whois')
return list_mail
meta_redir_url = meta_redirect(response.content)
if meta_redir_url is not None:
depth += 1
if not meta_redir_url.startswith('http'):
self.fex.decode(url)
base = '{}://{}'.format(self.fex.get_scheme(), self.fex.get_host())
port = self.fex.get_port()
if port is not None:
base += f':{port}'
if not meta_redir_url.startswith('/'):
# relative redirect. resource_path has the initial '/'
if self.fex.get_resource_path() is not None:
base += self.fex.get_resource_path()
if not base.endswith('/'):
base += '/'
meta_redir_url = base + meta_redir_url
for url in self.get_urls(meta_redir_url, depth):
yield url
def pdnscircl(url, user, passwd, q): def url_list(self, url):
cached = _cache_get(q, 'pdns') cached = self._cache_get(url, 'list')
if cached is not None: if cached is not None:
return cached return cached
pdns = PyPDNS(url, basic_auth=(user, passwd)) list_urls = []
response = pdns.query(q) for u in self.get_urls(url):
all_uniq = [] if u is None or u in list_urls:
for e in reversed(response):
host = e['rrname'].lower()
if host in all_uniq:
continue
else:
all_uniq.append(host)
response = (len(all_uniq), all_uniq[:5])
_cache_set(q, response, 'pdns')
return response
def psslcircl(url, user, passwd, q):
cached = _cache_get(q, 'pssl')
if cached is not None:
return cached
pssl = PyPSSL(url, basic_auth=(user, passwd))
response = pssl.query(q)
if response.get(q) is not None:
certinfo = response.get(q)
entries = {}
for sha1 in certinfo['certificates']:
entries[sha1] = []
if certinfo['subjects'].get(sha1):
for value in certinfo['subjects'][sha1]['values']:
entries[sha1].append(value)
_cache_set(q, entries, 'pssl')
return entries
return None
def eupi(url, key, q):
cached = _cache_get(q, 'eupi')
if cached is not None:
return cached
eu = PyEUPI(key, url)
response = eu.search_url(url=q)
if response.get('results'):
r = response.get('results')[0]['tag_label']
_cache_set(q, r, 'eupi')
return r
eu.post_submission(q)
return None
def bgpranking(ip):
cached = _cache_get(ip, 'ipasn')
if cached is not None:
asn = cached['asn']
prefix = cached['prefix']
else:
ipasn = IPASNHistory()
response = ipasn.query(ip)
if 'response' not in response:
asn = None
prefix = None
entry = response['response'][list(response['response'].keys())[0]]
_cache_set(ip, entry, 'ipasn')
asn = entry['asn']
prefix = entry['prefix']
if not asn or not prefix:
# asn, prefix, asn_descr, rank, position, known_asns
return None, None, None, None, None, None
cached = _cache_get(asn, 'bgp')
if cached is not None:
return cached
bgpranking = BGPRanking()
response = bgpranking.query(asn, date=(date.today() - timedelta(1)).isoformat())
if 'response' not in response:
return None, None, None, None, None, None
to_return = (asn, prefix, response['response']['asn_description'], response['response']['ranking']['rank'],
response['response']['ranking']['position'], response['response']['ranking']['total_known_asns'])
_cache_set(asn, to_return, 'bgp')
return to_return
def _deserialize_cached(entry):
to_return = {}
h = r_cache.hgetall(entry)
for key, value in list(h.items()):
to_return[key] = json.loads(value)
return to_return
def get_url_data(url):
data = _deserialize_cached(url)
if data.get('dns') is not None:
ipv4, ipv6 = data['dns']
ip_data = {}
if ipv4 is not None:
for ip in ipv4:
ip_data[ip] = _deserialize_cached(ip)
if ipv6 is not None:
for ip in ipv6:
ip_data[ip] = _deserialize_cached(ip)
if len(ip_data) > 0:
data.update(ip_data)
return {url: data}
def cached(url):
_cache_init()
url_data = get_url_data(url)
to_return = [url_data]
if url_data[url].get('list') is not None:
url_redirs = url_data[url]['list']
for u in url_redirs:
if u == url:
continue continue
to_return.append(get_url_data(u)) list_urls.append(u)
return to_return self._cache_set(url, list_urls, 'list')
return list_urls
def dns_resolve(self, url):
cached = self._cache_get(url, 'dns')
if cached is not None:
return cached
self.fex.decode(url)
host = self.fex.get_host().lower()
ipv4 = None
ipv6 = None
if self.is_ip(host):
if ':' in host:
try:
socket.inet_pton(socket.AF_INET6, host)
ipv6 = [host]
except Exception:
pass
else:
try:
socket.inet_aton(host)
ipv4 = [host]
except Exception:
pass
else:
try:
ipv4 = [str(ip) for ip in dns.resolver.query(host, 'A')]
except Exception:
logging.debug("No IPv4 address assigned to: " + host)
try:
ipv6 = [str(ip) for ip in dns.resolver.query(host, 'AAAA')]
except Exception:
logging.debug("No IPv6 address assigned to: " + host)
self._cache_set(url, (ipv4, ipv6), 'dns')
return ipv4, ipv6
def phish_query(self, url, key, query):
cached = self._cache_get(query, 'phishtank')
if cached is not None:
return cached
postfields = {'url': quote(query), 'format': 'json', 'app_key': key}
response = requests.post(url, data=postfields)
res = response.json()
if res["meta"]["status"] == "success":
if res["results"]["in_database"]:
self._cache_set(query, res["results"]["phish_detail_page"], 'phishtank')
return res["results"]["phish_detail_page"]
else:
# no information
pass
elif res["meta"]["status"] == 'error':
# Inform the user?
# errormsg = res["errortext"]
pass
return None
def sphinxsearch(server, port, url, query):
# WARNING: too dangerous to have on the public interface
return ''
"""
if not sphinx:
return None
cached = _cache_get(query, 'sphinx')
if cached is not None:
return cached
client = sphinxapi.SphinxClient()
client.SetServer(server, port)
client.SetMatchMode(2)
client.SetConnectTimeout(5.0)
result = []
res = client.Query(query)
if res.get("matches") is not None:
for ticket in res["matches"]:
ticket_id = ticket["id"]
ticket_link = url + str(ticket_id)
result.append(ticket_link)
_cache_set(query, result, 'sphinx')
return result
"""
def vt_query_url(self, url, url_up, key, query, upload=True):
cached = self._cache_get(query, 'vt')
if cached is not None:
return cached
parameters = {"resource": query, "apikey": key}
if upload:
parameters['scan'] = 1
response = requests.post(url, data=parameters)
if response.text is None or len(response.text) == 0:
return None
res = response.json()
msg = res["verbose_msg"]
link = res.get("permalink")
positives = res.get("positives")
total = res.get("total")
if positives is not None:
self._cache_set(query, (msg, link, positives, total), 'vt')
return msg, link, positives, total
def gsb_query(self, url, query):
cached = self._cache_get(query, 'gsb')
if cached is not None:
return cached
param = '1\n' + query
response = requests.post(url, data=param)
status = response.status_code
if status == 200:
self._cache_set(query, response.text, 'gsb')
return response.text
'''
def urlquery_query(url, key, query):
return None
cached = _cache_get(query, 'urlquery')
if cached is not None:
return cached
try:
urlquery.url = url
urlquery.key = key
response = urlquery.search(query)
except Exception:
return None
if response['_response_']['status'] == 'ok':
if response.get('reports') is not None:
total_alert_count = 0
for r in response['reports']:
total_alert_count += r['urlquery_alert_count']
total_alert_count += r['ids_alert_count']
total_alert_count += r['blacklist_alert_count']
_cache_set(query, total_alert_count, 'urlquery')
return total_alert_count
else:
return None
'''
def process_emails(self, emails, ignorelist, replacelist):
to_return = list(set(emails))
for mail in reversed(to_return):
for ignorelist_entry in ignorelist:
if re.search(ignorelist_entry, mail, re.I):
if mail in to_return:
to_return.remove(mail)
for k, v in list(replacelist.items()):
if re.search(k, mail, re.I):
if k in to_return:
to_return.remove(k)
to_return += v
return to_return
def whois(self, server, port, domain, ignorelist, replacelist):
cached = self._cache_get(domain, 'whois')
if cached is not None:
return cached
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(15)
try:
s.connect((server, port))
except Exception:
print("Connection problems - check WHOIS server")
print(("WHOIS request while problem occurred: ", domain))
print(("WHOIS server: {}:{}".format(server, port)))
return None
if domain.startswith('http'):
self.fex.decode(domain)
d = self.fex.get_domain().lower()
else:
d = domain
s.send(("{}\r\n".format(d)).encode())
response = b''
while True:
d = s.recv(4096)
response += d
if d == b'':
break
s.close()
match = re.findall(r'[\w\.-]+@[\w\.-]+', response.decode())
emails = self.process_emails(match, ignorelist, replacelist)
if len(emails) == 0:
return None
list_mail = list(set(emails))
self._cache_set(domain, list_mail, 'whois')
return list_mail
def pdnscircl(self, url, user, passwd, q):
cached = self._cache_get(q, 'pdns')
if cached is not None:
return cached
pdns = PyPDNS(url, basic_auth=(user, passwd))
response = pdns.query(q)
all_uniq = []
for e in reversed(response):
host = e['rrname'].lower()
if host in all_uniq:
continue
else:
all_uniq.append(host)
response = (len(all_uniq), all_uniq[:5])
self._cache_set(q, response, 'pdns')
return response
def psslcircl(self, url, user, passwd, q):
cached = self._cache_get(q, 'pssl')
if cached is not None:
return cached
pssl = PyPSSL(url, basic_auth=(user, passwd))
response = pssl.query(q)
if response.get(q) is not None:
certinfo = response.get(q)
entries = {}
for sha1 in certinfo['certificates']:
entries[sha1] = []
if certinfo['subjects'].get(sha1):
for value in certinfo['subjects'][sha1]['values']:
entries[sha1].append(value)
self._cache_set(q, entries, 'pssl')
return entries
return None
def eupi(self, url, key, q):
cached = self._cache_get(q, 'eupi')
if cached is not None:
return cached
eu = PyEUPI(key, url)
response = eu.search_url(url=q)
if response.get('results'):
r = response.get('results')[0]['tag_label']
self._cache_set(q, r, 'eupi')
return r
eu.post_submission(q)
return None
def bgpranking(self, ip):
cached = self._cache_get(ip, 'ipasn')
if cached is not None:
asn = cached['asn']
prefix = cached['prefix']
else:
ipasn = IPASNHistory()
response = ipasn.query(ip)
if 'response' not in response:
asn = None
prefix = None
entry = response['response'][list(response['response'].keys())[0]]
if entry:
self._cache_set(ip, entry, 'ipasn')
asn = entry['asn']
prefix = entry['prefix']
else:
asn = None
prefix = None
if not asn or not prefix:
# asn, prefix, asn_descr, rank, position, known_asns
return None, None, None, None, None, None
cached = self._cache_get(asn, 'bgp')
if cached is not None:
return cached
bgpranking = BGPRanking()
response = bgpranking.query(asn, date=(date.today() - timedelta(1)).isoformat())
if 'response' not in response or not response['response']:
return None, None, None, None, None, None
to_return = (asn, prefix, response['response']['asn_description'], response['response']['ranking']['rank'],
response['response']['ranking']['position'], response['response']['ranking']['total_known_asns'])
self._cache_set(asn, to_return, 'bgp')
return to_return
def _deserialize_cached(self, entry):
to_return = {}
h = self.cache.hgetall(entry)
for key, value in list(h.items()):
to_return[key] = json.loads(value)
return to_return
def get_url_data(self, url):
data = self._deserialize_cached(url)
if data.get('dns') is not None:
ipv4, ipv6 = data['dns']
ip_data = {}
if ipv4 is not None:
for ip in ipv4:
ip_data[ip] = self._deserialize_cached(ip)
if ipv6 is not None:
for ip in ipv6:
ip_data[ip] = self._deserialize_cached(ip)
if len(ip_data) > 0:
data.update(ip_data)
return {url: data}
def cached(self, url):
url_data = self.get_url_data(url)
to_return = [url_data]
if url_data[url].get('list') is not None:
url_redirs = url_data[url]['list']
for u in url_redirs:
if u == url:
continue
to_return.append(self.get_url_data(u))
return to_return

View File

@@ -1,6 +1,7 @@
 import json
 import os
 from pathlib import Path
+import uuid

 from flask import Flask, render_template, request, Response, redirect, url_for
 from flask_mail import Mail, Message
@@ -14,17 +15,13 @@ import logging
 from logging.handlers import RotatingFileHandler
 from logging import Formatter

-from rq import Queue
-from rq.job import Job
 from redis import Redis

 from urlabuse.helpers import get_socket_path
+from urlabuse.urlabuse import Query

 import configparser
 from .proxied import ReverseProxied

-from urlabuse.urlabuse import is_valid_url, url_list, dns_resolve, phish_query, psslcircl, \
-    vt_query_url, gsb_query, sphinxsearch, whois, pdnscircl, bgpranking, \
-    cached, get_mail_sent, set_mail_sent, get_submissions, eupi
-
 config_dir = Path('config')
@@ -73,7 +70,9 @@ def create_app(configfile=None):
     app.logger.addHandler(handler)
     app.logger.setLevel(logging.INFO)
     Bootstrap(app)
-    q = Queue(connection=Redis(unix_socket_path=get_socket_path('cache')))
+    queue = Redis(unix_socket_path=get_socket_path('cache'), db=0,
+                  decode_responses=True)
+    urlabuse_query = Query()

     # Mail Config
     app.config['MAIL_SERVER'] = 'localhost'
@@ -102,6 +101,9 @@ def create_app(configfile=None):

     @app.route('/', methods=['GET', 'POST'])
     def index():
+        if request.method == 'HEAD':
+            # Just returns ack if the webserver is running
+            return 'Ack'
         form = URLForm()
         return render_template('index.html', form=form)
@@ -143,38 +145,41 @@
     @app.route("/_result/<job_key>", methods=['GET'])
     def check_valid(job_key):
-        if job_key is None:
+        if not job_key or not queue.exists(job_key):
             return json.dumps(None), 200
-        job = Job.fetch(job_key, connection=Redis(unix_socket_path=get_socket_path('cache')))
-        if job.is_finished:
-            return json.dumps(job.result), 200
-        else:
+        if not queue.hexists(job_key, 'result'):
             return json.dumps("Nay!"), 202
+        result = queue.hget(job_key, 'result')
+        queue.delete(job_key)
+        return Response(result, mimetype='application/json'), 200
+
+    def enqueue(method, data):
+        job_id = str(uuid.uuid4())
+        p = queue.pipeline()
+        p.hmset(job_id, {'method': method, 'data': json.dumps(data)})
+        p.sadd('to_process', job_id)
+        p.execute()
+        return job_id

     @app.route('/start', methods=['POST'])
     def run_query():
-        data = json.loads(request.data.decode())
+        data = request.get_json(force=True)
         url = data["url"]
         ip = _get_user_ip(request)
-        app.logger.info('{} {}'.format(ip, url))
-        if get_submissions(url) and get_submissions(url) >= autosend_threshold:
+        app.logger.info(f'{ip} {url}')
+        if urlabuse_query.get_submissions(url) and urlabuse_query.get_submissions(url) >= autosend_threshold:
             send(url, '', True)
-        is_valid = q.enqueue_call(func=is_valid_url, args=(url,), result_ttl=500)
-        return is_valid.get_id()
+        return enqueue('is_valid_url', {'url': url})

     @app.route('/urls', methods=['POST'])
     def urls():
-        data = json.loads(request.data.decode())
-        url = data["url"]
-        u = q.enqueue_call(func=url_list, args=(url,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('url_list', {'url': data["url"]})

     @app.route('/resolve', methods=['POST'])
     def resolve():
-        data = json.loads(request.data.decode())
-        url = data["url"]
-        u = q.enqueue_call(func=dns_resolve, args=(url,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('dns_resolve', {'url': data["url"]})

     def read_auth(name):
         key = config_dir / f'{name}.key'
@@ -191,25 +196,19 @@ def create_app(configfile=None):
         auth = read_auth('phishtank')
         if not auth:
             return ''
-        key = auth[0]
-        data = json.loads(request.data.decode())
-        url = parser.get("PHISHTANK", "url")
-        query = data["query"]
-        u = q.enqueue_call(func=phish_query, args=(url, key, query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('phish_query', {'url': parser.get("PHISHTANK", "url"),
+                                       'key': auth[0], 'query': data["query"]})

     @app.route('/virustotal_report', methods=['POST'])
     def vt():
         auth = read_auth('virustotal')
         if not auth:
             return ''
-        key = auth[0]
-        data = json.loads(request.data.decode())
-        url = parser.get("VIRUSTOTAL", "url_report")
-        url_up = parser.get("VIRUSTOTAL", "url_upload")
-        query = data["query"]
-        u = q.enqueue_call(func=vt_query_url, args=(url, url_up, key, query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('vt_query_url', {'url': parser.get("VIRUSTOTAL", "url_report"),
+                                        'url_up': parser.get("VIRUSTOTAL", "url_upload"),
+                                        'key': auth[0], 'query': data["query"]})

     @app.route('/googlesafebrowsing', methods=['POST'])
     def gsb():
@@ -217,12 +216,10 @@ def create_app(configfile=None):
         if not auth:
             return ''
         key = auth[0]
-        data = json.loads(request.data.decode())
-        url = parser.get("GOOGLESAFEBROWSING", "url")
-        url = url.format(key)
-        query = data["query"]
-        u = q.enqueue_call(func=gsb_query, args=(url, query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        url = parser.get("GOOGLESAFEBROWSING", "url").format(key)
+        return enqueue('gsb_query', {'url': url,
+                                     'query': data["query"]})

     '''
     @app.route('/urlquery', methods=['POST'])
@@ -236,7 +233,6 @@ def create_app(configfile=None):
         query = data["query"]
         u = q.enqueue_call(func=urlquery_query, args=(url, key, query,), result_ttl=500)
         return u.get_id()
-    '''

     @app.route('/ticket', methods=['POST'])
     def ticket():
@@ -250,30 +246,24 @@
         u = q.enqueue_call(func=sphinxsearch, args=(server, port, url, query,),
                            result_ttl=500)
         return u.get_id()
+    '''

     @app.route('/whois', methods=['POST'])
     def whoismail():
-        # if not request.authorization:
-        #     return ''
-        server = parser.get("WHOIS", "server")
-        port = parser.getint("WHOIS", "port")
-        data = json.loads(request.data.decode())
-        query = data["query"]
-        u = q.enqueue_call(func=whois, args=(server, port, query, ignorelist, replacelist),
-                           result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('whois', {'server': parser.get("WHOIS", "server"),
+                                 'port': parser.getint("WHOIS", "port"),
+                                 'domain': data["query"],
+                                 'ignorelist': ignorelist, 'replacelist': replacelist})

     @app.route('/eupi', methods=['POST'])
     def eu():
         auth = read_auth('eupi')
         if not auth:
             return ''
-        key = auth[0]
-        data = json.loads(request.data.decode())
-        url = parser.get("EUPI", "url")
-        query = data["query"]
-        u = q.enqueue_call(func=eupi, args=(url, key, query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('eupi', {'url': parser.get("EUPI", "url"),
+                                'key': auth[0], 'query': data["query"]})

     @app.route('/pdnscircl', methods=['POST'])
     def dnscircl():
@@ -282,18 +272,14 @@
             return ''
         user, password = auth
         url = parser.get("PDNS_CIRCL", "url")
-        data = json.loads(request.data.decode())
-        query = data["query"]
-        u = q.enqueue_call(func=pdnscircl, args=(url, user.strip(), password.strip(),
-                                                 query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('pdnscircl', {'url': url, 'user': user.strip(),
+                                     'passwd': password.strip(), 'q': data["query"]})

     @app.route('/bgpranking', methods=['POST'])
     def bgpr():
-        data = json.loads(request.data.decode())
-        query = data["query"]
-        u = q.enqueue_call(func=bgpranking, args=(query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('bgpranking', {'ip': data["query"]})

     @app.route('/psslcircl', methods=['POST'])
     def sslcircl():
@@ -301,19 +287,16 @@
         if not auth:
             return ''
         user, password = auth
-        url = parser.get("PDNS_CIRCL", "url")
         url = parser.get("PSSL_CIRCL", "url")
-        data = json.loads(request.data.decode())
-        query = data["query"]
-        u = q.enqueue_call(func=psslcircl, args=(url, user.strip(), password.strip(),
-                                                 query,), result_ttl=500)
-        return u.get_id()
+        data = request.get_json(force=True)
+        return enqueue('psslcircl', {'url': url, 'user': user.strip(),
+                                     'passwd': password.strip(), 'q': data["query"]})

     @app.route('/get_cache', methods=['POST'])
     def get_cache():
         data = json.loads(request.data.decode())
         url = data["query"]
-        data = cached(url)
+        data = urlabuse_query.cached(url)
         dumped = json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))
         return dumped
@@ -356,9 +339,10 @@
         return to_return

     def send(url, ip='', autosend=False):
-        if not get_mail_sent(url):
-            set_mail_sent(url)
-            data = cached(url)
+        return
+        if not urlabuse_query.get_mail_sent(url):
+            urlabuse_query.set_mail_sent(url)
+            data = urlabuse_query.cached(url)
         if not autosend:
             subject = 'URL Abuse report from ' + ip
         else:
@@ -373,7 +357,7 @@
     def send_mail():
         data = json.loads(request.data.decode())
         url = data["url"]
-        if not get_mail_sent(url):
+        if not urlabuse_query.get_mail_sent(url):
             ip = _get_user_ip(request)
             send(url, ip)
         return redirect(url_for('index'))
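Seen from an HTTP client, the reworked flow is enqueue-then-poll: every submission route returns a job id, and /_result/<id> answers 202 until the worker has stored a result (the key is deleted once it has been fetched). A minimal sketch, assuming a locally running instance; the base URL and port are assumptions:

import time
import requests

base = 'http://127.0.0.1:5100/'

# The /resolve route reads the JSON body and returns the job id as plain text.
job_id = requests.post(base + 'resolve', json={'url': 'http://example.com'}).text

while True:
    r = requests.get(f'{base}_result/{job_id}')
    if r.status_code != 202:   # 202 means the worker has not finished yet
        print(r.json())        # the server deletes the job after this fetch
        break
    time.sleep(0.5)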