mirror of https://github.com/CIRCL/lookyloo
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import logging
import os
from datetime import datetime, timedelta
from enum import IntEnum, unique
from functools import lru_cache
from io import BufferedIOBase
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urljoin, urlparse

import pkg_resources
import requests
from har2tree import CrawledTree, HostNode, URLNode
from publicsuffix2 import PublicSuffixList, fetch  # type: ignore
from pytaxonomies import Taxonomies
from requests.exceptions import HTTPError

from .default import get_homedir, safe_create_dir, get_config

logger = logging.getLogger('Lookyloo - Helpers')


@unique
class CaptureStatus(IntEnum):
    UNKNOWN = -1
    QUEUED = 0
    DONE = 1
    ONGOING = 2
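

# Usage sketch (illustrative values, not from the source):
# CaptureStatus(1)         # -> <CaptureStatus.DONE: 1>
# CaptureStatus.DONE.name  # -> 'DONE'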


# This function is meant to be used as the `default` parameter in json.dump
# or json.dumps calls: json.dumps(..., default=serialize_to_json)
def serialize_to_json(obj: Set[Any]) -> Optional[List[Any]]:
    if isinstance(obj, set):
        return list(obj)
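

# Usage sketch (hypothetical payload): sets are not JSON serializable by default,
# so pass this function as the `default` hook:
# json.dumps({'hashes': {'a', 'b'}}, default=serialize_to_json)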


def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
    """Collect the hashes of all the resources in a full tree, a host node, or a single URL node."""
    if isinstance(har2tree_container, CrawledTree):
        urlnodes = har2tree_container.root_hartree.url_tree.traverse()
    elif isinstance(har2tree_container, HostNode):
        urlnodes = har2tree_container.urls
    elif isinstance(har2tree_container, URLNode):
        urlnodes = [har2tree_container]
    else:
        raise TypeError(f'har2tree_container cannot be {type(har2tree_container)}')
    all_resources_hashes: Set[str] = set()
    for urlnode in urlnodes:
        if hasattr(urlnode, 'resources_hashes'):
            all_resources_hashes.update(urlnode.resources_hashes)
    return all_resources_hashes
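

# Usage sketch (assuming `ct` is a loaded har2tree.CrawledTree):
# all_hashes = get_resources_hashes(ct)  # hashes of every resource in the capture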


@lru_cache(64)
def get_taxonomies() -> Taxonomies:
    return Taxonomies()


@lru_cache(64)
def get_public_suffix_list() -> PublicSuffixList:
    """Initialize the Public Suffix List."""
    try:
        psl_file = fetch()
        psl = PublicSuffixList(psl_file=psl_file)
    except Exception:
        # Fetching a fresh list failed: fall back to the list bundled with the package.
        psl = PublicSuffixList()
    return psl
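

# Usage sketch (illustrative domain; get_public_suffix comes from publicsuffix2):
# psl = get_public_suffix_list()
# psl.get_public_suffix('www.example.co.uk')  # -> 'example.co.uk'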


@lru_cache(64)
def get_captures_dir() -> Path:
    capture_dir = get_homedir() / 'scraped'
    safe_create_dir(capture_dir)
    return capture_dir


@lru_cache(64)
def get_email_template() -> str:
    with (get_homedir() / 'config' / 'email.tmpl').open() as f:
        return f.read()


def get_user_agents(directory: str = 'user_agents') -> Dict[str, Any]:
    # A reverse sort puts the most recent file first, assuming the filenames sort chronologically.
    ua_files_path = sorted((get_homedir() / directory).glob('**/*.json'), reverse=True)
    with ua_files_path[0].open() as f:
        return json.load(f)


def load_known_content(directory: str = 'known_content') -> Dict[str, Dict[str, Any]]:
    to_return: Dict[str, Dict[str, Any]] = {}
    for known_content_file in (get_homedir() / directory).glob('*.json'):
        with known_content_file.open() as f:
            to_return[known_content_file.stem] = json.load(f)
    return to_return


def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str]] = None) -> List[Dict[str, Union[str, bool]]]:
    cookies: List[Dict[str, Union[str, bool]]]
    if cookie_pseudofile:
        if isinstance(cookie_pseudofile, str):
            try:
                cookies = json.loads(cookie_pseudofile)
            except json.decoder.JSONDecodeError:
                logger.warning(f'Unable to load json content: {cookie_pseudofile}')
                return []
        else:
            cookies = json.load(cookie_pseudofile)
    else:
        if not (get_homedir() / 'cookies.json').exists():
            return []

        with (get_homedir() / 'cookies.json').open() as f:
            cookies = json.load(f)
    to_return: List[Dict[str, Union[str, bool]]] = []
    try:
        for cookie in cookies:
            to_add: Dict[str, Union[str, bool]]
            if 'Host raw' in cookie:
                # Cookie export format of Cookie Quick Manager
                u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]  # type: ignore
                to_add = {'path': cookie['Path raw'],
                          'name': cookie['Name raw'],
                          'httpOnly': cookie['HTTP only raw'] == 'true',
                          'secure': cookie['Send for'] == 'Encrypted connections only',
                          # The expiration is simply set 10 days in the future so the cookie stays valid for the capture.
                          'expires': (datetime.now() + timedelta(days=10)).strftime('%Y-%m-%dT%H:%M:%S') + 'Z',
                          'domain': u,
                          'value': cookie['Content raw']
                          }
            else:
                # Cookie already in the lookyloo/splash format
                to_add = cookie
            to_return.append(to_add)
    except Exception as e:
        logger.warning(f'Unable to load the cookie file: {e}')
    return to_return
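

# Input sketch for the Cookie Quick Manager branch above (values are illustrative):
# [{"Host raw": "https://example.com/", "Path raw": "/", "Name raw": "session",
#   "Content raw": "abcd1234", "HTTP only raw": "true",
#   "Send for": "Encrypted connections only"}]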


def uniq_domains(uniq_urls):
    domains = set()
    for url in uniq_urls:
        splitted = urlparse(url)
        domains.add(splitted.hostname)
    return domains


@lru_cache(64)
def get_useragent_for_requests() -> str:
    version = pkg_resources.get_distribution('lookyloo').version
    return f'Lookyloo / {version}'
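

# Usage sketch (hypothetical request, identifies Lookyloo to remote services):
# requests.get(url, headers={'User-Agent': get_useragent_for_requests()})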


@lru_cache(64)
def get_splash_url() -> str:
    if os.environ.get('SPLASH_URL_DOCKER'):
        # In order to have a working default for the docker image, it is easier to use an environment variable
        return os.environ['SPLASH_URL_DOCKER']
    else:
        return get_config('generic', 'splash_url')
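

# Example (assumed docker-compose setting, not from the source):
# SPLASH_URL_DOCKER=http://splash:8050/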


def splash_status() -> Tuple[bool, str]:
    try:
        ping_response = requests.get(urljoin(get_splash_url(), '_ping'))
        ping_response.raise_for_status()
        json_status = ping_response.json()
        if json_status['status'] == 'ok':
            return True, 'Splash is up'
        else:
            return False, str(json_status)
    except HTTPError as http_err:
        return False, f'HTTP error occurred: {http_err}'
    except Exception as err:
        return False, f'Other error occurred: {err}'
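

# Usage sketch:
# is_up, message = splash_status()
# if not is_up:
#     logger.warning(f'Splash is not reachable: {message}')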