chg: Make new version of mypy happy

pull/100/head
Raphaël Vinot 2020-10-12 12:15:07 +02:00
parent 1f13745985
commit 4f52804361
4 changed files with 49 additions and 43 deletions


@@ -257,21 +257,26 @@ def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, An
     return to_return


-def load_cookies(cookie_pseudofile: Optional[BufferedIOBase]=None) -> List[Dict[str, str]]:
+def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str]]=None) -> List[Dict[str, Union[str, bool]]]:
+    cookies: List[Dict[str, Union[str, bool]]]
     if cookie_pseudofile:
-        cookies = json.load(cookie_pseudofile)
+        if isinstance(cookie_pseudofile, str):
+            cookies = json.loads(cookie_pseudofile)
+        else:
+            cookies = json.load(cookie_pseudofile)
     else:
         if not (get_homedir() / 'cookies.json').exists():
             return []
         with (get_homedir() / 'cookies.json').open() as f:
             cookies = json.load(f)
-    to_return = []
+    to_return: List[Dict[str, Union[str, bool]]] = []
     try:
         for cookie in cookies:
+            to_add: Dict[str, Union[str, bool]]
             if 'Host raw' in cookie:
                 # Cookie export format for Cookie Quick Manager
-                u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]
+                u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]  # type: ignore
                 to_add = {'path': cookie['Path raw'],
                           'name': cookie['Name raw'],
                           'httpOnly': cookie['HTTP only raw'] == 'true',
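With this change, load_cookies() accepts either a buffered file object or a raw JSON string. A minimal usage sketch, not part of the commit, assuming the function is imported from lookyloo/helpers.py and that a real Cookie Quick Manager export carries more "... raw" fields than the ones visible in the hunk:

    from io import BytesIO
    from lookyloo.helpers import load_cookies  # assumed import path

    raw = '[{"Host raw": "https://example.com", "Path raw": "/", "Name raw": "session", "HTTP only raw": "true"}]'
    cookies_from_str = load_cookies(raw)                     # goes through json.loads()
    cookies_from_file = load_cookies(BytesIO(raw.encode()))  # goes through json.load()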


@@ -34,7 +34,7 @@ class Indexing():
         return self.redis.zrevrange(f'cn|{cookie_name}', 0, -1, withscores=True)

     def get_cookies_names_captures(self, cookie_name: str) -> List[Tuple[str, str]]:
-        return [uuids.split('|')for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')]
+        return [uuids.split('|') for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')]  # type: ignore

     def index_cookies_capture(self, crawled_tree: CrawledTree) -> None:
         if self.redis.sismember('indexed_cookies', crawled_tree.uuid):
@@ -118,7 +118,7 @@ class Indexing():
                               filter_capture_uuid: Optional[str]=None,
                               limit: int=20) -> Tuple[int, List[Tuple[str, str, str, bool]]]:
         to_return: List[Tuple[str, str, str, bool]] = []
-        all_captures = self.redis.smembers(f'bh|{body_hash}|captures')
+        all_captures: Set[str] = self.redis.smembers(f'bh|{body_hash}|captures')  # type: ignore
         len_captures = len(all_captures)
         for capture_uuid in list(all_captures)[:limit]:
             if capture_uuid == filter_capture_uuid:
@@ -127,10 +127,11 @@ class Indexing():
                 continue
             for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
                 url_uuid, hostnode_uuid, url = entry.split('|', 2)
+                hostname: str = urlsplit(url).hostname
                 if filter_url:
-                    to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, url == filter_url))
+                    to_return.append((capture_uuid, hostnode_uuid, hostname, url == filter_url))
                 else:
-                    to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, False))
+                    to_return.append((capture_uuid, hostnode_uuid, hostname, False))
         return len_captures, to_return

     def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
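Most of the annotations in this file follow the same pattern: mypy cannot tell what the redis-py calls return once responses are decoded, so the expected type is spelled out and the stub mismatch is silenced. A standalone sketch of that pattern, assuming a client created with decode_responses=True (which is what the str annotations imply):

    from typing import Set

    from redis import Redis

    r = Redis(decode_responses=True)
    # smembers() is typed loosely (or not at all) in the stubs, so pin the
    # expected type and silence the resulting mismatch.
    captures: Set[str] = r.smembers('bh|<body_hash>|captures')  # type: ignore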


@@ -95,15 +95,15 @@ class Lookyloo():
         uas = Counter([entry.split('|', 1)[1] for entry in entries])
         for ua, count in uas.most_common():
             parsed_ua = UserAgent(ua)
-            if not parsed_ua.platform or not parsed_ua.browser:  # type: ignore
+            if not parsed_ua.platform or not parsed_ua.browser:
                 continue
-            if parsed_ua.platform not in to_store:  # type: ignore
-                to_store[parsed_ua.platform] = {}  # type: ignore
-            if f'{parsed_ua.browser} {parsed_ua.version}' not in to_store[parsed_ua.platform]:  # type: ignore
-                to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'] = []  # type: ignore
-            to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'].append(parsed_ua.string)  # type: ignore
-            to_store['by_frequency'].append({'os': parsed_ua.platform,  # type: ignore
-                                             'browser': f'{parsed_ua.browser} {parsed_ua.version}',  # type: ignore
+            if parsed_ua.platform not in to_store:
+                to_store[parsed_ua.platform] = {}
+            if f'{parsed_ua.browser} {parsed_ua.version}' not in to_store[parsed_ua.platform]:
+                to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'] = []
+            to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'].append(parsed_ua.string)
+            to_store['by_frequency'].append({'os': parsed_ua.platform,
+                                             'browser': f'{parsed_ua.browser} {parsed_ua.version}',
                                              'useragent': parsed_ua.string})
         with self_generated_ua_file.open('w') as f:
             json.dump(to_store, f, indent=2)
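For reference, the loop above produces a structure along these lines; the values are made up and 'by_frequency' is assumed to be initialised as an empty list before the loop:

    to_store = {
        'by_frequency': [
            {'os': 'Windows', 'browser': 'Firefox 81.0', 'useragent': 'Mozilla/5.0 (...) Firefox/81.0'},
        ],
        'Windows': {
            'Firefox 81.0': ['Mozilla/5.0 (...) Firefox/81.0'],
        },
    }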
@@ -355,7 +355,7 @@ class Lookyloo():
         if error_cache:
             self.logger.warning(error_cache['error'])
-            self.redis.hmset(str(capture_dir), error_cache)
+            self.redis.hmset(str(capture_dir), error_cache)  # type: ignore
             self.redis.hset('lookup_dirs', uuid, str(capture_dir))

         if fatal_error:
@@ -382,7 +382,7 @@ class Lookyloo():
         if (capture_dir / 'no_index').exists():  # If the folders claims anonymity
             cache['no_index'] = 1

-        self.redis.hmset(str(capture_dir), cache)
+        self.redis.hmset(str(capture_dir), cache)  # type: ignore
         self.redis.hset('lookup_dirs', uuid, str(capture_dir))

     def hide_capture(self, capture_uuid: str) -> None:
@@ -399,16 +399,16 @@ class Lookyloo():
     def capture_uuids(self):
         return self.redis.hkeys('lookup_dirs')

-    def capture_cache(self, capture_uuid: str) -> Dict[str, Any]:
+    def capture_cache(self, capture_uuid: str) -> Dict[str, Union[str, Path]]:
         capture_dir = self.lookup_capture_dir(capture_uuid)
         if not capture_dir:
             raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         if self.redis.hget(str(capture_dir), 'incomplete_redirects') == '1':
             # try to rebuild the cache
             self._set_capture_cache(capture_dir, force=True)
-        cached = self.redis.hgetall(str(capture_dir))
+        cached: Dict[str, Union[str, Path]] = self.redis.hgetall(str(capture_dir))  # type: ignore
         if all(key in cached.keys() for key in ['uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir']):
-            cached['redirects'] = json.loads(cached['redirects'])
+            cached['redirects'] = json.loads(cached['redirects'])  # type: ignore
             cached['capture_dir'] = Path(cached['capture_dir'])
             return cached
         elif 'error' in cached:
@@ -436,7 +436,7 @@ class Lookyloo():
         return sorted(self.scrape_dir.iterdir(), reverse=True)

     def lookup_capture_dir(self, capture_uuid: str) -> Union[Path, None]:
-        capture_dir = self.redis.hget('lookup_dirs', capture_uuid)
+        capture_dir: str = self.redis.hget('lookup_dirs', capture_uuid)  # type: ignore
         if capture_dir:
             return Path(capture_dir)
         return None
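capture_cache() now advertises Dict[str, Union[str, Path]]: the hash comes out of Redis as flat strings, then 'capture_dir' is rebuilt as a Path and 'redirects' is re-hydrated with json.loads() (which actually yields a list, hence the remaining ignore on that line). A hypothetical call-site sketch, with the lookyloo instance and capture_uuid assumed to exist, showing how a caller can narrow the union without further ignores:

    from pathlib import Path
    from typing import Dict, Union, cast

    cache: Dict[str, Union[str, Path]] = lookyloo.capture_cache(capture_uuid)  # assumed instance and uuid
    if cache:
        title = cast(str, cache['title'])               # stored as a plain string
        capture_dir = cast(Path, cache['capture_dir'])  # rebuilt as a Path by capture_cache()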
@@ -457,10 +457,10 @@ class Lookyloo():
         uuid = self.redis.spop('to_scrape')
         if not uuid:
             return None
-        to_scrape = self.redis.hgetall(uuid)
+        to_scrape: Dict[str, Union[str, int, float]] = self.redis.hgetall(uuid)  # type: ignore
         self.redis.delete(uuid)
         to_scrape['perma_uuid'] = uuid
-        if self.scrape(**to_scrape):
+        if self.scrape(**to_scrape):  # type: ignore
             self.logger.info(f'Processed {to_scrape["url"]}')
             return True
         return False
@@ -473,10 +473,10 @@ class Lookyloo():
         initial_url = ''
         cache = self.capture_cache(capture_uuid)
         if cache:
-            initial_url = cache['url']
+            initial_url = cache['url']  # type: ignore
             if 'redirects' in cache and cache['redirects']:
                 redirects = "Redirects:\n"
-                redirects += '\n'.join(cache['redirects'])
+                redirects += '\n'.join(cache['redirects'])  # type: ignore
             else:
                 redirects = "No redirects."
@@ -512,15 +512,15 @@ class Lookyloo():
             return
         ua = UserAgent(tree.root_hartree.user_agent)
         to_dump = {}
-        if ua.platform:  # type: ignore
-            to_dump['os'] = ua.platform  # type: ignore
-        if ua.browser:  # type: ignore
-            if ua.version:  # type: ignore
-                to_dump['browser'] = f'{ua.browser} {ua.version}'  # type: ignore
+        if ua.platform:
+            to_dump['os'] = ua.platform
+        if ua.browser:
+            if ua.version:
+                to_dump['browser'] = f'{ua.browser} {ua.version}'
             else:
-                to_dump['browser'] = ua.browser  # type: ignore
-        if ua.language:  # type: ignore
-            to_dump['language'] = ua.language  # type: ignore
+                to_dump['browser'] = ua.browser
+        if ua.language:
+            to_dump['language'] = ua.language

         if not to_dump:
             # UA not recognized
@@ -559,7 +559,7 @@ class Lookyloo():
     def get_capture(self, capture_uuid: str) -> BytesIO:
         return self._get_raw(capture_uuid)

-    def scrape(self, url: str, cookies_pseudofile: Optional[BufferedIOBase]=None,
+    def scrape(self, url: str, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
                depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
                referer: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
                browser: Optional[str]=None) -> Union[bool, str]:
@@ -648,12 +648,12 @@ class Lookyloo():
         return perma_uuid

     def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
-        captures = []
+        captures: List[Tuple[str, str]] = []
         total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
         for capture_uuid, url_uuid, url_hostname, _ in details:
             cache = self.capture_cache(capture_uuid)
             if cache:
-                captures.append((capture_uuid, cache['title']))
+                captures.append((capture_uuid, cache['title']))  # type: ignore
         domains = self.indexing.get_body_hash_domains(body_hash)
         return captures, domains
@@ -674,9 +674,9 @@ class Lookyloo():
             cache = self.capture_cache(h_capture_uuid)
             if cache:
                 if same_url:
-                    captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
+                    captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))  # type: ignore
                 else:
-                    captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
+                    captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))  # type: ignore
         return total_captures, captures_list

     def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):


@@ -224,10 +224,10 @@ def redirects(tree_uuid: str):
         return Response('Not available.', mimetype='text/text')
     if not cache['redirects']:
         return Response('No redirects.', mimetype='text/text')
-    if cache['url'] == cache['redirects'][0]:
-        to_return = BytesIO('\n'.join(cache['redirects']).encode())
+    if cache['url'] == cache['redirects'][0]:  # type: ignore
+        to_return = BytesIO('\n'.join(cache['redirects']).encode())  # type: ignore
     else:
-        to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())
+        to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())  # type: ignore
     return send_file(to_return, mimetype='text/text',
                      as_attachment=True, attachment_filename='redirects.txt')
@@ -364,7 +364,7 @@ def index_generic(show_hidden: bool=False):
         if 'timestamp' not in cached:
             # this is a buggy capture, skip
             continue
-        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:
+        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:  # type: ignore
            continue
         titles.append((cached['uuid'], cached['title'], cached['timestamp'], cached['url'],
                        cached['redirects'], True if cached['incomplete_redirects'] == '1' else False))
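This last ignore exists because cached presumably comes from Lookyloo.capture_cache(), so cached['timestamp'] is typed Union[str, Path] while datetime.fromisoformat() wants a str (the [:-1] slice apparently drops a trailing 'Z' the parser cannot handle). A hedged sketch of the same check written with isinstance narrowing instead, under those assumptions:

    from datetime import datetime
    from pathlib import Path
    from typing import Dict, Optional, Union

    def is_before_cut_time(cached: Dict[str, Union[str, Path]], cut_time: Optional[datetime]) -> bool:
        ts = cached.get('timestamp')
        # isinstance() narrows the union for mypy, so no ignore is needed here.
        return bool(cut_time and isinstance(ts, str)
                    and datetime.fromisoformat(ts[:-1]) < cut_time)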