mirror of https://github.com/CIRCL/lookyloo
Merge branch 'AntoniaBK-abuse-c'
commit
45bf7ef7b1
|
@ -40,12 +40,12 @@ class UniversalWhois(AbstractModule):
|
||||||
# old format
|
# old format
|
||||||
_all_ips = hostnode.resolved_ips
|
_all_ips = hostnode.resolved_ips
|
||||||
for ip in _all_ips:
|
for ip in _all_ips:
|
||||||
self.whois(ip)
|
self.whois(ip, contact_email_only=False)
|
||||||
if hasattr(hostnode, 'cnames'):
|
if hasattr(hostnode, 'cnames'):
|
||||||
cname: str
|
cname: str
|
||||||
for cname in hostnode.cnames:
|
for cname in hostnode.cnames:
|
||||||
self.whois(cname)
|
self.whois(cname, contact_email_only=False)
|
||||||
self.whois(hostnode.name)
|
self.whois(hostnode.name, contact_email_only=False)
|
||||||
|
|
||||||
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None:
|
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None:
|
||||||
'''Run the module on all the nodes up to the final redirect'''
|
'''Run the module on all the nodes up to the final redirect'''
|
||||||
|
@ -72,10 +72,13 @@ class UniversalWhois(AbstractModule):
|
||||||
...
|
...
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]:
|
def whois(self, query: str, contact_email_only: bool) -> str | list[str]:
|
||||||
...
|
...
|
||||||
|
|
||||||
def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]:
|
def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]:
|
||||||
|
|
||||||
|
EMAIL_REGEX = rb'(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)'
|
||||||
|
|
||||||
if not self.available:
|
if not self.available:
|
||||||
return ''
|
return ''
|
||||||
bytes_whois = b''
|
bytes_whois = b''
|
||||||
|
@ -87,7 +90,23 @@ class UniversalWhois(AbstractModule):
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
bytes_whois += data
|
bytes_whois += data
|
||||||
|
|
||||||
|
# if an abuse-c-Object is found in the whois entry, it will take precedence
|
||||||
|
abuse_c = re.search(rb'abuse-c:\s+(.*)\s', bytes_whois)
|
||||||
|
if abuse_c and abuse_c.lastindex and abuse_c.lastindex > 0: # make sure we have a match and avoid exception on None or missing group 1
|
||||||
|
# The whois entry has an abuse-c object
|
||||||
|
_obj_name: str = abuse_c.group(1).decode()
|
||||||
|
abuse_c_query = self.whois(_obj_name, contact_email_only)
|
||||||
|
# The object exists
|
||||||
|
if abuse_c_query and contact_email_only:
|
||||||
|
# The object exists and we only want the email(s), the response is a list of emails
|
||||||
|
return abuse_c_query
|
||||||
|
elif abuse_c_query:
|
||||||
|
# The object exists and we want the full whois entry, contatenate with a new line.
|
||||||
|
# contact_email_only is False, so the response is a string, ignore the typing warning accordingy
|
||||||
|
return '\n'.join([bytes_whois.decode(), abuse_c_query]) # type: ignore[list-item]
|
||||||
|
# We either dont have an abuse-c object or it does not exist
|
||||||
if not contact_email_only:
|
if not contact_email_only:
|
||||||
return bytes_whois.decode()
|
return bytes_whois.decode()
|
||||||
emails = list(set(re.findall(rb'[\w\.-]+@[\w\.-]+', bytes_whois)))
|
emails = list(set(re.findall(EMAIL_REGEX, bytes_whois)))
|
||||||
return [e.decode() for e in sorted(emails)]
|
return [e.decode() for e in sorted(emails)]
|
||||||
|
|
Loading…
Reference in New Issue