From cd7b050cb0b32e3634578eb985fa8319dbeffbea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?=
Date: Thu, 18 Mar 2021 18:47:54 +0100
Subject: [PATCH] chg: rename and cleanup methods

---
 lookyloo/lookyloo.py    | 42 ++++++++++++++++++++---------------------
 website/web/__init__.py |  2 +-
 2 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py
index 38d876c..8438b37 100644
--- a/lookyloo/lookyloo.py
+++ b/lookyloo/lookyloo.py
@@ -120,8 +120,8 @@ class Lookyloo():
             json.dump(to_store, f, indent=2)
 
     def _cache_capture(self, capture_uuid: str) -> CrawledTree:
-        '''Generate the pickle, add capture in the indexes'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        '''Generate the pickle, set the cache, add capture in the indexes'''
+        capture_dir = self._get_capture_dir(capture_uuid)
         har_files = sorted(capture_dir.glob('*.har'))
         # NOTE: We only index the public captures
         index = True
@@ -131,7 +131,6 @@ class Lookyloo():
             self._resolve_dns(ct)
             # Force update cache of the capture (takes care of the incomplete redirect key)
             self._set_capture_cache(capture_dir, force=True)
-            # getting the cache triggers an update of the said cache. We want it there.
             cache = self.capture_cache(capture_uuid)
             if not cache:
                 raise LookylooException(f'Broken cache for {capture_dir}')
@@ -224,7 +223,7 @@ class Lookyloo():
     def get_crawled_tree(self, capture_uuid: str) -> CrawledTree:
         '''Get the generated tree in ETE Toolkit format.
         Loads the pickle if it exists, creates it otherwise.'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         ct = load_pickle_tree(capture_dir)
         if not ct:
             ct = self._cache_capture(capture_uuid)
@@ -247,7 +246,7 @@ class Lookyloo():
 
     def remove_pickle(self, capture_uuid: str) -> None:
         '''Remove the pickle from a specific capture.'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         remove_pickle_tree(capture_dir)
 
     def rebuild_cache(self) -> None:
@@ -277,7 +276,7 @@ class Lookyloo():
 
     def get_meta(self, capture_uuid: str) -> Dict[str, str]:
         '''Get the meta informations from a capture (mostly, details about the User Agent used.)'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         meta = {}
         if (capture_dir / 'meta').exists():
             with open((capture_dir / 'meta'), 'r') as f:
@@ -286,7 +285,7 @@ class Lookyloo():
 
     def categories_capture(self, capture_uuid: str) -> Dict[str, Any]:
         '''Get all the categories related to a capture, in MISP Taxonomies format'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -301,7 +300,7 @@ class Lookyloo():
             return
         # Make sure the category is mappable to a taxonomy.
         self.taxonomies.revert_machinetag(category)
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -316,7 +315,7 @@ class Lookyloo():
         '''Remove a category (MISP Taxonomy tag) from a capture.'''
         if not get_config('generic', 'enable_categorization'):
             return
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -460,7 +459,7 @@ class Lookyloo():
         """Add the capture in the hidden pool (not shown on the front page)
         NOTE: it won't remove the correlations until they are rebuilt.
         """
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         self.redis.hset(str(capture_dir), 'no_index', 1)
         (capture_dir / 'no_index').touch()
 
@@ -469,7 +468,7 @@ class Lookyloo():
         '''All the capture UUIDs present in the cache.'''
         return self.redis.hkeys('lookup_dirs')
 
-    def sorted_cache(self, capture_uuids: Iterable[str]=[]) -> List[CaptureCache]:
+    def sorted_capture_cache(self, capture_uuids: Iterable[str]=[]) -> List[CaptureCache]:
         '''Get all the captures in the cache, sorted by timestamp (new -> old).'''
         all_cache: List[CaptureCache] = []
         p = self.redis.pipeline()
@@ -494,7 +493,7 @@ class Lookyloo():
     def capture_cache(self, capture_uuid: str) -> Optional[CaptureCache]:
         """Get the cache from redis.
         NOTE: Doesn't try to build the pickle"""
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         cached: Dict[str, Any] = self.redis.hgetall(str(capture_dir))
         if not cached:
             self.logger.warning(f'No cache available for {capture_dir}.')
@@ -527,7 +526,7 @@ class Lookyloo():
                     f.write(str(uuid4()))
         return sorted(self.capture_dir.iterdir(), reverse=True)
 
-    def lookup_capture_dir(self, capture_uuid: str) -> Path:
+    def _get_capture_dir(self, capture_uuid: str) -> Path:
         '''Use the cache to get a capture directory from a capture UUID'''
         capture_dir: str = self.redis.hget('lookup_dirs', capture_uuid)  # type: ignore
         if not capture_dir:
@@ -635,7 +634,7 @@ class Lookyloo():
     def _get_raw(self, capture_uuid: str, extension: str='*', all_files: bool=True) -> BytesIO:
         '''Get file(s) from the capture directory'''
         try:
-            capture_dir = self.lookup_capture_dir(capture_uuid)
+            capture_dir = self._get_capture_dir(capture_uuid)
         except MissingUUID:
             return BytesIO(f'Capture {capture_uuid} not unavailable, try again later.'.encode())
         except NoValidHarFile:
@@ -792,9 +791,8 @@ class Lookyloo():
 
     def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
         '''Returns all the captures related to a hash (sha512), used in the web interface.'''
-        captures: List[Tuple[str, str]] = []
         total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
-        cached_captures = self.sorted_cache([d[0] for d in details])
+        cached_captures = self.sorted_capture_cache([d[0] for d in details])
         captures = [(cache.uuid, cache.title) for cache in cached_captures]
         domains = self.indexing.get_body_hash_domains(body_hash)
         return captures, domains
@@ -823,7 +821,7 @@ class Lookyloo():
 
     def get_url_occurrences(self, url: str, limit: int=20) -> List[Dict]:
         '''Get the most recent captures and URL nodes where the URL has been seen.'''
-        captures = self.sorted_cache(self.indexing.get_captures_url(url))
+        captures = self.sorted_capture_cache(self.indexing.get_captures_url(url))
 
         to_return: List[Dict] = []
         for capture in captures[:limit]:
@@ -843,7 +841,7 @@ class Lookyloo():
 
     def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=False, limit: int=20) -> List[Dict]:
         '''Get the most recent captures and URL nodes where the hostname has been seen.'''
-        captures = self.sorted_cache(self.indexing.get_captures_hostname(hostname))
+        captures = self.sorted_capture_cache(self.indexing.get_captures_hostname(hostname))
 
         to_return: List[Dict] = []
         for capture in captures[:limit]:
@@ -869,9 +867,9 @@ class Lookyloo():
             to_return.append(to_append)
         return to_return
 
-    def get_cookie_name_investigator(self, cookie_name: str):
+    def get_cookie_name_investigator(self, cookie_name: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float, List[Tuple[str, float]]]]]:
         '''Returns all the captures related to a cookie name entry, used in the web interface.'''
-        cached_captures = self.sorted_cache([entry[0] for entry in self.indexing.get_cookies_names_captures(cookie_name)])
+        cached_captures = self.sorted_capture_cache([entry[0] for entry in self.indexing.get_cookies_names_captures(cookie_name)])
         captures = [(cache.uuid, cache.title) for cache in cached_captures]
         domains = [(domain, freq, self.indexing.cookies_names_domains_values(cookie_name, domain))
                    for domain, freq in self.indexing.get_cookie_domains(cookie_name)]
@@ -892,7 +890,7 @@ class Lookyloo():
                     captures_list['different_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
         return total_captures, captures_list
 
-    def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):
+    def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode) -> Tuple[Optional[Union[str, List[Any]]], Optional[Tuple[bool, Any]]]:
         ''' There are a few different sources to figure out known vs. legitimate content,
         this method normalize it for the web interface.'''
         known: Optional[Union[str, List[Any]]] = None
@@ -1140,7 +1138,7 @@ class Lookyloo():
         stats: Dict[int, Dict[int, Dict[str, Any]]] = {}
         weeks_stats: Dict[int, Dict] = {}
 
-        for cache in self.sorted_cache():
+        for cache in self.sorted_capture_cache():
             date_submission: datetime = cache.timestamp
 
             if date_submission.year not in stats:
diff --git a/website/web/__init__.py b/website/web/__init__.py
index 9c328ea..490cdc9 100644
--- a/website/web/__init__.py
+++ b/website/web/__init__.py
@@ -511,7 +511,7 @@ def index_generic(show_hidden: bool=False, category: Optional[str]=None):
     else:
         cut_time = None  # type: ignore
 
-    for cached in lookyloo.sorted_cache():
+    for cached in lookyloo.sorted_capture_cache():
         if cut_time and cached.timestamp < cut_time:
             continue
         if category:
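
For reference, a minimal caller-side sketch of the renamed public method, along the lines of how the web interface in website/web/__init__.py iterates the cache; the `lookyloo` instance and the `limit` value are assumptions for illustration, not part of the patch:

    # Hypothetical usage after the rename; `lookyloo` is an already-configured
    # Lookyloo() instance and `limit` is an arbitrary cap for the example.
    limit = 20
    for cache in lookyloo.sorted_capture_cache()[:limit]:  # newest capture first
        # CaptureCache entries expose uuid, title and timestamp, as used above.
        print(cache.uuid, cache.title, cache.timestamp.isoformat())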