From 4f52804361466b3a17fd8c4b3a23a1144e84a3af Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?=
Date: Mon, 12 Oct 2020 12:15:07 +0200
Subject: [PATCH] chg: Make new version of mypy happy

---
 lookyloo/helpers.py     | 13 ++++++---
 lookyloo/indexing.py    |  9 +++---
 lookyloo/lookyloo.py    | 62 ++++++++++++++++++++---------------------
 website/web/__init__.py |  8 +++---
 4 files changed, 49 insertions(+), 43 deletions(-)

diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py
index ef3f5750..cbee9e84 100644
--- a/lookyloo/helpers.py
+++ b/lookyloo/helpers.py
@@ -257,21 +257,26 @@ def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, An
     return to_return
 
 
-def load_cookies(cookie_pseudofile: Optional[BufferedIOBase]=None) -> List[Dict[str, str]]:
+def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str]]=None) -> List[Dict[str, Union[str, bool]]]:
+    cookies: List[Dict[str, Union[str, bool]]]
     if cookie_pseudofile:
-        cookies = json.load(cookie_pseudofile)
+        if isinstance(cookie_pseudofile, str):
+            cookies = json.loads(cookie_pseudofile)
+        else:
+            cookies = json.load(cookie_pseudofile)
     else:
         if not (get_homedir() / 'cookies.json').exists():
             return []
 
         with (get_homedir() / 'cookies.json').open() as f:
             cookies = json.load(f)
-    to_return = []
+    to_return: List[Dict[str, Union[str, bool]]] = []
     try:
         for cookie in cookies:
+            to_add: Dict[str, Union[str, bool]]
             if 'Host raw' in cookie:
                 # Cookie export format for Cookie Quick Manager
-                u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]
+                u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]  # type: ignore
                 to_add = {'path': cookie['Path raw'],
                           'name': cookie['Name raw'],
                           'httpOnly': cookie['HTTP only raw'] == 'true',
diff --git a/lookyloo/indexing.py b/lookyloo/indexing.py
index 66739bc1..6f400d8c 100644
--- a/lookyloo/indexing.py
+++ b/lookyloo/indexing.py
@@ -34,7 +34,7 @@ class Indexing():
         return self.redis.zrevrange(f'cn|{cookie_name}', 0, -1, withscores=True)
 
     def get_cookies_names_captures(self, cookie_name: str) -> List[Tuple[str, str]]:
-        return [uuids.split('|')for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')]
+        return [uuids.split('|') for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')]  # type: ignore
 
     def index_cookies_capture(self, crawled_tree: CrawledTree) -> None:
         if self.redis.sismember('indexed_cookies', crawled_tree.uuid):
@@ -118,7 +118,7 @@ class Indexing():
                                    filter_capture_uuid: Optional[str]=None,
                                    limit: int=20) -> Tuple[int, List[Tuple[str, str, str, bool]]]:
         to_return: List[Tuple[str, str, str, bool]] = []
-        all_captures = self.redis.smembers(f'bh|{body_hash}|captures')
+        all_captures: Set[str] = self.redis.smembers(f'bh|{body_hash}|captures')  # type: ignore
         len_captures = len(all_captures)
         for capture_uuid in list(all_captures)[:limit]:
             if capture_uuid == filter_capture_uuid:
@@ -127,10 +127,11 @@ class Indexing():
                 continue
             for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
                 url_uuid, hostnode_uuid, url = entry.split('|', 2)
+                hostname: str = urlsplit(url).hostname
                 if filter_url:
-                    to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, url == filter_url))
+                    to_return.append((capture_uuid, hostnode_uuid, hostname, url == filter_url))
                 else:
-                    to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, False))
+                    to_return.append((capture_uuid, hostnode_uuid, hostname, False))
         return len_captures, to_return
 
     def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py
index a753a992..62d7efb1 100644
--- a/lookyloo/lookyloo.py
+++ b/lookyloo/lookyloo.py
@@ -95,15 +95,15 @@ class Lookyloo():
         uas = Counter([entry.split('|', 1)[1] for entry in entries])
         for ua, count in uas.most_common():
             parsed_ua = UserAgent(ua)
-            if not parsed_ua.platform or not parsed_ua.browser:  # type: ignore
+            if not parsed_ua.platform or not parsed_ua.browser:
                 continue
-            if parsed_ua.platform not in to_store:  # type: ignore
-                to_store[parsed_ua.platform] = {}  # type: ignore
-            if f'{parsed_ua.browser} {parsed_ua.version}' not in to_store[parsed_ua.platform]:  # type: ignore
-                to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'] = []  # type: ignore
-            to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'].append(parsed_ua.string)  # type: ignore
-            to_store['by_frequency'].append({'os': parsed_ua.platform,  # type: ignore
-                                             'browser': f'{parsed_ua.browser} {parsed_ua.version}',  # type: ignore
+            if parsed_ua.platform not in to_store:
+                to_store[parsed_ua.platform] = {}
+            if f'{parsed_ua.browser} {parsed_ua.version}' not in to_store[parsed_ua.platform]:
+                to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'] = []
+            to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'].append(parsed_ua.string)
+            to_store['by_frequency'].append({'os': parsed_ua.platform,
+                                             'browser': f'{parsed_ua.browser} {parsed_ua.version}',
                                              'useragent': parsed_ua.string})
         with self_generated_ua_file.open('w') as f:
             json.dump(to_store, f, indent=2)
@@ -355,7 +355,7 @@ class Lookyloo():
 
         if error_cache:
             self.logger.warning(error_cache['error'])
-            self.redis.hmset(str(capture_dir), error_cache)
+            self.redis.hmset(str(capture_dir), error_cache)  # type: ignore
             self.redis.hset('lookup_dirs', uuid, str(capture_dir))
 
         if fatal_error:
@@ -382,7 +382,7 @@ class Lookyloo():
         if (capture_dir / 'no_index').exists():  # If the folders claims anonymity
             cache['no_index'] = 1
 
-        self.redis.hmset(str(capture_dir), cache)
+        self.redis.hmset(str(capture_dir), cache)  # type: ignore
         self.redis.hset('lookup_dirs', uuid, str(capture_dir))
 
     def hide_capture(self, capture_uuid: str) -> None:
@@ -399,16 +399,16 @@ class Lookyloo():
     def capture_uuids(self):
         return self.redis.hkeys('lookup_dirs')
 
-    def capture_cache(self, capture_uuid: str) -> Dict[str, Any]:
+    def capture_cache(self, capture_uuid: str) -> Dict[str, Union[str, Path]]:
         capture_dir = self.lookup_capture_dir(capture_uuid)
         if not capture_dir:
             raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         if self.redis.hget(str(capture_dir), 'incomplete_redirects') == '1':
             # try to rebuild the cache
             self._set_capture_cache(capture_dir, force=True)
-        cached = self.redis.hgetall(str(capture_dir))
+        cached: Dict[str, Union[str, Path]] = self.redis.hgetall(str(capture_dir))  # type: ignore
         if all(key in cached.keys() for key in ['uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir']):
-            cached['redirects'] = json.loads(cached['redirects'])
+            cached['redirects'] = json.loads(cached['redirects'])  # type: ignore
             cached['capture_dir'] = Path(cached['capture_dir'])
             return cached
         elif 'error' in cached:
@@ -436,7 +436,7 @@ class Lookyloo():
         return sorted(self.scrape_dir.iterdir(), reverse=True)
 
     def lookup_capture_dir(self, capture_uuid: str) -> Union[Path, None]:
-        capture_dir = self.redis.hget('lookup_dirs', capture_uuid)
+        capture_dir: str = self.redis.hget('lookup_dirs', capture_uuid)  # type: ignore
         if capture_dir:
             return Path(capture_dir)
         return None
@@ -457,10 +457,10 @@ class Lookyloo():
         uuid = self.redis.spop('to_scrape')
         if not uuid:
             return None
-        to_scrape = self.redis.hgetall(uuid)
+        to_scrape: Dict[str, Union[str, int, float]] = self.redis.hgetall(uuid)  # type: ignore
         self.redis.delete(uuid)
         to_scrape['perma_uuid'] = uuid
-        if self.scrape(**to_scrape):
+        if self.scrape(**to_scrape):  # type: ignore
             self.logger.info(f'Processed {to_scrape["url"]}')
             return True
         return False
@@ -473,10 +473,10 @@ class Lookyloo():
         initial_url = ''
         cache = self.capture_cache(capture_uuid)
         if cache:
-            initial_url = cache['url']
+            initial_url = cache['url']  # type: ignore
             if 'redirects' in cache and cache['redirects']:
                 redirects = "Redirects:\n"
-                redirects += '\n'.join(cache['redirects'])
+                redirects += '\n'.join(cache['redirects'])  # type: ignore
             else:
                 redirects = "No redirects."
 
@@ -512,15 +512,15 @@ class Lookyloo():
             return
         ua = UserAgent(tree.root_hartree.user_agent)
         to_dump = {}
-        if ua.platform:  # type: ignore
-            to_dump['os'] = ua.platform  # type: ignore
-        if ua.browser:  # type: ignore
-            if ua.version:  # type: ignore
-                to_dump['browser'] = f'{ua.browser} {ua.version}'  # type: ignore
+        if ua.platform:
+            to_dump['os'] = ua.platform
+        if ua.browser:
+            if ua.version:
+                to_dump['browser'] = f'{ua.browser} {ua.version}'
             else:
-                to_dump['browser'] = ua.browser  # type: ignore
-        if ua.language:  # type: ignore
-            to_dump['language'] = ua.language  # type: ignore
+                to_dump['browser'] = ua.browser
+        if ua.language:
+            to_dump['language'] = ua.language
 
         if not to_dump:
             # UA not recognized
@@ -559,7 +559,7 @@ class Lookyloo():
     def get_capture(self, capture_uuid: str) -> BytesIO:
         return self._get_raw(capture_uuid)
 
-    def scrape(self, url: str, cookies_pseudofile: Optional[BufferedIOBase]=None,
+    def scrape(self, url: str, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
                depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
                referer: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
                browser: Optional[str]=None) -> Union[bool, str]:
@@ -648,12 +648,12 @@ class Lookyloo():
         return perma_uuid
 
     def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
-        captures = []
+        captures: List[Tuple[str, str]] = []
        total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
         for capture_uuid, url_uuid, url_hostname, _ in details:
             cache = self.capture_cache(capture_uuid)
             if cache:
-                captures.append((capture_uuid, cache['title']))
+                captures.append((capture_uuid, cache['title']))  # type: ignore
         domains = self.indexing.get_body_hash_domains(body_hash)
         return captures, domains
 
@@ -674,9 +674,9 @@ class Lookyloo():
             cache = self.capture_cache(h_capture_uuid)
             if cache:
                 if same_url:
-                    captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
+                    captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))  # type: ignore
                 else:
-                    captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
+                    captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))  # type: ignore
         return total_captures, captures_list
 
     def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):
diff --git a/website/web/__init__.py b/website/web/__init__.py
index 5dcb8c63..2c355751 100644
--- a/website/web/__init__.py
+++ b/website/web/__init__.py
@@ -224,10 +224,10 @@ def redirects(tree_uuid: str):
         return Response('Not available.', mimetype='text/text')
     if not cache['redirects']:
         return Response('No redirects.', mimetype='text/text')
-    if cache['url'] == cache['redirects'][0]:
-        to_return = BytesIO('\n'.join(cache['redirects']).encode())
+    if cache['url'] == cache['redirects'][0]:  # type: ignore
+        to_return = BytesIO('\n'.join(cache['redirects']).encode())  # type: ignore
     else:
-        to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())
+        to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())  # type: ignore
     return send_file(to_return, mimetype='text/text', as_attachment=True,
                      attachment_filename='redirects.txt')
 
@@ -364,7 +364,7 @@ def index_generic(show_hidden: bool=False):
         if 'timestamp' not in cached:
             # this is a buggy capture, skip
             continue
-        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:
+        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:  # type: ignore
             continue
         titles.append((cached['uuid'], cached['title'], cached['timestamp'], cached['url'],
                        cached['redirects'], True if cached['incomplete_redirects'] == '1' else False))
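
Note (not part of the patch): a minimal sketch of the widened load_cookies() signature from
lookyloo/helpers.py above. It assumes a lookyloo checkout is importable and passes an empty
cookie list so it only relies on what is visible in the hunk.

    from io import BytesIO

    from lookyloo.helpers import load_cookies

    # load_cookies() now accepts either a JSON string or a BufferedIOBase
    # (e.g. an open file or BytesIO), in addition to the default cookies.json.
    assert load_cookies('[]') == []
    assert load_cookies(BytesIO(b'[]')) == []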