mirror of https://github.com/CIRCL/lookyloo
new: Cloudflare lookup feature to flag IPs appropriately
parent
584eb5e87d
commit
ecb4623b86
|
@ -26,6 +26,7 @@ from .helpers import get_captures_dir
|
|||
from .indexing import Indexing
|
||||
from .default import LookylooException, try_make_file, get_config
|
||||
from .exceptions import MissingCaptureDirectory, NoValidHarFile, MissingUUID, TreeNeedsRebuild
|
||||
from .modules import Cloudflare
|
||||
|
||||
|
||||
class CaptureCache():
|
||||
|
@ -133,6 +134,13 @@ class CapturesIndex(Mapping):
|
|||
# Unable to setup IPASN History
|
||||
self.logger.warning(f'Unable to setup IPASN History: {e}')
|
||||
self.ipasnhistory = None
|
||||
try:
|
||||
self.cloudflare: Optional[Cloudflare] = Cloudflare()
|
||||
if not self.cloudflare.available:
|
||||
self.cloudflare = None
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Unable to setup Cloudflare: {e}')
|
||||
self.cloudflare = None
|
||||
|
||||
@property
|
||||
def cached_captures(self) -> Set[str]:
|
||||
|
@ -474,6 +482,9 @@ class CapturesIndex(Mapping):
|
|||
elif node.name in host_ips:
|
||||
node.add_feature('resolved_ips', host_ips[node.name])
|
||||
|
||||
if self.cloudflare:
|
||||
cflare_hits = self.cloudflare.ips_lookup(_all_ips)
|
||||
|
||||
if self.ipasnhistory:
|
||||
# Throw all the IPs to IPASN History for query later.
|
||||
if ips := [{'ip': ip} for ip in _all_ips]:
|
||||
|
@ -490,23 +501,29 @@ class CapturesIndex(Mapping):
|
|||
r = list(response['response'].values())[0]
|
||||
if ip not in ipasn and r:
|
||||
ipasn[ip] = r
|
||||
if ipasn:
|
||||
# retraverse tree to populate it with the features
|
||||
for node in ct.root_hartree.hostname_tree.traverse():
|
||||
if not hasattr(node, 'resolved_ips'):
|
||||
continue
|
||||
ipasn_entries = {}
|
||||
if 'v4' in node.resolved_ips and 'v6' in node.resolved_ips:
|
||||
_all_ips = node.resolved_ips['v4'] | node.resolved_ips['v6']
|
||||
else:
|
||||
# old format
|
||||
_all_ips = node.resolved_ips
|
||||
for ip in _all_ips:
|
||||
if ip not in ipasn:
|
||||
continue
|
||||
|
||||
if ipasn or cflare_hits:
|
||||
# retraverse tree to populate it with the features
|
||||
for node in ct.root_hartree.hostname_tree.traverse():
|
||||
if not hasattr(node, 'resolved_ips'):
|
||||
continue
|
||||
ipasn_entries = {}
|
||||
cflare_entries = {}
|
||||
if 'v4' in node.resolved_ips and 'v6' in node.resolved_ips:
|
||||
_all_ips = set(node.resolved_ips['v4']) | set(node.resolved_ips['v6'])
|
||||
else:
|
||||
# old format
|
||||
_all_ips = node.resolved_ips
|
||||
for ip in _all_ips:
|
||||
if ip in ipasn:
|
||||
ipasn_entries[ip] = ipasn[ip]
|
||||
if ipasn_entries:
|
||||
node.add_feature('ipasn', ipasn_entries)
|
||||
if ip in cflare_hits:
|
||||
cflare_entries[ip] = True
|
||||
|
||||
if ipasn_entries:
|
||||
node.add_feature('ipasn', ipasn_entries)
|
||||
if cflare_entries:
|
||||
node.add_feature('cloudflare', cflare_entries)
|
||||
|
||||
with cnames_path.open('w') as f:
|
||||
json.dump(host_cnames, f)
|
||||
|
|
|
@ -12,3 +12,4 @@ from .phishtank import Phishtank # noqa
|
|||
from .hashlookup import HashlookupModule as Hashlookup # noqa
|
||||
from .riskiq import RiskIQ, RiskIQError # noqa
|
||||
from .urlhaus import URLhaus # noqa
|
||||
from .cloudflare import Cloudflare # noqa
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import ipaddress
|
||||
import logging
|
||||
from typing import Dict, List
|
||||
|
||||
import requests
|
||||
|
||||
from har2tree import CrawledTree
|
||||
|
||||
from ..default import ConfigError, get_config
|
||||
|
||||
|
||||
class Cloudflare():
|
||||
'''This module checks if an IP is announced by Cloudflare.'''
|
||||
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(f'{self.__class__.__name__}')
|
||||
self.logger.setLevel(get_config('generic', 'loglevel'))
|
||||
|
||||
# Get IPv4
|
||||
r = requests.get('https://www.cloudflare.com/ips-v4')
|
||||
try:
|
||||
r.raise_for_status()
|
||||
ipv4_list = r.text
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Unable to get Cloudflare IPv4 list: {e}')
|
||||
self.available = False
|
||||
return
|
||||
# Get IPv6
|
||||
try:
|
||||
r = requests.get('https://www.cloudflare.com/ips-v6')
|
||||
ipv6_list = r.text
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Unable to get Cloudflare IPv6 list: {e}')
|
||||
self.available = False
|
||||
return
|
||||
|
||||
self.available = True
|
||||
|
||||
self.v4_list = [ipaddress.ip_network(net) for net in ipv4_list.split('\n')]
|
||||
self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')]
|
||||
|
||||
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict:
|
||||
'''Run the module on all the nodes up to the final redirect'''
|
||||
if not self.available:
|
||||
return {'error': 'Module not available'}
|
||||
if auto_trigger and not self.allow_auto_trigger:
|
||||
return {'error': 'Auto trigger not allowed on module'}
|
||||
|
||||
# TODO: trigger something?
|
||||
return {'success': 'Module triggered'}
|
||||
|
||||
def ips_lookup(self, ips: List[str]) -> Dict[str, bool]:
|
||||
'''Lookup a list of IPs. True means it is a known Cloudflare IP'''
|
||||
if not self.available:
|
||||
raise ConfigError('Hashlookup not available, probably not enabled.')
|
||||
|
||||
to_return: Dict[str, bool] = {}
|
||||
for ip_s, ip_p in [(ip, ipaddress.ip_address(ip)) for ip in ips]:
|
||||
if ip_p.version == 4:
|
||||
to_return[ip_s] = any(ip_p in net for net in self.v4_list)
|
||||
else:
|
||||
to_return[ip_s] = any(ip_p in net for net in self.v6_list)
|
||||
return to_return
|
|
@ -124,12 +124,14 @@
|
|||
<li>
|
||||
{{ ip }}{% if uwhois_available %} (<a href="{{ url_for('whois', query=ip)}}">whois</a>){% endif %}
|
||||
{% if 'ipasn' in hostnode.features and hostnode.ipasn.get(ip) %}- AS{{ hostnode.ipasn[ip]['asn'] }} {% if uwhois_available %} (<a href="{{ url_for('whois', query='AS'+hostnode.ipasn[ip]['asn'])}}">whois</a>){% endif %}{% endif %}
|
||||
{% if 'cloudflare' in hostnode.features and hostnode.cloudflare.get(ip) %} - Known Cloudflare IP{% endif %}
|
||||
</li>
|
||||
{% endfor %}
|
||||
{% for ip in hostnode.resolved_ips['v6'] %}
|
||||
<li>
|
||||
{{ ip }}{% if uwhois_available %} (<a href="{{ url_for('whois', query=ip)}}">whois</a>){% endif %}
|
||||
{% if 'ipasn' in hostnode.features and hostnode.ipasn.get(ip) %}- AS{{ hostnode.ipasn[ip]['asn'] }} {% if uwhois_available %} (<a href="{{ url_for('whois', query='AS'+hostnode.ipasn[ip]['asn'])}}">whois</a>){% endif %}{% endif %}
|
||||
{% if 'cloudflare' in hostnode.features and hostnode.cloudflare.get(ip) %} - Known Cloudflare IP{% endif %}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
|
|
Loading…
Reference in New Issue