Refactor more generators

pull/154/head
Kevin Holvoet 2020-07-21 13:42:50 +02:00
parent 623ccd6c44
commit 610292e90f
13 changed files with 12434 additions and 9584 deletions

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -1,6 +1,19 @@
{
    "description": "Wikimedia address ranges (http://noc.wikimedia.org/conf/reverse-proxy.php.txt)",
    "list": [
        "10.128.0.0/24",
        "10.132.0.0/24",
        "10.192.0.0/22",
        "10.192.16.0/22",
        "10.192.32.0/22",
        "10.192.48.0/22",
        "10.20.0.0/24",
        "10.64.0.0/22",
        "10.64.16.0/22",
        "10.64.32.0/22",
        "10.64.48.0/22",
        "2001:df2:e500:101::/64",
        "202.63.61.242",
        "208.80.153.0/27",
        "208.80.153.32/27",
        "208.80.153.64/27",
@@ -9,15 +22,28 @@
        "208.80.154.128/26",
        "208.80.154.64/26",
        "208.80.155.96/27",
        "217.94.171.96",
        "2620:0:860:101::/64",
        "2620:0:860:102::/64",
        "2620:0:860:103::/64",
        "2620:0:860:104::/64",
        "2620:0:860:1::/64",
        "2620:0:860:2::/64",
        "2620:0:860:3::/64",
        "2620:0:860:4::/64",
        "2620:0:861:101::/64",
        "2620:0:861:102::/64",
        "2620:0:861:103::/64",
        "2620:0:861:107::/64",
        "2620:0:861:1::/64",
        "2620:0:861:2::/64",
        "2620:0:861:3::/64",
        "2620:0:861:4::/64",
        "2620:0:862:102::/64",
        "2620:0:862:1::/64",
        "2620:0:863:101::/64",
        "62.214.230.86",
        "68.124.59.186",
        "91.198.174.0/25"
    ],
    "matching_attributes": [
@@ -27,5 +53,5 @@
    ],
    "name": "List of known Wikimedia address ranges",
    "type": "cidr",
    "version": 20190912
    "version": 20200721
}
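
As a side note on how a "cidr"-type warninglist like this one is applied: matching reduces to a containment test of an attribute's IP against each range. A minimal sketch using only the standard library (the sample entries are taken from the list above; the function name is illustrative, not from the repository):

import ipaddress

ranges = [ipaddress.ip_network(entry) for entry in ("208.80.154.128/26", "2620:0:860:1::/64")]

def in_wikimedia_ranges(value):
    # Containment check, skipping networks of a different IP version.
    ip = ipaddress.ip_address(value)
    return any(ip in net for net in ranges if net.version == ip.version)

print(in_wikimedia_ranges("208.80.154.130"))  # True: inside 208.80.154.128/26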

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import zipfile
from generator import download_to_file, get_version, write_to_file

View File

@@ -3,23 +3,16 @@
import requests
from generator import get_version, write_to_file
from generator import process_stream, get_version, write_to_file

def process(url, dst):
    r = requests.get(url, stream=True)
    domains = []
    for ip in r.iter_lines():
        v = ip.decode('utf-8')
        if not v.startswith("#"):
            if v:
                domains.append(v)
    warninglist = {
        'name': 'List of disposable email domains',
        'version': get_version(),
        'description': 'List of disposable email domains',
        'list': domains,
        'list': process_stream(url),
        'type': 'substring',
        'matching_attributes': ["email-src", "email-dst", "whois-registrant-email", "domain|ip", "dns-soa-email"]
    }
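
The inline fetch-and-filter loop removed above is exactly what process_stream (added to generator.py later in this commit) centralizes: keep every line that is non-empty and does not start with "#". A self-contained emulation of that filtering, with made-up sample lines rather than the real feed:

sample = [b"# comment line", b"", b"mailinator.com", b"guerrillamail.com"]
kept = []
for raw in sample:
    v = raw.decode('utf-8')
    if v and not v.startswith("#"):
        kept.append(v)
print(kept)  # ['mailinator.com', 'guerrillamail.com']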

View File

@@ -3,7 +3,6 @@
import csv
import ipaddress
import json
import logging
from generator import download_to_file, get_version, write_to_file
@@ -46,7 +45,7 @@ def process(file):
def generate(data_list, warninglist, dst):
    warninglist['version'] = get_version()
    warninglist['list'] = sorted(set(data_list))
    warninglist['list'] = data_list
    write_to_file(warninglist, dst)
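
Dropping sorted(set(...)) here implies deduplication and ordering now happen downstream. The diff does not show write_to_file's body, so the following is only a plausible sketch of such a centralized writer, reusing the output path and json.dump arguments visible in the old VPN script later in this commit:

import json

def write_to_file(warninglist, dst):
    # Assumed behavior: normalize once at write time instead of in each generator.
    warninglist['list'] = sorted(set(warninglist['list']))
    with open('../lists/{}/list.json'.format(dst), 'w') as data_file:
        json.dump(warninglist, data_file, indent=4, sort_keys=True)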

View File

@@ -5,22 +5,21 @@ from generator import download, get_version, write_to_file
def process(url, dst):
    r = download(url)
    tlds = []
    for tld in r.text.splitlines():
        if tld.startswith('#'):
            continue
        tlds.append(tld)
    warninglist = {
        'name': 'TLDs as known by IANA',
        'version': get_version(),
        'description': 'Event contains one or more TLDs as attribute with an IDS flag set',
        'list': tlds,
        'list': [],
        'matching_attributes': ["hostname", "domain", "domain|ip"],
        'type': 'string'
    }
    r = download(url)
    for tld in r.text.splitlines():
        if tld.startswith('#'):
            continue
        warninglist['list'].append(tld)
    write_to_file(warninglist, dst)
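
The __main__ block for this generator falls outside the displayed hunk. A presumed invocation, where both the IANA feed URL and the destination name are assumptions rather than lines from this diff:

if __name__ == '__main__':
    # Both arguments are assumptions: the well-known IANA TLD feed URL and a
    # guessed destination directory name.
    process('https://data.iana.org/TLD/tlds-alpha-by-domain.txt', 'tlds')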

View File

@@ -1,49 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import json
import datetime

url = 'https://raw.githubusercontent.com/ejrv/VPNs/master/vpn-ipv4.txt'
r = requests.get(url, stream=True)
ipsv4 = []
for ip in r.iter_lines():
    v = ip.decode('utf-8')
    if not v.startswith("#"):
        if v: ipsv4.append(v)
warninglist = {}
warninglist['name'] = 'Specialized list of IPv4 addresses belonging to common VPN providers and datacenters'
warninglist['version'] = int(datetime.date.today().strftime('%Y%m%d'))
warninglist['description'] = 'Specialized list of IPv4 addresses belonging to common VPN providers and datacenters'
warninglist['list'] = sorted(set(ipsv4))
warninglist['type'] = 'cidr'
warninglist['matching_attributes'] = ["ip-src", "ip-dst", "domain|ip"]
from generator import process_stream, get_version, write_to_file

with open('../lists/vpn-ipv4/list.json', 'w') as data_file:
    json.dump(warninglist, data_file, indent=4, sort_keys=True)

def process(url, dst):
    warninglist = {
        'name': 'Specialized list of {} addresses belonging to common VPN providers and datacenters'.format(dst.split('-')[1].replace('ip', 'IP')),
        'version': get_version(),
        'description': 'Specialized list of {} addresses belonging to common VPN providers and datacenters'.format(dst.split('-')[1].replace('ip', 'IP')),
        'list': process_stream(url),
        'type': 'cidr',
        'matching_attributes': ["ip-src", "ip-dst", "domain|ip"]
    }
    write_to_file(warninglist, dst)

url = 'https://raw.githubusercontent.com/ejrv/VPNs/master/vpn-ipv6.txt'
r = requests.get(url, stream=True)
ipsv6 = []
for ip in r.iter_lines():
    v = ip.decode('utf-8')
    if not v.startswith("#"):
        if v: ipsv6.append(v)
warninglist = {}
warninglist['name'] = 'Specialized list of IPv6 addresses belonging to common VPN providers and datacenters'
warninglist['version'] = int(datetime.date.today().strftime('%Y%m%d'))
warninglist['description'] = 'Specialized list of IPv6 addresses belonging to common VPN providers and datacenters'
warninglist['list'] = sorted(set(ipsv6))
warninglist['type'] = 'cidr'
warninglist['matching_attributes'] = ["ip-src", "ip-dst", "domain|ip"]

with open('../lists/vpn-ipv6/list.json', 'w') as data_file:
    json.dump(warninglist, data_file, indent=4, sort_keys=True)

if __name__ == '__main__':
    vpn_base_url = 'https://raw.githubusercontent.com/ejrv/VPNs/master/'
    uri_list = ['vpn-ipv4', 'vpn-ipv6']
    for uri in uri_list:
        url = vpn_base_url + uri + '.txt'
        uri.split('-')[1].replace('ip', 'IP')
        process(url, uri)
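
The list name and description are now templated from the destination string; note that the bare expression in the __main__ loop computes the same label but discards its result, while process() re-derives it from dst. A worked example of that expression:

dst = 'vpn-ipv4'
label = dst.split('-')[1].replace('ip', 'IP')  # ['vpn', 'ipv4'][1] -> 'ipv4' -> 'IPv4'
print('Specialized list of {} addresses belonging to common VPN providers and datacenters'.format(label))
# Specialized list of IPv4 addresses belonging to common VPN providers and datacenters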

View File

@@ -1,34 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import datetime
import urllib.request
import codecs
import ipaddress
import re
import ipaddress

res = urllib.request.urlopen('http://noc.wikimedia.org/conf/reverse-proxy.php.txt')
from generator import download, get_version, write_to_file

res_body = res.read()
decoded = res_body.decode("unicode_escape", "utf-8")
l = []
for line in decoded.split('\n'):
    if re.search("public", line):
        matched = re.findall(r'\'(.*?)\'', line)
        if matched:
            try:
                ipaddress.ip_network(matched[0])
                l.append(matched[0])
            except ValueError:
                pass
warninglist = {}
warninglist['name'] = 'List of known Wikimedia address ranges'
warninglist['version'] = int(datetime.date.today().strftime('%Y%m%d'))
warninglist['description'] = 'Wikimedia address ranges (http://noc.wikimedia.org/conf/reverse-proxy.php.txt)'
warninglist['type'] = 'cidr'
warninglist['list'] = sorted(set(l))
warninglist['matching_attributes'] = ["ip-src", "ip-dst", "domain|ip"]

def process(url, dst):
    warninglist = {
        'name': 'List of known Wikimedia address ranges',
        'version': get_version(),
        'description': 'Wikimedia address ranges (http://noc.wikimedia.org/conf/reverse-proxy.php.txt)',
        'type': 'cidr',
        'list': [],
        'matching_attributes': ["ip-src", "ip-dst", "domain|ip"]
    }
print(json.dumps(warninglist))
    matched = re.findall(
        r'\'(.*?)\'', codecs.decode(download(url).content, 'UTF-8'))
    for ip in matched:
        try:
            ipaddress.ip_network(ip)
            warninglist['list'].append(ip)
        except ValueError:
            pass
    write_to_file(warninglist, dst)

if __name__ == '__main__':
    wikimedia_url = 'http://noc.wikimedia.org/conf/reverse-proxy.php.txt'
    wikimedia_dst = 'wikimedia'
    process(wikimedia_url, wikimedia_dst)
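
The regex pass keeps any single-quoted token that parses as an IP network and silently drops the rest. A worked example on a made-up line in the style of reverse-proxy.php.txt (not copied from the real file):

import ipaddress
import re

line = "'208.80.153.0/27', 'text4ever',"  # illustrative input
for candidate in re.findall(r'\'(.*?)\'', line):
    try:
        ipaddress.ip_network(candidate)
        print('kept:', candidate)    # kept: 208.80.153.0/27
    except ValueError:
        pass                         # 'text4ever' is dropped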

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import zipfile
from generator import download_to_file, get_version, write_to_file

View File

@@ -1,8 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from generator import download_to_file, get_version, write_to_file

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import zipfile
from generator import download_to_file, get_version, write_to_file

View File

@@ -5,26 +5,47 @@ import datetime
import json
from inspect import currentframe, getframeinfo
from os import path
import logging

import requests
from dateutil.parser import parse as parsedate

def download_to_file(url, file):
    user_agent = {
        "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"}
    r = requests.head(url, headers=user_agent)
    url_datetime = parsedate(r.headers['Last-Modified']).astimezone()
    file_datetime = datetime.datetime.fromtimestamp(
        path.getmtime(file)).astimezone()
    user_agent = {"User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"}
    try:
        r = requests.head(url, headers=user_agent)
        url_datetime = parsedate(r.headers['Last-Modified']).astimezone()
        file_datetime = datetime.datetime.fromtimestamp(
            path.getmtime(file)).astimezone()
    if(url_datetime > file_datetime):
        r = requests.get(url, headers=user_agent)
        with open(file, 'wb') as fd:
            for chunk in r.iter_content(4096):
                fd.write(chunk)
        if(url_datetime > file_datetime):
            actual_download_to_file(url, file, user_agent)
    except KeyError as ex:
        logging.warning(str(ex))
        actual_download_to_file(url, file, user_agent)

def actual_download_to_file(url, file, user_agent):
    r = requests.get(url, headers=user_agent)
    with open(file, 'wb') as fd:
        for chunk in r.iter_content(4096):
            fd.write(chunk)

def process_stream(url):
    r = requests.get(url, stream=True)
    data_list = []
    for line in r.iter_lines():
        v = line.decode('utf-8')
        if not v.startswith("#"):
            if v:
                data_list.append(v)
    return data_list
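
# Editor's sketch, not part of this commit: how the refactored generators wire
# these helpers together. The feed URL and destination name are placeholders;
# get_version() is the module's version helper imported by the generators above.
def _example_generator():
    warninglist = {
        'name': 'Example list',
        'version': get_version(),
        'description': 'Example list',
        'list': process_stream('https://example.org/feed.txt'),
        'type': 'string',
        'matching_attributes': ['domain']
    }
    write_to_file(warninglist, 'example-list')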

def download(url):
    user_agent = {
        "User-agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:46.0) Gecko/20100101 Firefox/46.0"}