chg: Add typing

pull/62/head
Raphaël Vinot 2020-01-06 15:32:38 +01:00
parent 5cfe5306b2
commit dec6920cb8
7 changed files with 144 additions and 63 deletions

.travis.yml Normal file (+15 lines)

@@ -0,0 +1,15 @@
language: python
python:
- "3.6"
- "3.6-dev"
- "3.7"
- "3.7-dev"
- "nightly"
install:
- pip install pipenv
- pipenv install --dev
script:
- pipenv run mypy .
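
The CI job type-checks the whole tree with `pipenv run mypy .`. As a minimal illustration of the kind of error such a run catches (hypothetical code, not from this repository):

def add_one(x: int) -> int:
    return x + 1

add_one("2")  # flagged by mypy: argument has incompatible type "str"; expected "int"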

Pipfile

@@ -4,6 +4,7 @@ url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
mypy = "*"
[packages]
scrapysplashwrapper = {editable = true,git = "https://github.com/viper-framework/ScrapySplashWrapper.git"}

Pipfile.lock generated (77 lines changed)

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "79c98ed3b6e145e906e0dbe368a5568ee498c330ec7c6ce09020417d3b403cc4"
"sha256": "ec4e1b4e58752b96f77adbe6eee453274648412e3e908a9efa2b1fa06b40b25a"
},
"pipfile-spec": 6,
"requires": {
@@ -32,12 +32,12 @@
},
"beautifulsoup4": {
"hashes": [
"sha256:5279c36b4b2ec2cb4298d723791467e3000e5384a43ea0cdf5d45207c7e97169",
"sha256:6135db2ba678168c07950f9a16c4031822c6f4aec75a65e0a97bc5ca09789931",
"sha256:dcdef580e18a76d54002088602eba453eec38ebbcafafeaabd8cab12b6155d57"
"sha256:05fd825eb01c290877657a56df4c6e4c311b3965bda790c613a3d6fb01a5462a",
"sha256:9fbb4d6e48ecd30bcacc5b63b94088192dcda178513b2ae3c394229f8911b887",
"sha256:e1505eeed31b0f4ce2dbb3bc8eb256c04cc2b3b72af7d551a4ab6efd5cbe5dae"
],
"index": "pypi",
"version": "==4.8.1"
"version": "==4.8.2"
},
"bootstrap-flask": {
"hashes": [
@@ -250,7 +250,7 @@
"har2tree": {
"editable": true,
"git": "https://github.com/viper-framework/har2tree.git",
"ref": "58481e33788b48364472b44f4b74b22ded66e6fb"
"ref": "09421d04b9e3c985b61404bec828b4be7d892e01"
},
"hyperlink": {
"hashes": [
@@ -467,7 +467,7 @@
"scrapysplashwrapper": {
"editable": true,
"git": "https://github.com/viper-framework/ScrapySplashWrapper.git",
"ref": "e4f51e9775af60be8ab9e66d3e1d35581ba3f63a"
"ref": "235b090d5b3024459ba9f91fa5f61660b4af5014"
},
"service-identity": {
"hashes": [
@@ -581,5 +581,66 @@
"version": "==4.7.1"
}
},
"develop": {}
"develop": {
"mypy": {
"hashes": [
"sha256:0a9a45157e532da06fe56adcfef8a74629566b607fa2c1ac0122d1ff995c748a",
"sha256:2c35cae79ceb20d47facfad51f952df16c2ae9f45db6cb38405a3da1cf8fc0a7",
"sha256:4b9365ade157794cef9685791032521233729cb00ce76b0ddc78749abea463d2",
"sha256:53ea810ae3f83f9c9b452582261ea859828a9ed666f2e1ca840300b69322c474",
"sha256:634aef60b4ff0f650d3e59d4374626ca6153fcaff96ec075b215b568e6ee3cb0",
"sha256:7e396ce53cacd5596ff6d191b47ab0ea18f8e0ec04e15d69728d530e86d4c217",
"sha256:7eadc91af8270455e0d73565b8964da1642fe226665dd5c9560067cd64d56749",
"sha256:7f672d02fffcbace4db2b05369142e0506cdcde20cea0e07c7c2171c4fd11dd6",
"sha256:85baab8d74ec601e86134afe2bcccd87820f79d2f8d5798c889507d1088287bf",
"sha256:87c556fb85d709dacd4b4cb6167eecc5bbb4f0a9864b69136a0d4640fdc76a36",
"sha256:a6bd44efee4dc8c3324c13785a9dc3519b3ee3a92cada42d2b57762b7053b49b",
"sha256:c6d27bd20c3ba60d5b02f20bd28e20091d6286a699174dfad515636cb09b5a72",
"sha256:e2bb577d10d09a2d8822a042a23b8d62bc3b269667c9eb8e60a6edfa000211b1",
"sha256:f97a605d7c8bc2c6d1172c2f0d5a65b24142e11a58de689046e62c2d632ca8c1"
],
"index": "pypi",
"version": "==0.761"
},
"mypy-extensions": {
"hashes": [
"sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d",
"sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"
],
"version": "==0.4.3"
},
"typed-ast": {
"hashes": [
"sha256:1170afa46a3799e18b4c977777ce137bb53c7485379d9706af8a59f2ea1aa161",
"sha256:18511a0b3e7922276346bcb47e2ef9f38fb90fd31cb9223eed42c85d1312344e",
"sha256:262c247a82d005e43b5b7f69aff746370538e176131c32dda9cb0f324d27141e",
"sha256:2b907eb046d049bcd9892e3076c7a6456c93a25bebfe554e931620c90e6a25b0",
"sha256:354c16e5babd09f5cb0ee000d54cfa38401d8b8891eefa878ac772f827181a3c",
"sha256:48e5b1e71f25cfdef98b013263a88d7145879fbb2d5185f2a0c79fa7ebbeae47",
"sha256:4e0b70c6fc4d010f8107726af5fd37921b666f5b31d9331f0bd24ad9a088e631",
"sha256:630968c5cdee51a11c05a30453f8cd65e0cc1d2ad0d9192819df9978984529f4",
"sha256:66480f95b8167c9c5c5c87f32cf437d585937970f3fc24386f313a4c97b44e34",
"sha256:71211d26ffd12d63a83e079ff258ac9d56a1376a25bc80b1cdcdf601b855b90b",
"sha256:7954560051331d003b4e2b3eb822d9dd2e376fa4f6d98fee32f452f52dd6ebb2",
"sha256:838997f4310012cf2e1ad3803bce2f3402e9ffb71ded61b5ee22617b3a7f6b6e",
"sha256:95bd11af7eafc16e829af2d3df510cecfd4387f6453355188342c3e79a2ec87a",
"sha256:bc6c7d3fa1325a0c6613512a093bc2a2a15aeec350451cbdf9e1d4bffe3e3233",
"sha256:cc34a6f5b426748a507dd5d1de4c1978f2eb5626d51326e43280941206c209e1",
"sha256:d755f03c1e4a51e9b24d899561fec4ccaf51f210d52abdf8c07ee2849b212a36",
"sha256:d7c45933b1bdfaf9f36c579671fec15d25b06c8398f113dab64c18ed1adda01d",
"sha256:d896919306dd0aa22d0132f62a1b78d11aaf4c9fc5b3410d3c666b818191630a",
"sha256:fdc1c9bbf79510b76408840e009ed65958feba92a88833cdceecff93ae8fff66",
"sha256:ffde2fbfad571af120fcbfbbc61c72469e72f550d676c3342492a9dfdefb8f12"
],
"version": "==1.4.0"
},
"typing-extensions": {
"hashes": [
"sha256:091ecc894d5e908ac75209f10d5b4f118fbdb2eb1ede6a63544054bb1edb41f2",
"sha256:910f4656f54de5993ad9304959ce9bb903f90aadc7c67a0bef07e678014e892d",
"sha256:cf8b63fedea4d89bab840ecbb93e75578af28f76f66c35889bd7065f5af88575"
],
"version": "==3.7.4.1"
}
}
}

lookyloo/helpers.py

@@ -7,17 +7,18 @@ from redis import Redis
from redis.exceptions import ConnectionError
from datetime import datetime, timedelta
import time
from bs4 import BeautifulSoup
from glob import glob
import json
from bs4 import BeautifulSoup # type: ignore
try:
import cfscrape
import cfscrape # type: ignore
HAS_CF = True
except ImportError:
HAS_CF = False
from glob import glob
def get_homedir():
def get_homedir() -> Path:
if not os.environ.get('LOOKYLOO_HOME'):
guessed_home = Path(__file__).resolve().parent.parent
raise MissingEnv(f"LOOKYLOO_HOME is missing. \
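
bs4 and cfscrape ship without type information, so a plain import makes mypy error out (something like "Cannot find implementation or library stub for module named 'bs4'"). The `# type: ignore` comment suppresses errors on that line only; the imported name is then treated as Any, so none of its uses are checked. A minimal sketch using bs4 itself:

from bs4 import BeautifulSoup  # type: ignore

# BeautifulSoup is typed as Any from here on; mypy checks none of its uses
soup = BeautifulSoup('<p>hello</p>', 'html.parser')
print(soup.p.text)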
@@ -59,8 +60,7 @@ def check_running(name: str) -> bool:
socket_path = get_socket_path(name)
try:
r = Redis(unix_socket_path=socket_path)
if r.ping():
return True
return True if r.ping() else False
except ConnectionError:
return False
@@ -68,7 +68,7 @@ def check_running(name: str) -> bool:
def shutdown_requested() -> bool:
try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
return r.exists('shutdown')
return True if r.exists('shutdown') else False
except ConnectionRefusedError:
return True
except ConnectionError:
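
redis-py does not declare `ping()`/`exists()` as returning a plain bool (`exists()` returns an int count), so returning the raw result from a function annotated `-> bool` can fail the check; the `True if ... else False` expressions above coerce the value. A `bool(...)` call is an equivalent, arguably terser spelling (a suggestion, not what this commit uses):

from redis import Redis

def shutdown_requested(r: Redis) -> bool:
    # exists() returns the number of matching keys; bool() makes the bool contract explicit
    return bool(r.exists('shutdown'))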
@@ -119,7 +119,7 @@ def update_user_agents():
json.dump(to_store, f, indent=2)
def get_user_agents():
def get_user_agents() -> dict:
ua_files_path = str(get_homedir() / 'user_agents' / '*' / '*' / '*.json')
paths = sorted(glob(ua_files_path), reverse=True)
if not paths:
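
The new `-> dict` annotation only promises "some dict"; when the shape of the data is known, a parameterized type checks keys and values too. A hypothetical, stricter signature (the actual user-agent file layout is not shown here):

from typing import Dict, List

def get_user_agents() -> Dict[str, List[str]]:
    # hypothetical shape: OS name mapped to user-agent strings
    return {'Windows 10': ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...']}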

lookyloo/lookyloo.py

@@ -3,8 +3,6 @@
import json
from scrapysplashwrapper import crawl
from har2tree import CrawledTree, Har2TreeError
import pickle
from datetime import datetime
@@ -21,24 +19,28 @@ from io import BytesIO
import base64
from uuid import uuid4
from pysanejs import SaneJS
from pathlib import Path
from .helpers import get_homedir, get_socket_path
from .exceptions import NoValidHarFile
from redis import Redis
from typing import Union, Dict, List, Tuple
import logging
from pysanejs import SaneJS # type: ignore
from scrapysplashwrapper import crawl # type: ignore
from har2tree import CrawledTree, Har2TreeError # type: ignore
class Lookyloo():
def __init__(self, splash_url: str='http://127.0.0.1:8050', loglevel: int=logging.DEBUG, only_global_lookups=False):
def __init__(self, splash_url: str='http://127.0.0.1:8050', loglevel: int=logging.DEBUG, only_global_lookups: bool=False) -> None:
self.__init_logger(loglevel)
self.redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
self.scrape_dir = get_homedir() / 'scraped'
self.splash_url = splash_url
self.only_global_lookups = only_global_lookups
self.redis: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
self.scrape_dir: Path = get_homedir() / 'scraped'
self.splash_url: str = splash_url
self.only_global_lookups: bool = only_global_lookups
if not self.scrape_dir.exists():
self.scrape_dir.mkdir(parents=True, exist_ok=True)
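
Annotating attributes at their assignment in `__init__` pins each attribute's type for the whole class, so later incompatible assignments are caught. A minimal sketch (hypothetical class, not from this commit):

class Config:
    def __init__(self, splash_url: str) -> None:
        self.splash_url: str = splash_url

c = Config('http://127.0.0.1:8050')
c.splash_url = 8050  # flagged by mypy: "int" assigned where "str" is declared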
@@ -50,11 +52,11 @@ class Lookyloo():
if not self.sanejs.is_up:
self.sanejs = None
def __init_logger(self, loglevel) -> None:
def __init_logger(self, loglevel: int) -> None:
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(loglevel)
def _set_report_cache(self, report_dir: str):
def _set_report_cache(self, report_dir: Path) -> None:
if self.redis.exists(str(report_dir)):
return
har_files = sorted(report_dir.glob('*.har'))
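
The old `report_dir: str` annotation was simply wrong: the method immediately calls `report_dir.glob('*.har')`, and `glob()` exists on `Path`, not on `str`. This is exactly the kind of mismatch mypy surfaces. A minimal reduction (hypothetical function name):

from pathlib import Path

def har_files(report_dir: Path):
    # with report_dir annotated as str, mypy reports: "str" has no attribute "glob"
    return sorted(report_dir.glob('*.har'))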
@@ -80,19 +82,19 @@
self.redis.hmset(str(report_dir), cache)
self.redis.hset('lookup_dirs', uuid, str(report_dir))
def report_cache(self, report_dir) -> dict:
def report_cache(self, report_dir: Union[str, Path]) -> Dict:
if isinstance(report_dir, Path):
report_dir = str(report_dir)
return self.redis.hgetall(report_dir)
def _init_existing_dumps(self):
def _init_existing_dumps(self) -> None:
for report_dir in self.report_dirs:
if report_dir.exists():
self._set_report_cache(report_dir)
self.redis.set('cache_loaded', 1)
@property
def report_dirs(self):
def report_dirs(self) -> List[Path]:
for report_dir in self.scrape_dir.iterdir():
if report_dir.is_dir() and not report_dir.iterdir():
# Cleanup self.scrape_dir of failed runs.
@@ -103,13 +105,13 @@ class Lookyloo():
f.write(str(uuid4()))
return sorted(self.scrape_dir.iterdir(), reverse=True)
def lookup_report_dir(self, uuid) -> Path:
def lookup_report_dir(self, uuid) -> Union[Path, None]:
report_dir = self.redis.hget('lookup_dirs', uuid)
if report_dir:
return Path(report_dir)
return None
def enqueue_scrape(self, query: dict):
def enqueue_scrape(self, query: dict) -> str:
perma_uuid = str(uuid4())
p = self.redis.pipeline()
p.hmset(perma_uuid, query)
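
`Union[Path, None]` is exactly what `typing.Optional[Path]` expands to; either spelling forces callers to handle the None case before using the value. A sketch of the narrowing a caller has to do (hypothetical caller code):

from pathlib import Path
from typing import Optional

def lookup_report_dir(uuid: str) -> Optional[Path]:  # same type as Union[Path, None]
    ...

report_dir = lookup_report_dir('some-uuid')
if report_dir is not None:
    print(report_dir.name)  # mypy narrows Optional[Path] to Path inside this branch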
@@ -117,7 +119,7 @@
p.execute()
return perma_uuid
def process_scrape_queue(self):
def process_scrape_queue(self) -> Union[bool, None]:
uuid = self.redis.spop('to_scrape')
if not uuid:
return None
@@ -129,7 +131,7 @@
return True
return False
def load_tree(self, report_dir: Path):
def load_tree(self, report_dir: Path) -> Tuple[str, dict, str, str, str, dict]:
har_files = sorted(report_dir.glob('*.har'))
try:
meta = {}
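
The six-slot `Tuple` return type documents each position in the order the web front-end unpacks it (temp_file_name, tree_json, tree_time, tree_ua, tree_root_url, meta). Positional tuples are easy to misread, though; a `NamedTuple` is a common alternative (hypothetical, not what this commit does):

from typing import Dict, NamedTuple

class TreeResult(NamedTuple):
    temp_file_name: str
    tree_json: Dict
    tree_time: str
    tree_ua: str
    tree_root_url: str
    meta: Dict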
@@ -151,25 +153,26 @@
if time.time() - tmpfile.stat().st_atime > 36000:
tmpfile.unlink()
def load_image(self, report_dir):
def load_image(self, report_dir: Path) -> BytesIO:
with open(list(report_dir.glob('*.png'))[0], 'rb') as f:
return BytesIO(f.read())
def sane_js_query(self, sha512: str):
def sane_js_query(self, sha512: str) -> Dict:
if self.sanejs:
return self.sanejs.sha512(sha512)
return {'response': []}
def scrape(self, url, depth: int=1, listing: bool=True, user_agent: str=None, perma_uuid: str=None,
os: str=None, browser: str=None):
def scrape(self, url: str, depth: int=1, listing: bool=True, user_agent: str=None, perma_uuid: str=None,
os: str=None, browser: str=None) -> Union[bool, str]:
if not url.startswith('http'):
url = f'http://{url}'
if self.only_global_lookups:
splitted_url = urlsplit(url)
if splitted_url.netloc:
ip = socket.gethostbyname(splitted_url.hostname)
if not ipaddress.ip_address(ip).is_global:
return False
if splitted_url.hostname:
ip = socket.gethostbyname(splitted_url.hostname)
if not ipaddress.ip_address(ip).is_global:
return False
else:
return False
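
`urlsplit(...).hostname` is typed `Optional[str]` (it is None for URLs without a host), so passing it straight to `socket.gethostbyname()` fails the type check; branching on `hostname` first narrows the type for mypy and guards the runtime call, which is why the check moved from `netloc` to `hostname`. A minimal sketch:

import ipaddress
import socket
from urllib.parse import urlsplit

parts = urlsplit('http://example.com/index.html')
if parts.hostname:  # narrows Optional[str] to str, and guards the resolution
    ip = socket.gethostbyname(parts.hostname)
    print(ipaddress.ip_address(ip).is_global)

Separately, defaults like `user_agent: str=None` in the new scrape() signature rely on mypy's implicit-Optional handling; `Optional[str]=None` would be the explicit spelling.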
@@ -187,16 +190,16 @@
png = base64.b64decode(item['png'])
child_frames = item['childFrames']
html = item['html']
with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as f:
json.dump(harfile, f)
with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as f:
f.write(png)
with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as f:
f.write(html)
with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as f:
json.dump(child_frames, f)
with (dirpath / 'uuid').open('w') as f:
f.write(perma_uuid)
with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as _har:
json.dump(harfile, _har)
with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as _img:
_img.write(png)
with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as _html:
_html.write(html)
with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as _iframes:
json.dump(child_frames, _iframes)
with (dirpath / 'uuid').open('w') as _uuid:
_uuid.write(perma_uuid)
if not listing: # Write no_index marker
(dirpath / 'no_index').touch()
if os or browser:
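
mypy treats each `with open(...) as f` as a definition of `f`, so reusing one name for both text ('w') and binary ('wb') handles triggers a redefinition error (the two handle types are incompatible). Renaming the handles `_har`, `_img`, `_html`, `_iframes`, `_uuid` keeps each one's type stable. A minimal reproduction (hypothetical files):

with open('data.json', 'w') as f:
    f.write('{}')

with open('image.png', 'wb') as f:  # flagged by mypy: binary handle redefines a text handle
    f.write(b'\x89PNG')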
@@ -205,7 +208,7 @@
meta['os'] = os
if browser:
meta['browser'] = browser
with (dirpath / 'meta').open('w') as f:
json.dump(meta, f)
with (dirpath / 'meta').open('w') as _meta:
json.dump(meta, _meta)
self._set_report_cache(dirpath)
return perma_uuid

setup.py

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from setuptools import setup
from setuptools import setup # type: ignore
setup(

website/web/__init__.py

@@ -7,18 +7,20 @@ from zipfile import ZipFile, ZIP_DEFLATED
from io import BytesIO
import os
import logging
from pathlib import Path
from flask import Flask, render_template, request, session, send_file, redirect, url_for, Response
from flask_bootstrap import Bootstrap
from flask_bootstrap import Bootstrap # type: ignore
from lookyloo.helpers import get_homedir, update_user_agents, get_user_agents
from lookyloo.lookyloo import Lookyloo
from lookyloo.exceptions import NoValidHarFile
from typing import Tuple
app = Flask(__name__)
app: Flask = Flask(__name__)
secret_file_path = get_homedir() / 'secret_key'
secret_file_path: Path = get_homedir() / 'secret_key'
if not secret_file_path.exists() or secret_file_path.stat().st_size < 64:
with secret_file_path.open('wb') as f:
@@ -32,21 +34,20 @@ app.config['BOOTSTRAP_SERVE_LOCAL'] = True
app.config['SESSION_COOKIE_NAME'] = 'lookyloo'
app.debug = False
splash_url: str = 'http://127.0.0.1:8050'
# API entry point for splash
if os.environ.get('SPLASH_URL'):
splash_url = os.environ.get('SPLASH_URL')
else:
splash_url = 'http://127.0.0.1:8050'
splash_url = os.environ['SPLASH_URL']
# Splash log level
loglevel = logging.DEBUG
# Set it to True if your instance is publicly available so users aren't able to scan your internal network
only_global_lookups = False
only_global_lookups: bool = False
lookyloo = Lookyloo(splash_url=splash_url, loglevel=loglevel, only_global_lookups=only_global_lookups)
lookyloo: Lookyloo = Lookyloo(splash_url=splash_url, loglevel=loglevel, only_global_lookups=only_global_lookups)
# keep
def load_tree(report_dir):
def load_tree(report_dir: Path) -> Tuple[dict, str, str, str, dict]:
session.clear()
temp_file_name, tree_json, tree_time, tree_ua, tree_root_url, meta = lookyloo.load_tree(report_dir)
session["tree"] = temp_file_name