lookyloo/lookyloo/modules/uwhois.py

111 lines
4.3 KiB
Python
Raw Normal View History

2021-09-16 11:22:02 +02:00
#!/usr/bin/env python3
2024-01-12 17:15:41 +01:00
from __future__ import annotations
import re
2021-09-16 11:22:02 +02:00
import socket
2024-01-13 01:24:32 +01:00
from typing import overload, Literal
2021-09-16 11:22:02 +02:00
2024-02-05 16:33:46 +01:00
from har2tree import CrawledTree, Har2TreeError, HostNode
2021-09-16 11:22:02 +02:00
from .abstractmodule import AbstractModule
2021-09-16 11:22:02 +02:00
class UniversalWhois(AbstractModule):
    '''Module sending whois queries to a uwhois server over a plain TCP socket.'''

    def module_init(self) -> bool:
        '''Load the module settings and check that the uwhois server accepts connections.

        Returns False when the module is not enabled in the config, or when the
        test connection to the configured address/port fails. Returns True otherwise.
        '''
        if not self.config.get('enabled'):
            self.logger.info('Not enabled.')
            return False

        self.server = self.config.get('ipaddress')
        self.port = self.config.get('port')
        self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))

        try:
            # Connectivity probe only: nothing is sent, the socket is closed right away.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect((self.server, self.port))
        except Exception as e:
            # Kept broad on purpose: a missing/invalid address or port in the config
            # must disable the module the same way an unreachable server does.
            self.logger.warning(f'Unable to connect to uwhois ({self.server}:{self.port}): {e}')
            return False
        return True

    def query_whois_hostnode(self, hostnode: HostNode) -> None:
        '''Run a full whois query for each resolved IP and CNAME of the node, then for its name.

        The responses are not used here.
        NOTE(review): presumably the point is to get the entries cached on the
        uwhois side - confirm.
        '''
        if hasattr(hostnode, 'resolved_ips'):
            ip: str
            if 'v4' in hostnode.resolved_ips and 'v6' in hostnode.resolved_ips:
                _all_ips = set(hostnode.resolved_ips['v4']) | set(hostnode.resolved_ips['v6'])
            else:
                # old format
                _all_ips = hostnode.resolved_ips
            for ip in _all_ips:
                self.whois(ip, contact_email_only=False)

        if hasattr(hostnode, 'cnames'):
            cname: str
            for cname in hostnode.cnames:
                self.whois(cname, contact_email_only=False)

        self.whois(hostnode.name, contact_email_only=False)

    def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None:
        '''Run the module on all the nodes up to the final redirect'''
        if not self.available:
            return None
        if auto_trigger and not self.allow_auto_trigger:
            return None

        try:
            hostnode = crawled_tree.root_hartree.get_host_node_by_uuid(crawled_tree.root_hartree.rendered_node.hostnode_uuid)
        except Har2TreeError as e:
            self.logger.warning(e)
        else:
            # Query the rendered node first, then every node returned by get_ancestors().
            self.query_whois_hostnode(hostnode)
            for n in hostnode.get_ancestors():
                self.query_whois_hostnode(n)

    @overload
    def whois(self, query: str, contact_email_only: Literal[True]) -> list[str]:
        ...

    @overload
    def whois(self, query: str, contact_email_only: Literal[False]=...) -> str:
        ...

    @overload
    def whois(self, query: str, contact_email_only: bool) -> str | list[str]:
        ...

    def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]:
        '''Send `query` to the uwhois server and return the response.

        If the response contains an `abuse-c:` attribute, the referenced object is
        queried as well and takes precedence (emails), or is appended (full text).

        :param query: What to look up; sent as-is, followed by a newline.
        :param contact_email_only: If True, return the sorted, de-duplicated list of
            email addresses found instead of the whole entry.
        :return: The whois entry as a string, or a list of email addresses.
            When the module is not available, an empty value of the matching
            type ('' or []) is returned.
        '''
        if not self.available:
            # Empty value of the type promised by the overloads.
            return [] if contact_email_only else ''

        chunks: list[bytes] = []
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((self.server, self.port))
            sock.sendall(f'{query}\n'.encode())
            # Read until recv() returns an empty chunk.
            while data := sock.recv(2048):
                chunks.append(data)
        bytes_whois = b''.join(chunks)

        # if an abuse-c-Object is found in the whois entry, it will take precedence
        abuse_c = re.search(rb'abuse-c:\s+(.*)\s', bytes_whois)
        if abuse_c and abuse_c.lastindex:  # make sure we have a match and avoid exception on None or missing group 1
            # The whois entry has an abuse-c object.
            # The captured value can carry a trailing '\r' or bytes that are not valid UTF-8.
            _obj_name: str = abuse_c.group(1).decode(errors='replace').strip()
            # Skip an empty handle, and a handle pointing back at the current query:
            # the latter would make this method call itself forever.
            # Longer reference cycles (A -> B -> A) are not detected.
            if _obj_name and _obj_name.casefold() != query.strip().casefold():
                abuse_c_query = self.whois(_obj_name, contact_email_only)
                if abuse_c_query and contact_email_only:
                    # The object exists and we only want the email(s), the response is a list of emails
                    return abuse_c_query
                elif abuse_c_query:
                    # The object exists and we want the full whois entry, concatenate with a new line.
                    # contact_email_only is False, so the response is a string, ignore the typing warning accordingly
                    return '\n'.join([bytes_whois.decode(errors='replace'), abuse_c_query])  # type: ignore[list-item]

        # We either don't have an abuse-c object or it does not exist
        if not contact_email_only:
            # The response is not guaranteed to be valid UTF-8: never raise on decoding.
            return bytes_whois.decode(errors='replace')

        # With a bytes pattern, \w only matches ASCII, so decoding the matches cannot fail.
        emails = set(re.findall(rb'[\w\.-]+@[\w\.-]+', bytes_whois))
        return [e.decode() for e in sorted(emails)]