mirror of https://github.com/MISP/PyMISP
				
				
				
			
		
			
				
	
	
		
			210 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			210 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Python
		
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import ipaddress
 | |
| import socket
 | |
| import idna
 | |
| from publicsuffixlist import PublicSuffixList  # type: ignore
 | |
| from urllib.parse import urlparse, urlunparse, ParseResult
 | |
| 
 | |
| 
 | |
class UrlNotDecoded(Exception):
    """Raised when a URL cannot be parsed, or is queried before being decoded."""
 | |
| 
 | |
| 
 | |
class PSLFaup:
    """
    Fake Faup Python Library using PSL for Windows support.

    Call :meth:`decode` with a URL first; the ``get_*`` accessors, the
    :attr:`url` property and :meth:`get` then expose the individual
    components. Every accessor raises :class:`UrlNotDecoded` when no URL
    has been decoded successfully.
    """

    def __init__(self) -> None:
        self.decoded = False
        self.psl = PublicSuffixList()
        self._url: ParseResult | None = None
        self._retval: dict[str, str | int | None | bytes] = {}
        # Compressed textual form of the host when it is an IP address,
        # otherwise the empty string.
        self.ip_as_host = ''

    def _clear(self) -> None:
        """Reset all per-URL state so a new URL can be decoded."""
        self.decoded = False
        self._url = None
        self._retval = {}
        self.ip_as_host = ''

    def decode(self, url: str | bytes) -> None:
        """
        This function creates a dict of all the url fields.

        :param url: The URL to normalize (text or bytes). When ``//`` does
                    not occur within its first 10 characters, ``//`` is
                    prepended so that the leading part is parsed as the
                    network location.
        :raises UrlNotDecoded: if no hostname can be extracted from the URL
        """
        self._clear()
        # The marker must have the same type as the URL: looking for a str
        # inside a bytes object raises TypeError.
        if isinstance(url, bytes):
            if b'//' not in url[:10]:
                url = b'//' + url
        elif '//' not in url[:10]:
            url = '//' + url

        parsed = urlparse(url)
        self._url = parsed

        raw_host = parsed.hostname
        if raw_host is None:
            raise UrlNotDecoded("Unable to parse URL")
        # A bytes URL yields a bytes hostname; the IP checks below need text.
        hostname = raw_host.decode('utf-8') if isinstance(raw_host, bytes) else raw_host

        try:
            # Try IPv4 first, then fall back to IPv6.
            ipv4 = ipaddress.IPv4Address(socket.inet_aton(hostname))
            self.ip_as_host = ipv4.compressed
        except (OSError, ValueError):
            try:
                # Drop an IPv6 zone identifier ("%eth0") before validating.
                addr, _, _ = hostname.partition('%')
                self.ip_as_host = ipaddress.IPv6Address(addr).compressed
            except ValueError:
                # Not an IP address at all: a regular host name.
                pass

        self.decoded = True

    @property
    def url(self) -> bytes | None:
        """
        Rebuild the URL from its decoded components.

        Only scheme, host, port, path, query string and fragment are used;
        credentials are not included.

        :returns: The rebuilt URL as UTF-8 bytes, or None if there is no host
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        host = self.get_host()
        if not host:
            return None

        # urlparse strips the brackets around an IPv6 literal; put them back
        # so the address colons cannot be confused with the port separator.
        if ':' in host:
            host = f'[{host}]'
        port = self.get_port()
        netloc = host if port is None else f'{host}:{port}'
        return _ensure_bytes(
            urlunparse(
                (self.get_scheme(), netloc, self.get_resource_path(),
                 '', self.get_query_string(), self.get_fragment(),)
            )
        )

    def get_scheme(self) -> str:
        """
        Get the scheme of the url given in the decode function

        :returns: The URL scheme, or an empty string if there is none
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        return _ensure_str(self._url.scheme if self._url.scheme else '')

    def get_credential(self) -> str | None:
        """
        Get the credentials embedded in the URL.

        :returns: ``username:password``, the username alone when there is no
                  password, or None when there is no username
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        username = self._url.username
        password = self._url.password
        if username and password:
            return _ensure_str(username) + ':' + _ensure_str(password)
        if username:
            return _ensure_str(username)
        return None

    def get_subdomain(self) -> str | None:
        """
        Get the part of the host that precedes the domain.

        :returns: The subdomain, or None if the host is an IP address or
                  has nothing in front of the domain
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        if self.ip_as_host:
            return None
        host = self.get_host()
        domain = self.get_domain()
        if host and domain and domain in host:
            # Cut the domain off the end and drop the separating dot.
            return host.rsplit(domain, 1)[0].rstrip('.') or None
        return None

    def get_domain(self) -> str | None:
        """
        Get the domain of the host, as given by ``psl.privatesuffix``.

        :returns: The domain, or None if the host is an IP address or the
                  Public Suffix List lookup yields nothing
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        host = self.get_host()
        if host is not None and not self.ip_as_host:
            return self.psl.privatesuffix(host)
        return None

    def get_domain_without_tld(self) -> str | None:
        """
        Get the domain with its TLD (public suffix) removed.

        :returns: The domain without its TLD, or None if the host is an IP
                  address or has no TLD or domain
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        tld = self.get_tld()
        if tld is not None and not self.ip_as_host:
            if domain := self.get_domain():
                return domain.rsplit(tld, 1)[0].rstrip('.')
        return None

    def get_host(self) -> str | None:
        """
        Get the host of the URL in ASCII form.

        :returns: The hostname; a non-ASCII hostname is IDNA-encoded
                  (``uts46=True``). None if the URL has no hostname.
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        hostname = self._url.hostname
        if hostname is None:
            return None
        if hostname.isascii():
            return _ensure_str(hostname)
        return _ensure_str(idna.encode(hostname, uts46=True))

    def get_unicode_host(self) -> str | None:
        """
        Get the host of the URL IDNA-decoded to its Unicode form.

        :returns: The decoded hostname, or None if the host is an IP address
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        if not self.ip_as_host:
            if host := self.get_host():
                return idna.decode(host, uts46=True)
        return None

    def get_tld(self) -> str | None:
        """
        Get the TLD of the host, as given by ``psl.publicsuffix``.

        :returns: The public suffix, or None if the host is an IP address
                  or the Public Suffix List lookup yields nothing
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        host = self.get_host()
        if host is not None and not self.ip_as_host:
            return self.psl.publicsuffix(host)
        return None

    def get_port(self) -> int | None:
        """
        Get the port of the URL.

        :returns: The port number, or None if the URL does not specify one
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        return self._url.port

    def get_resource_path(self) -> str:
        """
        Get the path component of the URL.

        :returns: The path (possibly empty)
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        return _ensure_str(self._url.path)

    def get_query_string(self) -> str:
        """
        Get the query component of the URL.

        :returns: The query string (possibly empty)
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        return _ensure_str(self._url.query)

    def get_fragment(self) -> str:
        """
        Get the fragment component of the URL.

        :returns: The fragment (possibly empty)
        :raises UrlNotDecoded: if no URL has been decoded
        """
        if not self.decoded or not self._url:
            raise UrlNotDecoded("You must call faup.decode() first")

        return _ensure_str(self._url.fragment)

    def get(self) -> dict[str, str | int | None | bytes]:
        """
        Collect every decoded field into a single dict.

        The dict is stored on the instance and the same object is returned
        until the next call to decode().

        :returns: A dict with the keys scheme, tld, domain,
                  domain_without_tld, subdomain, host, port, resource_path,
                  query_string, fragment and url
        :raises UrlNotDecoded: if no URL has been decoded
        """
        self._retval.update({
            "scheme": self.get_scheme(),
            "tld": self.get_tld(),
            "domain": self.get_domain(),
            "domain_without_tld": self.get_domain_without_tld(),
            "subdomain": self.get_subdomain(),
            "host": self.get_host(),
            "port": self.get_port(),
            "resource_path": self.get_resource_path(),
            "query_string": self.get_query_string(),
            "fragment": self.get_fragment(),
            "url": self.url,
        })
        return self._retval
 | |
| 
 | |
| 
 | |
| def _ensure_bytes(binary: str | bytes) -> bytes:
 | |
|     if isinstance(binary, bytes):
 | |
|         return binary
 | |
|     else:
 | |
|         return binary.encode('utf-8')
 | |
| 
 | |
| 
 | |
| def _ensure_str(string: str | bytes) -> str:
 | |
|     if isinstance(string, str):
 | |
|         return string
 | |
|     else:
 | |
|         return string.decode('utf-8')
 |