mirror of https://github.com/CIRCL/lookyloo
chg: rename and cleanup methods
parent 25c1a6dbce
commit cd7b050cb0
@@ -120,8 +120,8 @@ class Lookyloo():
             json.dump(to_store, f, indent=2)
 
     def _cache_capture(self, capture_uuid: str) -> CrawledTree:
-        '''Generate the pickle, add capture in the indexes'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        '''Generate the pickle, set the cache, add capture in the indexes'''
+        capture_dir = self._get_capture_dir(capture_uuid)
         har_files = sorted(capture_dir.glob('*.har'))
         # NOTE: We only index the public captures
         index = True
@@ -131,7 +131,6 @@ class Lookyloo():
         self._resolve_dns(ct)
-        # Force update cache of the capture (takes care of the incomplete redirect key)
-        self._set_capture_cache(capture_dir, force=True)
+        # getting the cache triggers an update of the said cache. We want it there.
         cache = self.capture_cache(capture_uuid)
         if not cache:
             raise LookylooException(f'Broken cache for {capture_dir}')
@@ -224,7 +223,7 @@ class Lookyloo():
     def get_crawled_tree(self, capture_uuid: str) -> CrawledTree:
         '''Get the generated tree in ETE Toolkit format.
         Loads the pickle if it exists, creates it otherwise.'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         ct = load_pickle_tree(capture_dir)
         if not ct:
             ct = self._cache_capture(capture_uuid)
@@ -247,7 +246,7 @@ class Lookyloo():
 
     def remove_pickle(self, capture_uuid: str) -> None:
         '''Remove the pickle from a specific capture.'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         remove_pickle_tree(capture_dir)
 
     def rebuild_cache(self) -> None:
@@ -277,7 +276,7 @@ class Lookyloo():
 
     def get_meta(self, capture_uuid: str) -> Dict[str, str]:
         '''Get the meta informations from a capture (mostly, details about the User Agent used.)'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         meta = {}
         if (capture_dir / 'meta').exists():
             with open((capture_dir / 'meta'), 'r') as f:
@@ -286,7 +285,7 @@ class Lookyloo():
 
     def categories_capture(self, capture_uuid: str) -> Dict[str, Any]:
         '''Get all the categories related to a capture, in MISP Taxonomies format'''
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -301,7 +300,7 @@ class Lookyloo():
         # Make sure the category is mappable to a taxonomy.
         self.taxonomies.revert_machinetag(category)
 
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -316,7 +315,7 @@ class Lookyloo():
         '''Remove a category (MISP Taxonomy tag) from a capture.'''
         if not get_config('generic', 'enable_categorization'):
             return
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -460,7 +459,7 @@ class Lookyloo():
         """Add the capture in the hidden pool (not shown on the front page)
         NOTE: it won't remove the correlations until they are rebuilt.
         """
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         self.redis.hset(str(capture_dir), 'no_index', 1)
         (capture_dir / 'no_index').touch()
 
@@ -469,7 +468,7 @@ class Lookyloo():
         '''All the capture UUIDs present in the cache.'''
         return self.redis.hkeys('lookup_dirs')
 
-    def sorted_cache(self, capture_uuids: Iterable[str]=[]) -> List[CaptureCache]:
+    def sorted_capture_cache(self, capture_uuids: Iterable[str]=[]) -> List[CaptureCache]:
         '''Get all the captures in the cache, sorted by timestamp (new -> old).'''
         all_cache: List[CaptureCache] = []
         p = self.redis.pipeline()
@@ -494,7 +493,7 @@ class Lookyloo():
     def capture_cache(self, capture_uuid: str) -> Optional[CaptureCache]:
         """Get the cache from redis.
         NOTE: Doesn't try to build the pickle"""
-        capture_dir = self.lookup_capture_dir(capture_uuid)
+        capture_dir = self._get_capture_dir(capture_uuid)
         cached: Dict[str, Any] = self.redis.hgetall(str(capture_dir))
         if not cached:
             self.logger.warning(f'No cache available for {capture_dir}.')
@@ -527,7 +526,7 @@ class Lookyloo():
             f.write(str(uuid4()))
         return sorted(self.capture_dir.iterdir(), reverse=True)
 
-    def lookup_capture_dir(self, capture_uuid: str) -> Path:
+    def _get_capture_dir(self, capture_uuid: str) -> Path:
         '''Use the cache to get a capture directory from a capture UUID'''
         capture_dir: str = self.redis.hget('lookup_dirs', capture_uuid)  # type: ignore
         if not capture_dir:
@@ -635,7 +634,7 @@ class Lookyloo():
     def _get_raw(self, capture_uuid: str, extension: str='*', all_files: bool=True) -> BytesIO:
         '''Get file(s) from the capture directory'''
         try:
-            capture_dir = self.lookup_capture_dir(capture_uuid)
+            capture_dir = self._get_capture_dir(capture_uuid)
         except MissingUUID:
             return BytesIO(f'Capture {capture_uuid} not unavailable, try again later.'.encode())
         except NoValidHarFile:
@@ -792,9 +791,8 @@ class Lookyloo():
 
     def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
         '''Returns all the captures related to a hash (sha512), used in the web interface.'''
-        captures: List[Tuple[str, str]] = []
         total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
-        cached_captures = self.sorted_cache([d[0] for d in details])
+        cached_captures = self.sorted_capture_cache([d[0] for d in details])
         captures = [(cache.uuid, cache.title) for cache in cached_captures]
         domains = self.indexing.get_body_hash_domains(body_hash)
         return captures, domains
@@ -823,7 +821,7 @@ class Lookyloo():
 
     def get_url_occurrences(self, url: str, limit: int=20) -> List[Dict]:
         '''Get the most recent captures and URL nodes where the URL has been seen.'''
-        captures = self.sorted_cache(self.indexing.get_captures_url(url))
+        captures = self.sorted_capture_cache(self.indexing.get_captures_url(url))
 
         to_return: List[Dict] = []
         for capture in captures[:limit]:
@@ -843,7 +841,7 @@ class Lookyloo():
 
     def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=False, limit: int=20) -> List[Dict]:
         '''Get the most recent captures and URL nodes where the hostname has been seen.'''
-        captures = self.sorted_cache(self.indexing.get_captures_hostname(hostname))
+        captures = self.sorted_capture_cache(self.indexing.get_captures_hostname(hostname))
 
         to_return: List[Dict] = []
         for capture in captures[:limit]:
@@ -869,9 +867,9 @@ class Lookyloo():
             to_return.append(to_append)
         return to_return
 
-    def get_cookie_name_investigator(self, cookie_name: str):
+    def get_cookie_name_investigator(self, cookie_name: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float, List[Tuple[str, float]]]]]:
         '''Returns all the captures related to a cookie name entry, used in the web interface.'''
-        cached_captures = self.sorted_cache([entry[0] for entry in self.indexing.get_cookies_names_captures(cookie_name)])
+        cached_captures = self.sorted_capture_cache([entry[0] for entry in self.indexing.get_cookies_names_captures(cookie_name)])
         captures = [(cache.uuid, cache.title) for cache in cached_captures]
         domains = [(domain, freq, self.indexing.cookies_names_domains_values(cookie_name, domain))
                    for domain, freq in self.indexing.get_cookie_domains(cookie_name)]
@@ -892,7 +890,7 @@ class Lookyloo():
                 captures_list['different_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
         return total_captures, captures_list
 
-    def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):
+    def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode) -> Tuple[Optional[Union[str, List[Any]]], Optional[Tuple[bool, Any]]]:
         ''' There are a few different sources to figure out known vs. legitimate content,
         this method normalize it for the web interface.'''
         known: Optional[Union[str, List[Any]]] = None
@@ -1140,7 +1138,7 @@ class Lookyloo():
         stats: Dict[int, Dict[int, Dict[str, Any]]] = {}
         weeks_stats: Dict[int, Dict] = {}
 
-        for cache in self.sorted_cache():
+        for cache in self.sorted_capture_cache():
             date_submission: datetime = cache.timestamp
 
             if date_submission.year not in stats:
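
All the hunks above touch the Lookyloo class; the final hunk below applies the same rename to its caller in index_generic(). As a hedged illustration of the renamed public API, a minimal sketch of a caller (the import path is an assumption from the repository layout; the CaptureCache fields used are the ones visible in this diff):

    from lookyloo.lookyloo import Lookyloo  # import path assumed

    lookyloo = Lookyloo()
    # sorted_capture_cache() replaces sorted_cache(): all cached captures,
    # sorted by timestamp, newest first.
    for cache in lookyloo.sorted_capture_cache():
        print(cache.uuid, cache.timestamp.isoformat(), cache.title)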
@@ -511,7 +511,7 @@ def index_generic(show_hidden: bool=False, category: Optional[str]=None):
     else:
         cut_time = None  # type: ignore
 
-    for cached in lookyloo.sorted_cache():
+    for cached in lookyloo.sorted_capture_cache():
         if cut_time and cached.timestamp < cut_time:
             continue
         if category:
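
The other half of the rename makes the UUID-to-directory lookup private: _get_capture_dir(). As the @@ -527 hunk shows, it is a thin wrapper around the 'lookup_dirs' Redis hash, and the except MissingUUID branch in _get_raw() shows how it fails. A sketch of the equivalent lookup for debugging, under stated assumptions (connection details are illustrative, Lookyloo reads its own Redis settings from its config; the UUID is a placeholder):

    from redis import Redis

    redis = Redis(host='127.0.0.1', port=6379, decode_responses=True)  # connection details assumed
    # Same hash lookup _get_capture_dir() performs internally.
    capture_dir = redis.hget('lookup_dirs', '00000000-0000-0000-0000-000000000000')  # placeholder UUID
    if not capture_dir:
        # _get_capture_dir() raises MissingUUID in this case, as the
        # except branch in _get_raw() above anticipates.
        print('Unknown capture UUID')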