From 466cb5fcd5861ecdf420598713968bbd56b4e48a Mon Sep 17 00:00:00 2001
From: Raphaël Vinot
Date: Mon, 18 May 2020 18:32:59 +0200
Subject: [PATCH] chg: Improve typing

---
 lookyloo/abstractmanager.py |  6 +--
 lookyloo/helpers.py         | 14 +++---
 lookyloo/lookyloo.py        | 24 ++++++------
 lookyloo/modules.py         |  6 +--
 poetry.lock                 | 76 ++++++++++++++++++-------------------
 website/web/__init__.py     | 75 ++++++++++++++++++++----------------
 website/web/proxied.py      |  5 ++-
 7 files changed, 108 insertions(+), 98 deletions(-)

diff --git a/lookyloo/abstractmanager.py b/lookyloo/abstractmanager.py
index 2afd9097..8b8ef243 100644
--- a/lookyloo/abstractmanager.py
+++ b/lookyloo/abstractmanager.py
@@ -15,13 +15,13 @@ class AbstractManager(ABC):
         self.logger.setLevel(loglevel)
         self.logger.info(f'Initializing {self.__class__.__name__}')
 
-    async def _to_run_forever_async(self):
+    async def _to_run_forever_async(self) -> None:
         pass
 
-    def _to_run_forever(self):
+    def _to_run_forever(self) -> None:
         pass
 
-    def run(self, sleep_in_sec: int):
+    def run(self, sleep_in_sec: int) -> None:
         self.logger.info(f'Launching {self.__class__.__name__}')
         while True:
             if shutdown_requested():
diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py
index ccefb0cd..bb22a923 100644
--- a/lookyloo/helpers.py
+++ b/lookyloo/helpers.py
@@ -41,7 +41,7 @@ Run the following command (assuming you run the code from the clonned repository
     return Path(os.environ['LOOKYLOO_HOME'])
 
 
-def get_email_template():
+def get_email_template() -> str:
     with (get_homedir() / 'config' / 'email.tmpl').open() as f:
         return f.read()
 
@@ -66,7 +66,7 @@ def load_configs(path_to_config_files: Optional[Union[str, Path]]=None) -> Dict[
     return to_return
 
 
-def safe_create_dir(to_create: Path):
+def safe_create_dir(to_create: Path) -> None:
     if to_create.exists() and not to_create.is_dir():
         raise CreateDirectoryException(f'The path {to_create} already exists and is not a directory')
     to_create.mkdir(parents=True, exist_ok=True)
@@ -82,7 +82,7 @@ def unset_running(name: str) -> None:
     r.hdel('running', name)
 
 
-def is_running() -> dict:
+def is_running() -> Dict[Any, Any]:
     r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
     return r.hgetall('running')
 
@@ -125,7 +125,7 @@ def long_sleep(sleep_in_sec: int, shutdown_check: int=10) -> bool:
     return True
 
 
-def update_user_agents():
+def update_user_agents() -> None:
     if not HAS_CF:
         # The website with the UAs is behind Cloudflare's anti-bot page, we need cloudscraper
         return
@@ -151,7 +151,7 @@ def update_user_agents():
         traceback.print_exc()
         return
 
-    to_store = {'by_frequency': []}
+    to_store: Dict[str, Any] = {'by_frequency': []}
     for ua in json.loads(uas):
         os = ua['system'].split(' ')[-1]
         if os not in to_store:
@@ -165,7 +165,7 @@ def update_user_agents():
         json.dump(to_store, f, indent=2)
 
 
-def get_user_agents() -> dict:
+def get_user_agents() -> Dict[str, Any]:
     ua_files_path = str(get_homedir() / 'user_agents' / '*' / '*' / '*.json')
     paths = sorted(glob(ua_files_path), reverse=True)
     if not paths:
@@ -175,7 +175,7 @@ def get_user_agents() -> dict:
         return json.load(f)
 
 
-def load_cookies(cookie_pseudofile: Optional[BufferedIOBase]=None) -> List[dict]:
+def load_cookies(cookie_pseudofile: Optional[BufferedIOBase]=None) -> List[Dict[str, str]]:
     if cookie_pseudofile:
         cookies = json.load(cookie_pseudofile)
     else:
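Most of the helpers.py changes replace bare `dict` returns with parameterized `Dict[...]` types. The `to_store` annotation is the one change that goes beyond a signature: without it, mypy infers the value type from the `{'by_frequency': []}` literal alone and then rejects the nested-dict writes that `update_user_agents()` performs later. A standalone sketch of the same pattern (illustrative data, not Lookyloo code):

    from typing import Any, Dict

    # With no annotation, mypy would infer Dict[str, List[...]] from the
    # literal below and flag the nested-dict assignment as a type error.
    to_store: Dict[str, Any] = {'by_frequency': []}
    to_store['Windows'] = {}                      # value is a dict, not a list
    to_store['Windows']['10.0'] = ['Mozilla/5.0 (Windows NT 10.0; ...)']
    to_store['by_frequency'].append({'os': 'Windows'})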
diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py
index 152ce65e..b27bd943 100644
--- a/lookyloo/lookyloo.py
+++ b/lookyloo/lookyloo.py
@@ -12,7 +12,7 @@ from pathlib import Path
 import pickle
 import smtplib
 import socket
-from typing import Union, Dict, List, Tuple, Optional, Any
+from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping
 from urllib.parse import urlsplit
 from uuid import uuid4
 from zipfile import ZipFile
@@ -61,15 +61,15 @@ class Lookyloo():
         else:
             self.use_sane_js = True
 
-    def rebuild_cache(self):
+    def rebuild_cache(self) -> None:
         self.redis.flushdb()
         self._init_existing_dumps()
 
-    def remove_pickle(self, capture_dir: Path):
+    def remove_pickle(self, capture_dir: Path) -> None:
         if (capture_dir / 'tree.pickle').exists():
             (capture_dir / 'tree.pickle').unlink()
 
-    def rebuild_all(self):
+    def rebuild_all(self) -> None:
         for capture_dir in self.capture_dirs:
             self.remove_pickle(capture_dir)
         self.rebuild_cache()
@@ -88,7 +88,7 @@ class Lookyloo():
             sample_config = json.load(_c)
         return sample_config[entry]
 
-    def get_statistics(self, capture_dir: Path) -> Dict:
+    def get_statistics(self, capture_dir: Path) -> Dict[str, Any]:
         # We need the pickle
         ct = self._load_pickle(capture_dir / 'tree.pickle')
         if not ct:
@@ -110,7 +110,7 @@ class Lookyloo():
         else:
             self.vt.url_lookup(ct.root_hartree.har.first_url, force)
 
-    def get_modules_responses(self, capture_dir: Path) -> Optional[Dict]:
+    def get_modules_responses(self, capture_dir: Path) -> Optional[Dict[str, Any]]:
         ct = self._load_pickle(capture_dir / 'tree.pickle')
         if not ct:
             self.logger.warning('Unable to get the modules responses unless the tree ({capture_dir}) is cached.')
@@ -176,7 +176,7 @@ class Lookyloo():
         self.redis.hmset(str(capture_dir), cache)
         self.redis.hset('lookup_dirs', uuid, str(capture_dir))
 
-    def capture_cache(self, capture_dir: Path) -> Optional[Dict[str, Union[str, int]]]:
+    def capture_cache(self, capture_dir: Path) -> Optional[Dict[str, Any]]:
         if self.redis.hget(str(capture_dir), 'incomplete_redirects') == '1':
             # try to rebuild the cache
             self._set_capture_cache(capture_dir, force=True)
@@ -208,13 +208,13 @@ class Lookyloo():
                 f.write(str(uuid4()))
         return sorted(self.scrape_dir.iterdir(), reverse=True)
 
-    def lookup_capture_dir(self, uuid) -> Union[Path, None]:
+    def lookup_capture_dir(self, uuid: str) -> Union[Path, None]:
         capture_dir = self.redis.hget('lookup_dirs', uuid)
         if capture_dir:
             return Path(capture_dir)
         return None
 
-    def enqueue_scrape(self, query: dict) -> str:
+    def enqueue_scrape(self, query: MutableMapping[str, Any]) -> str:
         perma_uuid = str(uuid4())
         p = self.redis.pipeline()
         for key, value in query.items():
@@ -244,7 +244,7 @@ class Lookyloo():
                 return pickle.load(_p)
         return None
 
-    def send_mail(self, capture_uuid: str, comment: str=''):
+    def send_mail(self, capture_uuid: str, comment: str='') -> None:
         if not self.get_config('enable_mail_notification'):
             return
         email_config = self.get_config('email')
@@ -268,7 +268,7 @@ class Lookyloo():
         except Exception as e:
             logging.exception(e)
 
-    def load_tree(self, capture_dir: Path) -> Tuple[str, dict, str, str, str, dict]:
+    def load_tree(self, capture_dir: Path) -> Tuple[str, str, str, str, str, Dict[str, str]]:
         har_files = sorted(capture_dir.glob('*.har'))
         pickle_file = capture_dir / 'tree.pickle'
         try:
@@ -312,7 +312,7 @@ class Lookyloo():
     def get_capture(self, capture_dir: Path) -> BytesIO:
         return self._get_raw(capture_dir)
 
-    def sane_js_query(self, sha512: str) -> Dict:
+    def sane_js_query(self, sha512: str) -> Dict[str, Any]:
         if self.use_sane_js:
             return self.sanejs.sha512(sha512)
         return {'response': []}
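Several lookyloo.py methods now return `Optional[Dict[str, Any]]` instead of a bare `Dict`, so every caller has to handle the miss case before indexing; the website changes further down do exactly that. A minimal sketch of the calling pattern, using hypothetical standalone functions that only mirror the new signature:

    from typing import Any, Dict, Optional

    def capture_cache(capture_dir: str) -> Optional[Dict[str, Any]]:
        # Stand-in for Lookyloo.capture_cache(): None on a cache miss.
        return None

    def title_for(capture_dir: str) -> str:
        cache = capture_cache(capture_dir)
        if not cache:
            return 'Not available.'   # mypy narrows Optional[...] to Dict here
        return cache['title']         # safe to subscript after the guard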
diff --git a/lookyloo/modules.py b/lookyloo/modules.py
index 22e57231..ed8fe0ca 100644
--- a/lookyloo/modules.py
+++ b/lookyloo/modules.py
@@ -30,7 +30,7 @@ class VirusTotal():
         self.storage_dir_vt = get_homedir() / 'vt_url'
         self.storage_dir_vt.mkdir(parents=True, exist_ok=True)
 
-    def __del__(self):
+    def __del__(self) -> None:
         if hasattr(self, 'client'):
             self.client.close()
 
@@ -40,7 +40,7 @@ class VirusTotal():
         m.update(url_id.encode())
         return self.storage_dir_vt / m.hexdigest()
 
-    def get_url_lookup(self, url: str) -> Optional[Dict]:
+    def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
         url_storage_dir = self.__get_cache_directory(url)
         if not url_storage_dir.exists():
            return None
@@ -51,7 +51,7 @@ class VirusTotal():
         with cached_entries[0].open() as f:
             return json.load(f)
 
-    def url_lookup(self, url: str, force: bool=False):
+    def url_lookup(self, url: str, force: bool=False) -> None:
         '''Lookup an URL on VT
         Note: force means 2 things:
         * (re)scan of the URL
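`get_url_lookup()` keeps one directory per URL under an md5-derived name and now advertises `Optional[Dict[str, Any]]`, making the on-disk cache miss explicit to the type checker. A reduced sketch of that lookup pattern (directory layout assumed from the diff above; the function name is hypothetical):

    import hashlib
    import json
    from pathlib import Path
    from typing import Any, Dict, Optional

    def cached_lookup(storage_dir: Path, url_id: str) -> Optional[Dict[str, Any]]:
        # One cache directory per URL, keyed by the md5 of its identifier.
        cache_dir = storage_dir / hashlib.md5(url_id.encode()).hexdigest()
        if not cache_dir.exists():
            return None
        entries = sorted(cache_dir.glob('*'), reverse=True)
        if not entries:
            return None
        with entries[0].open() as f:
            return json.load(f)       # most recent cached response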
diff --git a/poetry.lock b/poetry.lock
index dfa19174..8b8b647e 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -94,7 +94,7 @@ description = "Screen-scraping library"
 name = "beautifulsoup4"
 optional = false
 python-versions = "*"
-version = "4.9.0"
+version = "4.9.1"
 
 [package.dependencies]
 soupsieve = [">1.2", "<2.0"]
@@ -167,7 +167,7 @@ description = "A Python module to bypass Cloudflare's anti-bot page."
 name = "cloudscraper"
 optional = false
 python-versions = "*"
-version = "1.2.36"
+version = "1.2.38"
 
 [package.dependencies]
 pyparsing = ">=2.4.7"
@@ -306,7 +306,7 @@ publicsuffix2 = "^2.20191221"
 six = "^1.14.0"
 
 [package.source]
-reference = "59195eab02971545cc7adbec01af1ea472a7b2bc"
+reference = "543e21d64c8f6828a6094575601a8b498f3e4125"
 type = "git"
 url = "https://github.com/viper-framework/har2tree.git"
 [[package]]
@@ -462,7 +462,7 @@ description = "More routines for operating on iterables, beyond itertools"
 name = "more-itertools"
 optional = false
 python-versions = ">=3.5"
-version = "8.2.0"
+version = "8.3.0"
 
 [[package]]
 category = "main"
@@ -470,7 +470,7 @@ description = "multidict implementation"
 name = "multidict"
 optional = false
 python-versions = ">=3.5"
-version = "4.7.5"
+version = "4.7.6"
 
 [[package]]
 category = "dev"
@@ -788,7 +788,7 @@ description = "Python client for Redis key-value store"
 name = "redis"
 optional = false
 python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
-version = "3.5.1"
+version = "3.5.2"
 
 [package.extras]
 hiredis = ["hiredis (>=0.1.3)"]
@@ -907,7 +907,7 @@ description = "A modern CSS selector implementation for Beautiful Soup."
 name = "soupsieve"
 optional = false
 python-versions = "*"
-version = "1.9.5"
+version = "1.9.6"
 
 [[package]]
 category = "dev"
@@ -1004,7 +1004,7 @@ description = "Library of web-related functions"
 name = "w3lib"
 optional = false
 python-versions = "*"
-version = "1.21.0"
+version = "1.22.0"
 
 [package.dependencies]
 six = ">=1.4.1"
@@ -1120,9 +1120,9 @@ backcall = [
     {file = "backcall-0.1.0.zip", hash = "sha256:bbbf4b1e5cd2bdb08f915895b51081c041bac22394fdfcfdfbe9f14b77c08bf2"},
 ]
 beautifulsoup4 = [
-    {file = "beautifulsoup4-4.9.0-py2-none-any.whl", hash = "sha256:a4bbe77fd30670455c5296242967a123ec28c37e9702a8a81bd2f20a4baf0368"},
-    {file = "beautifulsoup4-4.9.0-py3-none-any.whl", hash = "sha256:d4e96ac9b0c3a6d3f0caae2e4124e6055c5dcafde8e2f831ff194c104f0775a0"},
-    {file = "beautifulsoup4-4.9.0.tar.gz", hash = "sha256:594ca51a10d2b3443cbac41214e12dbb2a1cd57e1a7344659849e2e20ba6a8d8"},
+    {file = "beautifulsoup4-4.9.1-py2-none-any.whl", hash = "sha256:e718f2342e2e099b640a34ab782407b7b676f47ee272d6739e60b8ea23829f2c"},
+    {file = "beautifulsoup4-4.9.1-py3-none-any.whl", hash = "sha256:a6237df3c32ccfaee4fd201c8f5f9d9df619b93121d01353a64a73ce8c6ef9a8"},
+    {file = "beautifulsoup4-4.9.1.tar.gz", hash = "sha256:73cc4d115b96f79c7d77c1c7f7a0a8d4c57860d1041df407dd1aae7f07a77fd7"},
 ]
 bootstrap-flask = [
     {file = "Bootstrap-Flask-1.3.1.tar.gz", hash = "sha256:fca79b590de6bcdd2ca555899a49bbd8eb784ecdb358ca1fe2ce5fe13a8621fe"},
@@ -1202,8 +1202,8 @@ click = [
     {file = "click-7.1.2.tar.gz", hash = "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a"},
 ]
 cloudscraper = [
-    {file = "cloudscraper-1.2.36-py2.py3-none-any.whl", hash = "sha256:06eb4fd7462dc08a193228830f45097993efc8af4fd75a74815ba16a05c6a0fd"},
-    {file = "cloudscraper-1.2.36.tar.gz", hash = "sha256:dec9d92a323e85d390af8d02e475de425604212bc6e50c78c0897bf05d355352"},
+    {file = "cloudscraper-1.2.38-py2.py3-none-any.whl", hash = "sha256:3893be3c281ddb7a39bf35e558da19247a39a408ef48078fdabf5058b7659d6c"},
+    {file = "cloudscraper-1.2.38.tar.gz", hash = "sha256:db295c5ca33f22ae058f317b07c6842a2b16d75c9e11e38d21395363d089692f"},
 ]
 colorama = [
     {file = "colorama-0.4.3-py2.py3-none-any.whl", hash = "sha256:7d73d2a99753107a36ac6b455ee49046802e59d9d076ef8e47b61499fa29afff"},
@@ -1357,27 +1357,27 @@ markupsafe = [
     {file = "MarkupSafe-1.1.1.tar.gz", hash = "sha256:29872e92839765e546828bb7754a68c418d927cd064fd4708fab9fe9c8bb116b"},
 ]
 more-itertools = [
-    {file = "more-itertools-8.2.0.tar.gz", hash = "sha256:b1ddb932186d8a6ac451e1d95844b382f55e12686d51ca0c68b6f61f2ab7a507"},
-    {file = "more_itertools-8.2.0-py3-none-any.whl", hash = "sha256:5dd8bcf33e5f9513ffa06d5ad33d78f31e1931ac9a18f33d37e77a180d393a7c"},
+    {file = "more-itertools-8.3.0.tar.gz", hash = "sha256:558bb897a2232f5e4f8e2399089e35aecb746e1f9191b6584a151647e89267be"},
+    {file = "more_itertools-8.3.0-py3-none-any.whl", hash = "sha256:7818f596b1e87be009031c7653d01acc46ed422e6656b394b0f765ce66ed4982"},
 ]
 multidict = [
-    {file = "multidict-4.7.5-cp35-cp35m-macosx_10_13_x86_64.whl", hash = "sha256:fc3b4adc2ee8474cb3cd2a155305d5f8eda0a9c91320f83e55748e1fcb68f8e3"},
-    {file = "multidict-4.7.5-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:42f56542166040b4474c0c608ed051732033cd821126493cf25b6c276df7dd35"},
-    {file = "multidict-4.7.5-cp35-cp35m-win32.whl", hash = "sha256:7774e9f6c9af3f12f296131453f7b81dabb7ebdb948483362f5afcaac8a826f1"},
-    {file = "multidict-4.7.5-cp35-cp35m-win_amd64.whl", hash = "sha256:c2c37185fb0af79d5c117b8d2764f4321eeb12ba8c141a95d0aa8c2c1d0a11dd"},
-    {file = "multidict-4.7.5-cp36-cp36m-macosx_10_13_x86_64.whl", hash = "sha256:e439c9a10a95cb32abd708bb8be83b2134fa93790a4fb0535ca36db3dda94d20"},
-    {file = "multidict-4.7.5-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:85cb26c38c96f76b7ff38b86c9d560dea10cf3459bb5f4caf72fc1bb932c7136"},
-    {file = "multidict-4.7.5-cp36-cp36m-win32.whl", hash = "sha256:620b37c3fea181dab09267cd5a84b0f23fa043beb8bc50d8474dd9694de1fa6e"},
-    {file = "multidict-4.7.5-cp36-cp36m-win_amd64.whl", hash = "sha256:6e6fef114741c4d7ca46da8449038ec8b1e880bbe68674c01ceeb1ac8a648e78"},
-    {file = "multidict-4.7.5-cp37-cp37m-macosx_10_13_x86_64.whl", hash = "sha256:a326f4240123a2ac66bb163eeba99578e9d63a8654a59f4688a79198f9aa10f8"},
-    {file = "multidict-4.7.5-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:dc561313279f9d05a3d0ffa89cd15ae477528ea37aa9795c4654588a3287a9ab"},
-    {file = "multidict-4.7.5-cp37-cp37m-win32.whl", hash = "sha256:4b7df040fb5fe826d689204f9b544af469593fb3ff3a069a6ad3409f742f5928"},
-    {file = "multidict-4.7.5-cp37-cp37m-win_amd64.whl", hash = "sha256:317f96bc0950d249e96d8d29ab556d01dd38888fbe68324f46fd834b430169f1"},
-    {file = "multidict-4.7.5-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:b51249fdd2923739cd3efc95a3d6c363b67bbf779208e9f37fd5e68540d1a4d4"},
-    {file = "multidict-4.7.5-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:ae402f43604e3b2bc41e8ea8b8526c7fa7139ed76b0d64fc48e28125925275b2"},
-    {file = "multidict-4.7.5-cp38-cp38-win32.whl", hash = "sha256:bb519becc46275c594410c6c28a8a0adc66fe24fef154a9addea54c1adb006f5"},
-    {file = "multidict-4.7.5-cp38-cp38-win_amd64.whl", hash = "sha256:544fae9261232a97102e27a926019100a9db75bec7b37feedd74b3aa82f29969"},
-    {file = "multidict-4.7.5.tar.gz", hash = "sha256:aee283c49601fa4c13adc64c09c978838a7e812f85377ae130a24d7198c0331e"},
+    {file = "multidict-4.7.6-cp35-cp35m-macosx_10_14_x86_64.whl", hash = "sha256:275ca32383bc5d1894b6975bb4ca6a7ff16ab76fa622967625baeebcf8079000"},
+    {file = "multidict-4.7.6-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:1ece5a3369835c20ed57adadc663400b5525904e53bae59ec854a5d36b39b21a"},
+    {file = "multidict-4.7.6-cp35-cp35m-win32.whl", hash = "sha256:5141c13374e6b25fe6bf092052ab55c0c03d21bd66c94a0e3ae371d3e4d865a5"},
+    {file = "multidict-4.7.6-cp35-cp35m-win_amd64.whl", hash = "sha256:9456e90649005ad40558f4cf51dbb842e32807df75146c6d940b6f5abb4a78f3"},
+    {file = "multidict-4.7.6-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:e0d072ae0f2a179c375f67e3da300b47e1a83293c554450b29c900e50afaae87"},
+    {file = "multidict-4.7.6-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:3750f2205b800aac4bb03b5ae48025a64e474d2c6cc79547988ba1d4122a09e2"},
+    {file = "multidict-4.7.6-cp36-cp36m-win32.whl", hash = "sha256:f07acae137b71af3bb548bd8da720956a3bc9f9a0b87733e0899226a2317aeb7"},
+    {file = "multidict-4.7.6-cp36-cp36m-win_amd64.whl", hash = "sha256:6513728873f4326999429a8b00fc7ceddb2509b01d5fd3f3be7881a257b8d463"},
+    {file = "multidict-4.7.6-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:feed85993dbdb1dbc29102f50bca65bdc68f2c0c8d352468c25b54874f23c39d"},
+    {file = "multidict-4.7.6-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:fcfbb44c59af3f8ea984de67ec7c306f618a3ec771c2843804069917a8f2e255"},
+    {file = "multidict-4.7.6-cp37-cp37m-win32.whl", hash = "sha256:4538273208e7294b2659b1602490f4ed3ab1c8cf9dbdd817e0e9db8e64be2507"},
+    {file = "multidict-4.7.6-cp37-cp37m-win_amd64.whl", hash = "sha256:d14842362ed4cf63751648e7672f7174c9818459d169231d03c56e84daf90b7c"},
+    {file = "multidict-4.7.6-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:c026fe9a05130e44157b98fea3ab12969e5b60691a276150db9eda71710cd10b"},
+    {file = "multidict-4.7.6-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:51a4d210404ac61d32dada00a50ea7ba412e6ea945bbe992e4d7a595276d2ec7"},
+    {file = "multidict-4.7.6-cp38-cp38-win32.whl", hash = "sha256:5cf311a0f5ef80fe73e4f4c0f0998ec08f954a6ec72b746f3c179e37de1d210d"},
+    {file = "multidict-4.7.6-cp38-cp38-win_amd64.whl", hash = "sha256:7388d2ef3c55a8ba80da62ecfafa06a1c097c18032a501ffd4cabbc52d7f2b19"},
+    {file = "multidict-4.7.6.tar.gz", hash = "sha256:fbb77a75e529021e7c4a8d4e823d88ef4d23674a202be4f5addffc72cbb91430"},
 ]
 mypy = [
     {file = "mypy-0.761-cp35-cp35m-macosx_10_6_x86_64.whl", hash = "sha256:7f672d02fffcbace4db2b05369142e0506cdcde20cea0e07c7c2171c4fd11dd6"},
@@ -1514,8 +1514,8 @@ queuelib = [
     {file = "queuelib-1.5.0.tar.gz", hash = "sha256:42b413295551bdc24ed9376c1a2cd7d0b1b0fa4746b77b27ca2b797a276a1a17"},
 ]
 redis = [
-    {file = "redis-3.5.1-py2.py3-none-any.whl", hash = "sha256:a5b0e25890d216d8189636742c50ab992e42eea699bcc1b08cc2d6bf3adff52a"},
-    {file = "redis-3.5.1.tar.gz", hash = "sha256:6e9d2722a95d10ddf854596e66516d316d99c6a483e5db3b35c34e1158b2bfa1"},
+    {file = "redis-3.5.2-py2.py3-none-any.whl", hash = "sha256:2ef11f489003f151777c064c5dbc6653dfb9f3eade159bcadc524619fddc2242"},
+    {file = "redis-3.5.2.tar.gz", hash = "sha256:6d65e84bc58091140081ee9d9c187aab0480097750fac44239307a3bdf0b1251"},
 ]
 requests = [
     {file = "requests-2.23.0-py2.py3-none-any.whl", hash = "sha256:43999036bfa82904b6af1d99e4882b560e5e2c68e5c4b0aa03b655f3d7d73fee"},
@@ -1543,8 +1543,8 @@ six = [
     {file = "six-1.14.0.tar.gz", hash = "sha256:236bdbdce46e6e6a3d61a337c0f8b763ca1e8717c03b369e87a7ec7ce1319c0a"},
 ]
 soupsieve = [
-    {file = "soupsieve-1.9.5-py2.py3-none-any.whl", hash = "sha256:bdb0d917b03a1369ce964056fc195cfdff8819c40de04695a80bc813c3cfa1f5"},
-    {file = "soupsieve-1.9.5.tar.gz", hash = "sha256:e2c1c5dee4a1c36bcb790e0fabd5492d874b8ebd4617622c4f6a731701060dda"},
+    {file = "soupsieve-1.9.6-py2.py3-none-any.whl", hash = "sha256:feb1e937fa26a69e08436aad4a9037cd7e1d4c7212909502ba30701247ff8abd"},
+    {file = "soupsieve-1.9.6.tar.gz", hash = "sha256:7985bacc98c34923a439967c1a602dc4f1e15f923b6fcf02344184f86cc7efaa"},
 ]
 traitlets = [
     {file = "traitlets-4.3.3-py2.py3-none-any.whl", hash = "sha256:70b4c6a1d9019d7b4f6846832288f86998aa3b9207c6821f3578a6a6a467fe44"},
@@ -1611,8 +1611,8 @@ vt-py = [
     {file = "vt-py-0.5.3.tar.gz", hash = "sha256:0a52d58976ec3baf24ade11d0473773d6c7a8ccf862c86f34bc74216ffbe920f"},
 ]
 w3lib = [
-    {file = "w3lib-1.21.0-py2.py3-none-any.whl", hash = "sha256:847704b837b2b973cddef6938325d466628e6078266bc2e1f7ac49ba85c34823"},
-    {file = "w3lib-1.21.0.tar.gz", hash = "sha256:8b1854fef570b5a5fc84d960e025debd110485d73fd283580376104762774315"},
+    {file = "w3lib-1.22.0-py2.py3-none-any.whl", hash = "sha256:0161d55537063e00d95a241663ede3395c4c6d7b777972ba2fd58bbab2001e53"},
+    {file = "w3lib-1.22.0.tar.gz", hash = "sha256:0ad6d0203157d61149fd45aaed2e24f53902989c32fc1dccc2e2bfba371560df"},
 ]
 wcwidth = [
     {file = "wcwidth-0.1.9-py2.py3-none-any.whl", hash = "sha256:cafe2186b3c009a04067022ce1dcd79cb38d8d65ee4f4791b8888d6599d1bbe1"},
diff --git a/website/web/__init__.py b/website/web/__init__.py
index e5b072dd..ac289848 100644
--- a/website/web/__init__.py
+++ b/website/web/__init__.py
@@ -18,7 +18,7 @@ from lookyloo.lookyloo import Lookyloo
 from lookyloo.exceptions import NoValidHarFile
 from .proxied import ReverseProxied
 
-from typing import Tuple
+from typing import Tuple, Optional, Dict, Any
 
 import logging
 
@@ -49,7 +49,7 @@ logging.basicConfig(level=lookyloo.get_config('loglevel'))
 
 
 @auth.get_password
-def get_pw(username):
+def get_pw(username: str) -> Optional[str]:
     if username in user:
         return user.get(username)
     return None
@@ -69,9 +69,9 @@ def rebuild_cache():
     return redirect(url_for('index'))
 
 
-@app.route('/tree/<tree_uuid>/rebuild')
+@app.route('/tree/<string:tree_uuid>/rebuild')
 @auth.login_required
-def rebuild_tree(tree_uuid):
+def rebuild_tree(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if capture_dir:
         lookyloo.remove_pickle(capture_dir)
@@ -80,7 +80,7 @@ def rebuild_tree(tree_uuid):
 
 
 # keep
-def load_tree(capture_dir: Path) -> Tuple[dict, str, str, str, dict]:
+def load_tree(capture_dir: Path) -> Tuple[str, str, str, str, Dict[str, Any]]:
     session.clear()
     temp_file_name, tree_json, tree_time, tree_ua, tree_root_url, meta = lookyloo.load_tree(capture_dir)
     session["tree"] = temp_file_name
@@ -102,19 +102,22 @@ def scrape_web():
         cookie_file = request.files['cookies'].stream
     else:
         cookie_file = None
-    perma_uuid = lookyloo.scrape(url=request.form.get('url'),
-                                 cookies_pseudofile=cookie_file,
-                                 depth=request.form.get('depth'),
-                                 listing=request.form.get('listing'), user_agent=request.form.get('user_agent'),
-                                 os=request.form.get('os'), browser=request.form.get('browser'))
-    return redirect(url_for('tree', tree_uuid=perma_uuid))
+    url = request.form.get('url')
+    if url:
+        depth: int = request.form.get('depth') if request.form.get('depth') else 1  # type: ignore
+        listing: bool = request.form.get('listing') if request.form.get('listing') else False  # type: ignore
+        perma_uuid = lookyloo.scrape(url=url, cookies_pseudofile=cookie_file,
+                                     depth=depth, listing=listing,
+                                     user_agent=request.form.get('user_agent'),
+                                     os=request.form.get('os'), browser=request.form.get('browser'))
+        return redirect(url_for('tree', tree_uuid=perma_uuid))
     user_agents = get_user_agents()
     user_agents.pop('by_frequency')
     return render_template('scrape.html', user_agents=user_agents)
 
 
-@app.route('/tree/hostname/<node_uuid>/text', methods=['GET'])
-def hostnode_details_text(node_uuid):
+@app.route('/tree/hostname/<string:node_uuid>/text', methods=['GET'])
+def hostnode_details_text(node_uuid: str):
     with open(session["tree"], 'rb') as f:
         ct = pickle.load(f)
     hostnode = ct.root_hartree.get_host_node_by_uuid(node_uuid)
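The reworked scrape_web() now guards on `url` before scraping, but the `# type: ignore` comments concede that `request.form.get()` returns `Optional[str]`, never `int` or `bool`. A stricter alternative (a sketch of explicit coercion, not what the patch does; function and parameter names are hypothetical):

    from typing import Mapping, Optional, Tuple

    def parse_scrape_form(form: Mapping[str, str]) -> Tuple[int, bool]:
        depth_raw: Optional[str] = form.get('depth')
        depth = int(depth_raw) if depth_raw else 1   # missing or empty -> default depth
        listing = bool(form.get('listing'))          # unchecked box -> absent key -> False
        return depth, listing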
@@ -131,8 +134,8 @@ def hostnode_details_text(node_uuid):
                      as_attachment=True, attachment_filename='file.md')
 
 
-@app.route('/tree/hostname/<node_uuid>', methods=['GET'])
-def hostnode_details(node_uuid):
+@app.route('/tree/hostname/<string:node_uuid>', methods=['GET'])
+def hostnode_details(node_uuid: str):
     with open(session["tree"], 'rb') as f:
         ct = pickle.load(f)
     hostnode = ct.root_hartree.get_host_node_by_uuid(node_uuid)
@@ -147,8 +150,8 @@ def hostnode_details(node_uuid):
     return json.dumps(urls)
 
 
-@app.route('/tree/url/<node_uuid>', methods=['GET'])
-def urlnode_details(node_uuid):
+@app.route('/tree/url/<string:node_uuid>', methods=['GET'])
+def urlnode_details(node_uuid: str):
     with open(session["tree"], 'rb') as f:
         ct = pickle.load(f)
     urlnode = ct.root_hartree.get_url_node_by_uuid(node_uuid)
@@ -170,16 +173,16 @@ def urlnode_details(node_uuid):
 
 @app.route('/tree/<string:tree_uuid>/trigger_modules/', defaults={'force': False})
 @app.route('/tree/<string:tree_uuid>/trigger_modules/<int:force>', methods=['GET'])
-def trigger_modules(tree_uuid, force):
+def trigger_modules(tree_uuid: str, force: int):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
-    lookyloo.trigger_modules(capture_dir, force)
+    lookyloo.trigger_modules(capture_dir, True if force else False)
     return redirect(url_for('modules', tree_uuid=tree_uuid))
 
 
 @app.route('/tree/<string:tree_uuid>/stats', methods=['GET'])
-def stats(tree_uuid):
+def stats(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
@@ -188,7 +191,7 @@ def stats(tree_uuid):
 
 
 @app.route('/tree/<string:tree_uuid>/modules', methods=['GET'])
-def modules(tree_uuid):
+def modules(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
@@ -196,7 +199,7 @@ def modules(tree_uuid):
     if not modules_responses:
         return redirect(url_for('tree', tree_uuid=tree_uuid))
 
-    vt_short_result = {}
+    vt_short_result: Dict[str, Dict[str, Any]] = {}
     if 'vt' in modules_responses:
         # VirusTotal cleanup
         vt = modules_responses.pop('vt')
@@ -214,7 +217,7 @@ def modules(tree_uuid):
 
 
 @app.route('/tree/<string:tree_uuid>/image', methods=['GET'])
-def image(tree_uuid):
+def image(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
@@ -224,7 +227,7 @@ def image(tree_uuid):
 
 
 @app.route('/tree/<string:tree_uuid>/html', methods=['GET'])
-def html(tree_uuid):
+def html(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
@@ -234,7 +237,7 @@ def html(tree_uuid):
 
 
 @app.route('/tree/<string:tree_uuid>/export', methods=['GET'])
-def export(tree_uuid):
+def export(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
@@ -244,11 +247,13 @@ def export(tree_uuid):
 
 
 @app.route('/redirects/<string:tree_uuid>', methods=['GET'])
-def redirects(tree_uuid):
+def redirects(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if not capture_dir:
         return Response('Not available.', mimetype='text/text')
     cache = lookyloo.capture_cache(capture_dir)
+    if not cache:
+        return Response('Not available.', mimetype='text/text')
     if not cache['redirects']:
         return Response('No redirects.', mimetype='text/text')
     to_return = BytesIO('\n'.join(cache['redirects']).encode())
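In the route changes above, the URL converters do the real typing work at runtime: `<string:tree_uuid>` hands the view a str and `<int:force>` an int, while the Python annotations only document that contract for mypy. A minimal standalone illustration (a hypothetical Flask app, loosely mirroring the trigger_modules pair of routes; converter spellings are assumptions based on the typed signatures):

    from flask import Flask

    app = Flask(__name__)

    @app.route('/tree/<string:tree_uuid>/trigger_modules/', defaults={'force': 0})
    @app.route('/tree/<string:tree_uuid>/trigger_modules/<int:force>')
    def trigger_modules(tree_uuid: str, force: int) -> str:
        # force arrives as an int because of the <int:force> converter;
        # the annotation matches what Flask actually passes in.
        return f'{tree_uuid}: modules {"forced" if force else "not forced"}'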
@@ -257,7 +262,7 @@ def redirects(tree_uuid):
 
 
 @app.route('/cache_tree/<string:tree_uuid>', methods=['GET'])
-def cache_tree(tree_uuid):
+def cache_tree(tree_uuid: str):
     capture_dir = lookyloo.lookup_capture_dir(tree_uuid)
     if capture_dir:
         lookyloo.load_tree(capture_dir)
@@ -265,14 +270,14 @@ def cache_tree(tree_uuid):
 
 
 @app.route('/tree/<string:tree_uuid>/send_mail', methods=['POST', 'GET'])
-def send_mail(tree_uuid):
-    comment = request.form.get('comment') if request.form.get('comment') else ''
+def send_mail(tree_uuid: str):
+    comment: str = request.form.get('comment') if request.form.get('comment') else ''  # type: ignore
     lookyloo.send_mail(tree_uuid, comment)
     return redirect(url_for('tree', tree_uuid=tree_uuid))
 
 
 @app.route('/tree/<string:tree_uuid>', methods=['GET'])
-def tree(tree_uuid):
+def tree(tree_uuid: str):
     if tree_uuid == 'False':
         flash("Unable to process your request. The domain may not exist, or splash isn't started", 'error')
         return redirect(url_for('index'))
@@ -282,6 +287,10 @@ def tree(tree_uuid):
         return redirect(url_for('index'))
 
     cache = lookyloo.capture_cache(capture_dir)
+    if not cache:
+        flash(f'Invalid cache.', 'error')
+        return redirect(url_for('index'))
+
     if 'error' in cache:
         flash(cache['error'], 'error')
         return redirect(url_for('index'))
@@ -299,13 +308,13 @@ def tree(tree_uuid):
         return render_template('error.html', error_message=e)
 
 
-def index_generic(show_hidden=False):
+def index_generic(show_hidden: bool=False):
     titles = []
     if time_delta_on_index:
         # We want to filter the captures on the index
         cut_time = datetime.now() - timedelta(**time_delta_on_index)
     else:
-        cut_time = None
+        cut_time = None  # type: ignore
     for capture_dir in lookyloo.capture_dirs:
         cached = lookyloo.capture_cache(capture_dir)
         if not cached or 'error' in cached:
@@ -316,7 +325,7 @@ def index_generic(show_hidden=False):
             continue
         elif 'no_index' in cached:
             continue
-        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:
+        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:  # type: ignore
             continue
         titles.append((cached['uuid'], cached['title'], cached['timestamp'], cached['url'],
                        cached['redirects'], True if cached['incomplete_redirects'] == '1' else False))
diff --git a/website/web/proxied.py b/website/web/proxied.py
index 439624f0..507a949a 100644
--- a/website/web/proxied.py
+++ b/website/web/proxied.py
@@ -1,12 +1,13 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
+from typing import MutableMapping, Any
 
 
 class ReverseProxied():
-    def __init__(self, app):
+    def __init__(self, app: Any) -> None:
         self.app = app
 
-    def __call__(self, environ, start_response):
+    def __call__(self, environ: MutableMapping[str, Any], start_response: Any) -> Any:
         scheme = environ.get('HTTP_X_FORWARDED_PROTO')
         if not scheme:
             scheme = environ.get('HTTP_X_SCHEME')
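proxied.py types the wrapped app and `start_response` loosely as `Any`; only `environ` gets a precise `MutableMapping[str, Any]`, which is all the middleware actually touches. For reference, a slightly tighter sketch of the same middleware shape (the `wsgi.url_scheme` write is an assumption about the body elided from the diff, based on the usual reverse-proxy pattern):

    from typing import Any, Callable, Iterable, MutableMapping

    class ReverseProxied:
        def __init__(self, app: Callable[..., Iterable[bytes]]) -> None:
            self.app = app

        def __call__(self, environ: MutableMapping[str, Any], start_response: Any) -> Iterable[bytes]:
            scheme = environ.get('HTTP_X_FORWARDED_PROTO') or environ.get('HTTP_X_SCHEME')
            if scheme:
                # Assumed continuation: propagate the proxy's scheme to the app.
                environ['wsgi.url_scheme'] = scheme
            return self.app(environ, start_response)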