chg: Make the cache entries a dataclass

Fix #99
pull/156/head
Raphaël Vinot 2021-01-14 17:12:16 +01:00
parent ffc2de3cf2
commit 6149df06eb
4 changed files with 100 additions and 72 deletions
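In short: capture cache entries go from plain dicts pulled out of Redis to a typed CaptureCache object, call sites switch from cache['url'] / cache.get('no_index') to cache.url / cache.no_index, and capture_cache() returns None rather than {} when nothing usable is cached. A rough before/after sketch of a call site (illustrative only, not copied from the diff):

# Before: a dict (or {}) straight from redis.hgetall(), partially post-processed.
cache = lookyloo.capture_cache(capture_uuid)
if cache and 'redirects' in cache and cache['redirects']:
    print(cache['url'], len(cache['redirects']))

# After: a CaptureCache instance, or None when nothing is cached.
cache = lookyloo.capture_cache(capture_uuid)
if cache and cache.redirects:
    print(cache.url, len(cache.redirects))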


@@ -33,7 +33,9 @@ def main():
         if lookyloo.is_public_instance:
             cache = lookyloo.capture_cache(capture_uuid)
-            if cache.get('no_index') is not None:
+            if not cache:
+                continue
+            if cache.no_index is not None:
                 index = False
         # NOTE: these methods do nothing if we just generated the pickle when calling lookyloo.get_crawled_tree

lookyloo/capturecache.py Normal file

@@ -0,0 +1,31 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from dataclasses import dataclass
+from datetime import datetime
+import json
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+
+@dataclass
+class CaptureCache():
+    __default_cache_keys: Tuple[str, str, str, str, str, str] = \
+        ('uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir')
+
+    def __init__(self, cache_entry: Dict[str, Any]):
+        if all(key in cache_entry.keys() for key in self.__default_cache_keys):
+            self.uuid: str = cache_entry['uuid']
+            self.title: str = cache_entry['title']
+            self.timestamp: datetime = datetime.strptime(cache_entry['timestamp'], '%Y-%m-%dT%H:%M:%S.%f%z')
+            self.url: str = cache_entry['url']
+            self.redirects: List[str] = json.loads(cache_entry['redirects'])
+            self.capture_dir: Path = cache_entry['capture_dir']
+        elif not cache_entry.get('error'):
+            missing = set(self.__default_cache_keys) - set(cache_entry.keys())
+            raise Exception(f'Missing keys ({missing}), no error message. It should not happen.')
+
+        self.error: Optional[str] = cache_entry.get('error')
+        self.incomplete_redirects: bool = True if cache_entry.get('incomplete_redirects') == 1 else False
+        self.no_index: bool = True if cache_entry.get('no_index') == 1 else False
+        self.categories: List[str] = json.loads(cache_entry['categories']) if cache_entry.get('categories') else []
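A small sketch of how a Redis-style hash entry would be turned into a CaptureCache; the field values below are made up for illustration, but the shape mirrors what capture_cache() feeds it later in this diff (all values strings, redirects JSON-encoded):

entry = {
    'uuid': 'c0ffee00-0000-0000-0000-000000000000',   # made-up uuid
    'title': 'Example capture',
    'timestamp': '2021-01-14T17:12:16.000000+00:00',
    'url': 'https://www.example.com',
    'redirects': '["https://www.example.com/landing"]',
    'capture_dir': '/path/to/captures/2021-01-14T17:12:16.000000',
}

cache = CaptureCache(entry)
print(cache.title)                    # Example capture
print(cache.timestamp.isoformat())    # 2021-01-14T17:12:16+00:00 (a real datetime, not a string)
print(cache.redirects)                # ['https://www.example.com/landing']
print(cache.error, cache.categories)  # None []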


@@ -36,6 +36,7 @@
 from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
                       safe_create_dir, get_email_template, load_pickle_tree,
                       remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains)
 from .modules import VirusTotal, SaneJavaScript, PhishingInitiative
+from .capturecache import CaptureCache
 from .context import Context
 from .indexing import Indexing
@@ -128,8 +129,10 @@ class Lookyloo():
         self._resolve_dns(ct)
         # getting the cache triggers an update of the said cache. We want it there.
         cache = self.capture_cache(capture_uuid)
+        if not cache:
+            raise LookylooException(f'Broken cache for {capture_dir}')
         if self.is_public_instance:
-            if cache.get('no_index') is not None:
+            if cache.no_index:
                 index = False
         if index:
             self.indexing.index_cookies_capture(ct)
@@ -457,7 +460,7 @@ class Lookyloo():
     @property
     def sorted_cache(self):
         '''Get all the captures in the cache, sorted by timestamp (new -> old).'''
-        all_cache: List[Dict[str, Union[str, Path]]] = []
+        all_cache: List[CaptureCache] = []
         p = self.redis.pipeline()
         capture_uuids = self.capture_uuids
         if not capture_uuids:
@@ -466,43 +469,30 @@
         for directory in self.redis.hmget('lookup_dirs', *capture_uuids):
             if directory:
                 p.hgetall(directory)
-        all_cache = []
         for c in p.execute():
             if not c:
                 continue
-            if all(key in c.keys() for key in ['uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir']):
-                c['redirects'] = json.loads(c['redirects'])
-                c['capture_dir'] = Path(c['capture_dir'])
-            elif 'error' in c:
-                pass
-            else:
-                continue
-            if 'timestamp' not in c:
-                continue
-            if 'categories' in c:
-                c['categories'] = json.loads(c['categories'])
-            all_cache.append(c)
-        return sorted(all_cache, key=operator.itemgetter('timestamp'), reverse=True)
+            c = CaptureCache(c)
+            if hasattr(c, 'timestamp'):
+                all_cache.append(c)
+        return sorted(all_cache, key=operator.attrgetter('timestamp'), reverse=True)

-    def capture_cache(self, capture_uuid: str) -> Dict[str, Union[str, Path, List]]:
+    def capture_cache(self, capture_uuid: str) -> Optional[CaptureCache]:
         """Get the cache from redis.
         NOTE: Doesn't try to build the pickle"""
         capture_dir = self.lookup_capture_dir(capture_uuid)
         if self.redis.hget(str(capture_dir), 'incomplete_redirects') == '1':
             # try to rebuild the cache
             self._set_capture_cache(capture_dir, force=True)
-        cached: Dict[str, Union[str, Path, List]] = self.redis.hgetall(str(capture_dir))  # type: ignore
-        if all(key in cached.keys() for key in ['uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir']):
-            cached['redirects'] = json.loads(cached['redirects'])  # type: ignore
-            cached['capture_dir'] = Path(cached['capture_dir'])  # type: ignore
-            if 'categories' in cached:
-                cached['categories'] = json.loads(cached['categories'])  # type: ignore
-            return cached
-        elif 'error' in cached:
-            return cached
-        else:
-            self.logger.warning(f'Cache ({capture_dir}) is invalid: {json.dumps(cached, indent=2)}')
-            return {}
+        cached: Dict[str, Any] = self.redis.hgetall(str(capture_dir))  # type: ignore
+        if not cached:
+            self.logger.warning(f'No cache available for {capture_dir}.')
+            return None
+        try:
+            return CaptureCache(cached)
+        except Exception as e:
+            self.logger.warning(f'Cache ({capture_dir}) is invalid ({e}): {json.dumps(cached, indent=2)}')
+            return None

     def _init_existing_dumps(self) -> None:
         '''Initialize the cache for all the captures'''
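After this hunk, the contract is: capture_cache() hands back either a CaptureCache or None, and sorted_cache sorts objects with operator.attrgetter instead of operator.itemgetter; an entry built from an error-only Redis hash never gets a timestamp attribute, hence the hasattr filter before sorting. A rough caller-side sketch (assuming a lookyloo = Lookyloo() instance; not code from this diff):

import operator

# Collect the usable cache entries, dropping missing or error-only ones.
caches = []
for uuid in lookyloo.capture_uuids:
    cache = lookyloo.capture_cache(uuid)      # CaptureCache or None
    if cache and hasattr(cache, 'timestamp'):
        caches.append(cache)

# Same sort key sorted_cache uses: the datetime attribute, newest first.
newest_first = sorted(caches, key=operator.attrgetter('timestamp'), reverse=True)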
@@ -568,10 +558,10 @@ class Lookyloo():
         initial_url = ''
         cache = self.capture_cache(capture_uuid)
         if cache:
-            initial_url = cache['url']  # type: ignore
-            if 'redirects' in cache and cache['redirects']:
+            initial_url = cache.url
+            if cache.redirects:
                 redirects = "Redirects:\n"
-                redirects += '\n'.join(cache['redirects'])  # type: ignore
+                redirects += '\n'.join(cache.redirects)
             else:
                 redirects = "No redirects."
@@ -776,7 +766,7 @@ class Lookyloo():
         for capture_uuid, url_uuid, url_hostname, _ in details:
             cache = self.capture_cache(capture_uuid)
             if cache:
-                captures.append((capture_uuid, cache['title']))  # type: ignore
+                captures.append((capture_uuid, cache.title))
         domains = self.indexing.get_body_hash_domains(body_hash)
         return captures, domains
@@ -844,7 +834,7 @@ class Lookyloo():
         for capture_uuid, url_uuid in self.indexing.get_cookies_names_captures(cookie_name):
             cache = self.capture_cache(capture_uuid)
             if cache:
-                captures.append((capture_uuid, cache['title']))
+                captures.append((capture_uuid, cache.title))
         domains = [(domain, freq, self.indexing.cookies_names_domains_values(cookie_name, domain))
                    for domain, freq in self.indexing.get_cookie_domains(cookie_name)]
         return captures, domains
@@ -859,9 +849,9 @@ class Lookyloo():
             cache = self.capture_cache(h_capture_uuid)
             if cache:
                 if same_url:
-                    captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))  # type: ignore
+                    captures_list['same_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
                 else:
-                    captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))  # type: ignore
+                    captures_list['different_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
         return total_captures, captures_list

     def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):
@@ -909,18 +899,20 @@ class Lookyloo():
         if not cache:
             return {'error': 'UUID missing in cache, try again later.'}

-        if cache['incomplete_redirects']:
+        if cache.incomplete_redirects:
             self._cache_capture(capture_uuid)
             cache = self.capture_cache(capture_uuid)
+            if not cache:
+                return {'error': 'UUID missing in cache, try again later.'}

         ct = self.get_crawled_tree(capture_uuid)

         event = MISPEvent()
-        event.info = f'Lookyloo Capture ({cache["url"]})'
+        event.info = f'Lookyloo Capture ({cache.url})'
         event.add_attribute('link', f'https://{self.public_domain}/tree/{capture_uuid}')
-        initial_url = URLObject(cache["url"])  # type: ignore
-        redirects = [URLObject(url) for url in cache['redirects']]  # type: ignore
+        initial_url = URLObject(cache.url)
+        redirects = [URLObject(url) for url in cache.redirects]
         if redirects:
             initial_url.add_reference(redirects[0], 'redirects-to')
@@ -1068,9 +1060,12 @@ class Lookyloo():
         for uuid in self.capture_uuids:
             # What we get here is in a random order. This look sorts the captures
             cache = self.capture_cache(uuid)
-            if 'timestamp' not in cache:
+            if not cache:
+                # That shouldn't happen, a warning went in the logs.
                 continue
-            date_submission: datetime = datetime.fromisoformat(cache['timestamp'].rstrip('Z'))  # type: ignore
+            if not hasattr(cache, 'timestamp'):
+                continue
+            date_submission: datetime = cache.timestamp

             if date_submission.year not in stats:
                 stats[date_submission.year] = {}
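The manual datetime.fromisoformat(cache['timestamp'].rstrip('Z')) step disappears because CaptureCache.__init__ already turns the stored string into a timezone-aware datetime. A standalone illustration of that parsing (the sample value is made up; the format string is the one the dataclass uses):

from datetime import datetime

ts = datetime.strptime('2021-01-14T17:12:16.000000+00:00', '%Y-%m-%dT%H:%M:%S.%f%z')
print(ts.year, ts.month, ts.isocalendar()[1])  # 2021 1 2 -> usable directly for the stats buckets
print(ts.tzinfo is not None)                   # True: the datetime is timezone-aware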
@@ -1078,11 +1073,11 @@ class Lookyloo():
                 stats[date_submission.year][date_submission.month] = defaultdict(dict, **stats_dict)
                 stats[date_submission.year][date_submission.month]['uniq_urls'] = set()
             stats[date_submission.year][date_submission.month]['submissions'] += 1
-            stats[date_submission.year][date_submission.month]['uniq_urls'].add(cache['url'])
-            if len(cache['redirects']) > 0:  # type: ignore
+            stats[date_submission.year][date_submission.month]['uniq_urls'].add(cache.url)
+            if len(cache.redirects) > 0:
                 stats[date_submission.year][date_submission.month]['submissions_with_redirects'] += 1
-                stats[date_submission.year][date_submission.month]['redirects'] += len(cache['redirects'])  # type: ignore
-                stats[date_submission.year][date_submission.month]['uniq_urls'].update(cache['redirects'])
+                stats[date_submission.year][date_submission.month]['redirects'] += len(cache.redirects)
+                stats[date_submission.year][date_submission.month]['uniq_urls'].update(cache.redirects)

             if ((date_submission.year == today.year and date_submission.isocalendar()[1] >= calendar_week - 1)
                     or (calendar_week == 1 and date_submission.year == today.year - 1 and date_submission.isocalendar()[1] in [52, 53])):
@@ -1090,11 +1085,11 @@ class Lookyloo():
                 weeks_stats[date_submission.isocalendar()[1]] = defaultdict(dict, **stats_dict)
                 weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'] = set()
             weeks_stats[date_submission.isocalendar()[1]]['submissions'] += 1
-            weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'].add(cache['url'])
-            if len(cache['redirects']) > 0:  # type: ignore
+            weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'].add(cache.url)
+            if len(cache.redirects) > 0:
                 weeks_stats[date_submission.isocalendar()[1]]['submissions_with_redirects'] += 1
-                weeks_stats[date_submission.isocalendar()[1]]['redirects'] += len(cache['redirects'])  # type: ignore
-                weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'].update(cache['redirects'])
+                weeks_stats[date_submission.isocalendar()[1]]['redirects'] += len(cache.redirects)
+                weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'].update(cache.redirects)

         statistics: Dict[str, List] = {'weeks': [], 'years': []}
         for week_number in sorted(weeks_stats.keys()):


@@ -6,7 +6,7 @@ from zipfile import ZipFile, ZIP_DEFLATED
 from io import BytesIO, StringIO
 import os
 from pathlib import Path
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 import json
 import http
 import calendar
@@ -263,12 +263,12 @@ def redirects(tree_uuid: str):
     cache = lookyloo.capture_cache(tree_uuid)
     if not cache:
         return Response('Not available.', mimetype='text/text')
-    if not cache['redirects']:
+    if not cache.redirects:
         return Response('No redirects.', mimetype='text/text')
-    if cache['url'] == cache['redirects'][0]:  # type: ignore
-        to_return = BytesIO('\n'.join(cache['redirects']).encode())  # type: ignore
+    if cache.url == cache.redirects[0]:
+        to_return = BytesIO('\n'.join(cache.redirects).encode())
     else:
-        to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())  # type: ignore
+        to_return = BytesIO('\n'.join([cache.url] + cache.redirects).encode())
     return send_file(to_return, mimetype='text/text',
                      as_attachment=True, attachment_filename='redirects.txt')
@@ -350,8 +350,8 @@ def tree(tree_uuid: str, urlnode_uuid: Optional[str]=None):
         flash('Invalid cache.', 'error')
         return redirect(url_for('index'))

-    if 'error' in cache:
-        flash(cache['error'], 'error')
+    if cache.error:
+        flash(cache.error, 'error')

     try:
         ct = lookyloo.get_crawled_tree(tree_uuid)
@@ -362,14 +362,14 @@
                                start_time=ct.start_time.isoformat(),
                                user_agent=ct.user_agent, root_url=ct.root_url,
                                tree_uuid=tree_uuid,
-                               screenshot_thumbnail=b64_thumbnail, page_title=cache['title'],
+                               screenshot_thumbnail=b64_thumbnail, page_title=cache.title,
                                meta=meta, enable_mail_notification=enable_mail_notification,
                                enable_context_by_users=enable_context_by_users,
                                enable_categorization=enable_categorization,
                                enable_bookmark=enable_bookmark,
                                blur_screenshot=blur_screenshot, urlnode_uuid=urlnode_uuid,
                                auto_trigger_modules=auto_trigger_modules,
-                               has_redirects=True if cache['redirects'] else False)
+                               has_redirects=True if cache.redirects else False)
     except NoValidHarFile as e:
         return render_template('error.html', error_message=e)
@@ -392,7 +392,7 @@ def index_generic(show_hidden: bool=False, category: Optional[str]=None):
     titles = []
    if time_delta_on_index:
         # We want to filter the captures on the index
-        cut_time = datetime.now() - timedelta(**time_delta_on_index)
+        cut_time = (datetime.now() - timedelta(**time_delta_on_index)).replace(tzinfo=timezone.utc)
     else:
         cut_time = None  # type: ignore
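cut_time now gets an explicit UTC tzinfo because cached.timestamp is a timezone-aware datetime, and Python refuses to compare aware and naive datetimes. A minimal standalone illustration:

from datetime import datetime, timedelta, timezone

aware = datetime(2021, 1, 14, 17, 12, tzinfo=timezone.utc)  # like cached.timestamp
naive_cut = datetime.now() - timedelta(days=30)

# aware < naive_cut  -> TypeError: can't compare offset-naive and offset-aware datetimes
aware_cut = naive_cut.replace(tzinfo=timezone.utc)
print(aware < aware_cut)  # fine once both sides carry a timezone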
@@ -400,19 +400,19 @@
         if not cached:
             continue
         if category:
-            if 'categories' not in cached or category not in cached['categories']:
+            if not cached.categories or category not in cached.categories:
                 continue
         if show_hidden:
-            if 'no_index' not in cached:
+            if not cached.no_index:
                 # Only display the hidden ones
                 continue
-        elif 'no_index' in cached:
+        elif cached.no_index:
             continue
-        if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:
+        if cut_time and cached.timestamp < cut_time:
             continue
-        titles.append((cached['uuid'], cached['title'], cached['timestamp'], cached['url'],
-                       cached['redirects'], True if cached['incomplete_redirects'] == '1' else False))
+        titles.append((cached.uuid, cached.title, cached.timestamp.isoformat(), cached.url,
+                       cached.redirects, cached.incomplete_redirects))
     titles = sorted(titles, key=lambda x: (x[2], x[3]), reverse=True)
     return render_template('index.html', titles=titles)
@@ -700,18 +700,18 @@ def json_redirects(tree_uuid: str):
     if not cache:
         return {'error': 'UUID missing in cache, try again later.'}

-    to_return: Dict[str, Any] = {'response': {'url': cache['url'], 'redirects': []}}
-    if not cache['redirects']:
+    to_return: Dict[str, Any] = {'response': {'url': cache.url, 'redirects': []}}
+    if not cache.redirects:
         to_return['response']['info'] = 'No redirects'
         return to_return
-    if cache['incomplete_redirects']:
+    if cache.incomplete_redirects:
         # Trigger tree build, get all redirects
         lookyloo.get_crawled_tree(tree_uuid)
         cache = lookyloo.capture_cache(tree_uuid)
         if cache:
-            to_return['response']['redirects'] = cache['redirects']
+            to_return['response']['redirects'] = cache.redirects
     else:
-        to_return['response']['redirects'] = cache['redirects']
+        to_return['response']['redirects'] = cache.redirects

     return jsonify(to_return)