lookyloo/lookyloo/helpers.py

257 lines
9.4 KiB
Python

#!/usr/bin/env python3
import hashlib
import json
import logging
from datetime import datetime, timedelta
from functools import lru_cache
from importlib.metadata import version
from io import BufferedIOBase
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union
from urllib.parse import urlparse
from har2tree import CrawledTree, HostNode, URLNode
from playwrightcapture import get_devices
from publicsuffix2 import PublicSuffixList, fetch # type: ignore
from pytaxonomies import Taxonomies
from ua_parser import user_agent_parser # type: ignore
from werkzeug.user_agent import UserAgent
from werkzeug.utils import cached_property
from .default import get_homedir, safe_create_dir, get_config
logger = logging.getLogger('Lookyloo - Helpers')
# This method is used in json.dump or json.dumps calls as the default parameter:
# json.dumps(..., default=dump_to_json)
def serialize_to_json(obj: Union[Set]) -> Union[List]:
if isinstance(obj, set):
return sorted(obj)
def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
if isinstance(har2tree_container, CrawledTree):
urlnodes = har2tree_container.root_hartree.url_tree.traverse()
elif isinstance(har2tree_container, HostNode):
urlnodes = har2tree_container.urls
elif isinstance(har2tree_container, URLNode):
urlnodes = [har2tree_container]
else:
raise Exception(f'har2tree_container cannot be {type(har2tree_container)}')
all_ressources_hashes: Set[str] = set()
for urlnode in urlnodes:
if hasattr(urlnode, 'resources_hashes'):
all_ressources_hashes.update(urlnode.resources_hashes)
return all_ressources_hashes
@lru_cache(64)
def get_taxonomies():
return Taxonomies()
@lru_cache(64)
def get_public_suffix_list():
"""Initialize Public Suffix List"""
try:
psl_file = fetch()
psl = PublicSuffixList(psl_file=psl_file)
except Exception:
psl = PublicSuffixList()
return psl
@lru_cache(64)
def get_captures_dir() -> Path:
capture_dir = get_homedir() / 'scraped'
safe_create_dir(capture_dir)
return capture_dir
@lru_cache(64)
def get_email_template() -> str:
with (get_homedir() / 'config' / 'email.tmpl').open() as f:
return f.read()
class UserAgents:
def __init__(self):
if get_config('generic', 'use_user_agents_users'):
self.path = get_homedir() / 'own_user_agents'
else:
self.path = get_homedir() / 'user_agents'
ua_files_path = sorted(self.path.glob('**/*.json'), reverse=True)
# This call *must* be here because otherwise, we get the devices from within the async
# process and as we already have a playwright context, it fails.
# it is not a problem to have it here because the devices do not change
# until we have a new version playwright, and restart everything anyway.
self.playwright_devices = get_devices()
self._load_newest_ua_file(ua_files_path[0])
def _load_newest_ua_file(self, path: Path):
self.most_recent_ua_path = path
with self.most_recent_ua_path.open() as f:
self.most_recent_uas = json.load(f)
self.by_freq = self.most_recent_uas.pop('by_frequency')
self._load_playwright_devices()
def _load_playwright_devices(self):
# Only get default and desktop for now.
for device_name, details in self.playwright_devices['desktop']['default'].items():
parsed_ua = ParsedUserAgent(details['user_agent'])
if not parsed_ua.platform or not parsed_ua.browser:
continue
platform_key = parsed_ua.platform
if parsed_ua.platform_version:
platform_key = f'{platform_key} {parsed_ua.platform_version}'
browser_key = parsed_ua.browser
if parsed_ua.version:
browser_key = f'{browser_key} {parsed_ua.version}'
if platform_key not in self.most_recent_uas:
self.most_recent_uas[platform_key] = {}
if browser_key not in self.most_recent_uas[platform_key]:
self.most_recent_uas[platform_key][browser_key] = []
if parsed_ua.string in self.most_recent_uas[platform_key][browser_key]:
self.most_recent_uas[platform_key][browser_key].remove(parsed_ua.string)
# We want that one at the top of the list.
self.most_recent_uas[platform_key][browser_key].insert(0, parsed_ua.string)
@property
def user_agents(self) -> Dict[str, Dict[str, List[str]]]:
ua_files_path = sorted(self.path.glob('**/*.json'), reverse=True)
if ua_files_path[0] != self.most_recent_ua_path:
self._load_newest_ua_file(ua_files_path[0])
return self.most_recent_uas
@property
def default(self) -> Dict[str, str]:
'''The default useragent for desktop chrome from playwright'''
parsed_ua = ParsedUserAgent(self.playwright_devices['desktop']['default']['Desktop Chrome']['user_agent'])
platform_key = parsed_ua.platform
if parsed_ua.platform_version:
platform_key = f'{platform_key} {parsed_ua.platform_version}'
browser_key = parsed_ua.browser
if parsed_ua.version:
browser_key = f'{browser_key} {parsed_ua.version}'
if not platform_key or not browser_key:
raise Exception(f'Unable to get valid default user agent from playwright: {parsed_ua}')
return {'os': platform_key,
'browser': browser_key,
'useragent': parsed_ua.string}
def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, Any]]:
to_return: Dict[str, Dict[str, Any]] = {}
for known_content_file in (get_homedir() / directory).glob('*.json'):
with known_content_file.open() as f:
to_return[known_content_file.stem] = json.load(f)
return to_return
def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes]]=None) -> List[Dict[str, Union[str, bool]]]:
cookies: List[Dict[str, Union[str, bool]]]
if cookie_pseudofile:
if isinstance(cookie_pseudofile, (str, bytes)):
try:
cookies = json.loads(cookie_pseudofile)
except json.decoder.JSONDecodeError:
logger.warning(f'Unable to load json content: {cookie_pseudofile!r}')
return []
else:
# Note: we might have an empty BytesIO, which is not False.
try:
cookies = json.load(cookie_pseudofile)
except json.decoder.JSONDecodeError:
logger.warning(f'Unable to load json content: {cookie_pseudofile}')
return []
else:
if not (get_homedir() / 'cookies.json').exists():
return []
with (get_homedir() / 'cookies.json').open() as f:
cookies = json.load(f)
to_return: List[Dict[str, Union[str, bool]]] = []
try:
for cookie in cookies:
to_add: Dict[str, Union[str, bool]]
if 'Host raw' in cookie and isinstance(cookie['Host raw'], str):
# Cookie export format for Cookie Quick Manager
u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]
to_add = {'path': cookie['Path raw'],
'name': cookie['Name raw'],
'httpOnly': cookie['HTTP only raw'] == 'true',
'secure': cookie['Send for'] == 'Encrypted connections only',
'expires': (datetime.now() + timedelta(days=10)).strftime('%Y-%m-%dT%H:%M:%S') + 'Z',
'domain': u,
'value': cookie['Content raw']
}
else:
# Cookie from lookyloo/playwright
to_add = cookie
to_return.append(to_add)
except Exception as e:
print(f'Unable to load the cookie file: {e}')
return to_return
def uniq_domains(uniq_urls):
domains = set()
for url in uniq_urls:
splitted = urlparse(url)
domains.add(splitted.hostname)
return domains
@lru_cache(64)
def get_useragent_for_requests():
return f'Lookyloo / {version("lookyloo")}'
def get_cache_directory(root: Path, identifier: str, namespace: Optional[Union[str, Path]] = None) -> Path:
m = hashlib.md5()
m.update(identifier.encode())
digest = m.hexdigest()
if namespace:
root = root / namespace
return root / digest[0] / digest[1] / digest[2] / digest
class ParsedUserAgent(UserAgent):
# from https://python.tutorialink.com/how-do-i-get-the-user-agent-with-flask/
@cached_property
def _details(self):
return user_agent_parser.Parse(self.string)
@property
def platform(self):
return self._details['os'].get('family')
@property
def platform_version(self) -> Optional[str]:
return self._aggregate_version(self._details['os'])
@property
def browser(self):
return self._details['user_agent'].get('family')
@property
def version(self):
return self._aggregate_version(self._details['user_agent'])
def _aggregate_version(self, details: Dict[str, str]) -> Optional[str]:
return '.'.join(
part
for key in ('major', 'minor', 'patch', 'patch_minor')
if (part := details.get(key)) is not None
)
def __str__(self):
return f'OS: {self.platform} - Browser: {self.browser} {self.version} - UA: {self.string}'