From dec6920cb82ddb7edb637ccf78a50ef82663854a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 6 Jan 2020 15:32:38 +0100 Subject: [PATCH] chg: Add typing --- .travis.yml | 15 ++++++++ Pipfile | 1 + Pipfile.lock | 77 ++++++++++++++++++++++++++++++++++++----- lookyloo/helpers.py | 16 ++++----- lookyloo/lookyloo.py | 77 +++++++++++++++++++++-------------------- setup.py | 2 +- website/web/__init__.py | 19 +++++----- 7 files changed, 144 insertions(+), 63 deletions(-) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..9304d7b --- /dev/null +++ b/.travis.yml @@ -0,0 +1,15 @@ +language: python + +python: + - "3.6" + - "3.6-dev" + - "3.7" + - "3.7-dev" + - "nightly" + +install: + - pip install pipenv + - pipenv install --dev + +script: + - pipenv run mypy . diff --git a/Pipfile b/Pipfile index c398170..54d9788 100644 --- a/Pipfile +++ b/Pipfile @@ -4,6 +4,7 @@ url = "https://pypi.org/simple" verify_ssl = true [dev-packages] +mypy = "*" [packages] scrapysplashwrapper = {editable = true,git = "https://github.com/viper-framework/ScrapySplashWrapper.git"} diff --git a/Pipfile.lock b/Pipfile.lock index aef13ee..64b3b76 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "79c98ed3b6e145e906e0dbe368a5568ee498c330ec7c6ce09020417d3b403cc4" + "sha256": "ec4e1b4e58752b96f77adbe6eee453274648412e3e908a9efa2b1fa06b40b25a" }, "pipfile-spec": 6, "requires": { @@ -32,12 +32,12 @@ }, "beautifulsoup4": { "hashes": [ - "sha256:5279c36b4b2ec2cb4298d723791467e3000e5384a43ea0cdf5d45207c7e97169", - "sha256:6135db2ba678168c07950f9a16c4031822c6f4aec75a65e0a97bc5ca09789931", - "sha256:dcdef580e18a76d54002088602eba453eec38ebbcafafeaabd8cab12b6155d57" + "sha256:05fd825eb01c290877657a56df4c6e4c311b3965bda790c613a3d6fb01a5462a", + "sha256:9fbb4d6e48ecd30bcacc5b63b94088192dcda178513b2ae3c394229f8911b887", + "sha256:e1505eeed31b0f4ce2dbb3bc8eb256c04cc2b3b72af7d551a4ab6efd5cbe5dae" ], "index": "pypi", - "version": "==4.8.1" + "version": "==4.8.2" }, "bootstrap-flask": { "hashes": [ @@ -250,7 +250,7 @@ "har2tree": { "editable": true, "git": "https://github.com/viper-framework/har2tree.git", - "ref": "58481e33788b48364472b44f4b74b22ded66e6fb" + "ref": "09421d04b9e3c985b61404bec828b4be7d892e01" }, "hyperlink": { "hashes": [ @@ -467,7 +467,7 @@ "scrapysplashwrapper": { "editable": true, "git": "https://github.com/viper-framework/ScrapySplashWrapper.git", - "ref": "e4f51e9775af60be8ab9e66d3e1d35581ba3f63a" + "ref": "235b090d5b3024459ba9f91fa5f61660b4af5014" }, "service-identity": { "hashes": [ @@ -581,5 +581,66 @@ "version": "==4.7.1" } }, - "develop": {} + "develop": { + "mypy": { + "hashes": [ + "sha256:0a9a45157e532da06fe56adcfef8a74629566b607fa2c1ac0122d1ff995c748a", + "sha256:2c35cae79ceb20d47facfad51f952df16c2ae9f45db6cb38405a3da1cf8fc0a7", + "sha256:4b9365ade157794cef9685791032521233729cb00ce76b0ddc78749abea463d2", + "sha256:53ea810ae3f83f9c9b452582261ea859828a9ed666f2e1ca840300b69322c474", + "sha256:634aef60b4ff0f650d3e59d4374626ca6153fcaff96ec075b215b568e6ee3cb0", + "sha256:7e396ce53cacd5596ff6d191b47ab0ea18f8e0ec04e15d69728d530e86d4c217", + "sha256:7eadc91af8270455e0d73565b8964da1642fe226665dd5c9560067cd64d56749", + "sha256:7f672d02fffcbace4db2b05369142e0506cdcde20cea0e07c7c2171c4fd11dd6", + "sha256:85baab8d74ec601e86134afe2bcccd87820f79d2f8d5798c889507d1088287bf", + "sha256:87c556fb85d709dacd4b4cb6167eecc5bbb4f0a9864b69136a0d4640fdc76a36", + "sha256:a6bd44efee4dc8c3324c13785a9dc3519b3ee3a92cada42d2b57762b7053b49b", + "sha256:c6d27bd20c3ba60d5b02f20bd28e20091d6286a699174dfad515636cb09b5a72", + "sha256:e2bb577d10d09a2d8822a042a23b8d62bc3b269667c9eb8e60a6edfa000211b1", + "sha256:f97a605d7c8bc2c6d1172c2f0d5a65b24142e11a58de689046e62c2d632ca8c1" + ], + "index": "pypi", + "version": "==0.761" + }, + "mypy-extensions": { + "hashes": [ + "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d", + "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8" + ], + "version": "==0.4.3" + }, + "typed-ast": { + "hashes": [ + "sha256:1170afa46a3799e18b4c977777ce137bb53c7485379d9706af8a59f2ea1aa161", + "sha256:18511a0b3e7922276346bcb47e2ef9f38fb90fd31cb9223eed42c85d1312344e", + "sha256:262c247a82d005e43b5b7f69aff746370538e176131c32dda9cb0f324d27141e", + "sha256:2b907eb046d049bcd9892e3076c7a6456c93a25bebfe554e931620c90e6a25b0", + "sha256:354c16e5babd09f5cb0ee000d54cfa38401d8b8891eefa878ac772f827181a3c", + "sha256:48e5b1e71f25cfdef98b013263a88d7145879fbb2d5185f2a0c79fa7ebbeae47", + "sha256:4e0b70c6fc4d010f8107726af5fd37921b666f5b31d9331f0bd24ad9a088e631", + "sha256:630968c5cdee51a11c05a30453f8cd65e0cc1d2ad0d9192819df9978984529f4", + "sha256:66480f95b8167c9c5c5c87f32cf437d585937970f3fc24386f313a4c97b44e34", + "sha256:71211d26ffd12d63a83e079ff258ac9d56a1376a25bc80b1cdcdf601b855b90b", + "sha256:7954560051331d003b4e2b3eb822d9dd2e376fa4f6d98fee32f452f52dd6ebb2", + "sha256:838997f4310012cf2e1ad3803bce2f3402e9ffb71ded61b5ee22617b3a7f6b6e", + "sha256:95bd11af7eafc16e829af2d3df510cecfd4387f6453355188342c3e79a2ec87a", + "sha256:bc6c7d3fa1325a0c6613512a093bc2a2a15aeec350451cbdf9e1d4bffe3e3233", + "sha256:cc34a6f5b426748a507dd5d1de4c1978f2eb5626d51326e43280941206c209e1", + "sha256:d755f03c1e4a51e9b24d899561fec4ccaf51f210d52abdf8c07ee2849b212a36", + "sha256:d7c45933b1bdfaf9f36c579671fec15d25b06c8398f113dab64c18ed1adda01d", + "sha256:d896919306dd0aa22d0132f62a1b78d11aaf4c9fc5b3410d3c666b818191630a", + "sha256:fdc1c9bbf79510b76408840e009ed65958feba92a88833cdceecff93ae8fff66", + "sha256:ffde2fbfad571af120fcbfbbc61c72469e72f550d676c3342492a9dfdefb8f12" + ], + "version": "==1.4.0" + }, + "typing-extensions": { + "hashes": [ + "sha256:091ecc894d5e908ac75209f10d5b4f118fbdb2eb1ede6a63544054bb1edb41f2", + "sha256:910f4656f54de5993ad9304959ce9bb903f90aadc7c67a0bef07e678014e892d", + "sha256:cf8b63fedea4d89bab840ecbb93e75578af28f76f66c35889bd7065f5af88575" + ], + "version": "==3.7.4.1" + } + } } diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index d7ea478..17a3f78 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -7,17 +7,18 @@ from redis import Redis from redis.exceptions import ConnectionError from datetime import datetime, timedelta import time -from bs4 import BeautifulSoup +from glob import glob import json + +from bs4 import BeautifulSoup # type: ignore try: - import cfscrape + import cfscrape # type: ignore HAS_CF = True except ImportError: HAS_CF = False -from glob import glob -def get_homedir(): +def get_homedir() -> Path: if not os.environ.get('LOOKYLOO_HOME'): guessed_home = Path(__file__).resolve().parent.parent raise MissingEnv(f"LOOKYLOO_HOME is missing. \ @@ -59,8 +60,7 @@ def check_running(name: str) -> bool: socket_path = get_socket_path(name) try: r = Redis(unix_socket_path=socket_path) - if r.ping(): - return True + return True if r.ping() else False except ConnectionError: return False @@ -68,7 +68,7 @@ def check_running(name: str) -> bool: def shutdown_requested() -> bool: try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) - return r.exists('shutdown') + return True if r.exists('shutdown') else False except ConnectionRefusedError: return True except ConnectionError: @@ -119,7 +119,7 @@ def update_user_agents(): json.dump(to_store, f, indent=2) -def get_user_agents(): +def get_user_agents() -> dict: ua_files_path = str(get_homedir() / 'user_agents' / '*' / '*' / '*.json') paths = sorted(glob(ua_files_path), reverse=True) if not paths: diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 0f26fb1..881a435 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -3,8 +3,6 @@ import json -from scrapysplashwrapper import crawl -from har2tree import CrawledTree, Har2TreeError import pickle from datetime import datetime @@ -21,24 +19,28 @@ from io import BytesIO import base64 from uuid import uuid4 -from pysanejs import SaneJS - from pathlib import Path from .helpers import get_homedir, get_socket_path from .exceptions import NoValidHarFile from redis import Redis +from typing import Union, Dict, List, Tuple + import logging +from pysanejs import SaneJS # type: ignore +from scrapysplashwrapper import crawl # type: ignore +from har2tree import CrawledTree, Har2TreeError # type: ignore + class Lookyloo(): - def __init__(self, splash_url: str='http://127.0.0.1:8050', loglevel: int=logging.DEBUG, only_global_lookups=False): + def __init__(self, splash_url: str='http://127.0.0.1:8050', loglevel: int=logging.DEBUG, only_global_lookups: bool=False) -> None: self.__init_logger(loglevel) - self.redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True) - self.scrape_dir = get_homedir() / 'scraped' - self.splash_url = splash_url - self.only_global_lookups = only_global_lookups + self.redis: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True) + self.scrape_dir: Path = get_homedir() / 'scraped' + self.splash_url: str = splash_url + self.only_global_lookups: bool = only_global_lookups if not self.scrape_dir.exists(): self.scrape_dir.mkdir(parents=True, exist_ok=True) @@ -50,11 +52,11 @@ class Lookyloo(): if not self.sanejs.is_up: self.sanejs = None - def __init_logger(self, loglevel) -> None: + def __init_logger(self, loglevel: int) -> None: self.logger = logging.getLogger(f'{self.__class__.__name__}') self.logger.setLevel(loglevel) - def _set_report_cache(self, report_dir: str): + def _set_report_cache(self, report_dir: Path) -> None: if self.redis.exists(str(report_dir)): return har_files = sorted(report_dir.glob('*.har')) @@ -80,19 +82,19 @@ class Lookyloo(): self.redis.hmset(str(report_dir), cache) self.redis.hset('lookup_dirs', uuid, str(report_dir)) - def report_cache(self, report_dir) -> dict: + def report_cache(self, report_dir: Union[str, Path]) -> Dict: if isinstance(report_dir, Path): report_dir = str(report_dir) return self.redis.hgetall(report_dir) - def _init_existing_dumps(self): + def _init_existing_dumps(self) -> None: for report_dir in self.report_dirs: if report_dir.exists(): self._set_report_cache(report_dir) self.redis.set('cache_loaded', 1) @property - def report_dirs(self): + def report_dirs(self) -> List[Path]: for report_dir in self.scrape_dir.iterdir(): if report_dir.is_dir() and not report_dir.iterdir(): # Cleanup self.scrape_dir of failed runs. @@ -103,13 +105,13 @@ class Lookyloo(): f.write(str(uuid4())) return sorted(self.scrape_dir.iterdir(), reverse=True) - def lookup_report_dir(self, uuid) -> Path: + def lookup_report_dir(self, uuid) -> Union[Path, None]: report_dir = self.redis.hget('lookup_dirs', uuid) if report_dir: return Path(report_dir) return None - def enqueue_scrape(self, query: dict): + def enqueue_scrape(self, query: dict) -> str: perma_uuid = str(uuid4()) p = self.redis.pipeline() p.hmset(perma_uuid, query) @@ -117,7 +119,7 @@ class Lookyloo(): p.execute() return perma_uuid - def process_scrape_queue(self): + def process_scrape_queue(self) -> Union[bool, None]: uuid = self.redis.spop('to_scrape') if not uuid: return None @@ -129,7 +131,7 @@ class Lookyloo(): return True return False - def load_tree(self, report_dir: Path): + def load_tree(self, report_dir: Path) -> Tuple[str, dict, str, str, str, dict]: har_files = sorted(report_dir.glob('*.har')) try: meta = {} @@ -151,25 +153,26 @@ class Lookyloo(): if time.time() - tmpfile.stat().st_atime > 36000: tmpfile.unlink() - def load_image(self, report_dir): + def load_image(self, report_dir: Path) -> BytesIO: with open(list(report_dir.glob('*.png'))[0], 'rb') as f: return BytesIO(f.read()) - def sane_js_query(self, sha512: str): + def sane_js_query(self, sha512: str) -> Dict: if self.sanejs: return self.sanejs.sha512(sha512) return {'response': []} - def scrape(self, url, depth: int=1, listing: bool=True, user_agent: str=None, perma_uuid: str=None, - os: str=None, browser: str=None): + def scrape(self, url: str, depth: int=1, listing: bool=True, user_agent: str=None, perma_uuid: str=None, + os: str=None, browser: str=None) -> Union[bool, str]: if not url.startswith('http'): url = f'http://{url}' if self.only_global_lookups: splitted_url = urlsplit(url) if splitted_url.netloc: - ip = socket.gethostbyname(splitted_url.hostname) - if not ipaddress.ip_address(ip).is_global: - return False + if splitted_url.hostname: + ip = socket.gethostbyname(splitted_url.hostname) + if not ipaddress.ip_address(ip).is_global: + return False else: return False @@ -187,16 +190,16 @@ class Lookyloo(): png = base64.b64decode(item['png']) child_frames = item['childFrames'] html = item['html'] - with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as f: - json.dump(harfile, f) - with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as f: - f.write(png) - with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as f: - f.write(html) - with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as f: - json.dump(child_frames, f) - with (dirpath / 'uuid').open('w') as f: - f.write(perma_uuid) + with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as _har: + json.dump(harfile, _har) + with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as _img: + _img.write(png) + with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as _html: + _html.write(html) + with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as _iframes: + json.dump(child_frames, _iframes) + with (dirpath / 'uuid').open('w') as _uuid: + _uuid.write(perma_uuid) if not listing: # Write no_index marker (dirpath / 'no_index').touch() if os or browser: @@ -205,7 +208,7 @@ class Lookyloo(): meta['os'] = os if browser: meta['browser'] = browser - with (dirpath / 'meta').open('w') as f: - json.dump(meta, f) + with (dirpath / 'meta').open('w') as _meta: + json.dump(meta, _meta) self._set_report_cache(dirpath) return perma_uuid diff --git a/setup.py b/setup.py index c346f11..591379a 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from setuptools import setup +from setuptools import setup # type: ignore setup( diff --git a/website/web/__init__.py b/website/web/__init__.py index 06230e4..bf4e165 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -7,18 +7,20 @@ from zipfile import ZipFile, ZIP_DEFLATED from io import BytesIO import os import logging +from pathlib import Path from flask import Flask, render_template, request, session, send_file, redirect, url_for, Response -from flask_bootstrap import Bootstrap +from flask_bootstrap import Bootstrap # type: ignore from lookyloo.helpers import get_homedir, update_user_agents, get_user_agents from lookyloo.lookyloo import Lookyloo from lookyloo.exceptions import NoValidHarFile +from typing import Tuple -app = Flask(__name__) +app: Flask = Flask(__name__) -secret_file_path = get_homedir() / 'secret_key' +secret_file_path: Path = get_homedir() / 'secret_key' if not secret_file_path.exists() or secret_file_path.stat().st_size < 64: with secret_file_path.open('wb') as f: @@ -32,21 +34,20 @@ app.config['BOOTSTRAP_SERVE_LOCAL'] = True app.config['SESSION_COOKIE_NAME'] = 'lookyloo' app.debug = False +splash_url: str = 'http://127.0.0.1:8050' # API entry point for splash if os.environ.get('SPLASH_URL'): - splash_url = os.environ.get('SPLASH_URL') -else: - splash_url = 'http://127.0.0.1:8050' + splash_url = os.environ['SPLASH_URL'] # Splash log level loglevel = logging.DEBUG # Set it to True if your instance is publicly available so users aren't able to scan your internal network -only_global_lookups = False +only_global_lookups: bool = False -lookyloo = Lookyloo(splash_url=splash_url, loglevel=loglevel, only_global_lookups=only_global_lookups) +lookyloo: Lookyloo = Lookyloo(splash_url=splash_url, loglevel=loglevel, only_global_lookups=only_global_lookups) # keep -def load_tree(report_dir): +def load_tree(report_dir: Path) -> Tuple[dict, str, str, str, dict]: session.clear() temp_file_name, tree_json, tree_time, tree_ua, tree_root_url, meta = lookyloo.load_tree(report_dir) session["tree"] = temp_file_name