chg: Refactoring and documenting

pull/156/head
Raphaël Vinot 2021-01-12 17:22:51 +01:00
parent dd30d25683
commit 0d68844c90
2 changed files with 108 additions and 117 deletions

View File

@@ -31,7 +31,7 @@ from redis import Redis
 from scrapysplashwrapper import crawl
 from werkzeug.useragents import UserAgent
-from .exceptions import NoValidHarFile, MissingUUID
+from .exceptions import NoValidHarFile, MissingUUID, LookylooException
 from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
                       safe_create_dir, get_email_template, load_pickle_tree,
                       remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains)
@@ -80,10 +80,12 @@ class Lookyloo():
         self._init_existing_dumps()

     def cache_user_agents(self, user_agent: str, remote_ip: str) -> None:
+        '''Cache the useragents of the visitors'''
         today = date.today().isoformat()
         self.redis.zincrby(f'user_agents|{today}', 1, f'{remote_ip}|{user_agent}')

     def build_ua_file(self) -> None:
+        '''Build a file in a format compatible with the capture page'''
         yesterday = (date.today() - timedelta(days=1))
         self_generated_ua_file_path = get_homedir() / 'own_user_agents' / str(yesterday.year) / f'{yesterday.month:02}'
         safe_create_dir(self_generated_ua_file_path)
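For context, the two methods above feed and consume a per-day sorted set: cache_user_agents() increments a '{remote_ip}|{user_agent}' member under 'user_agents|<date>', and build_ua_file() dumps an aggregated JSON file for the capture page. A minimal standalone sketch of that aggregation; the grouping format and the use of zrevrangebyscore are assumptions, only the key layout comes from the hunk above:

from collections import defaultdict
from datetime import date, timedelta
import json

from redis import Redis

# Read back yesterday's 'user_agents|<date>' sorted set (populated by zincrby as shown
# above) and group the visitor IPs by user agent string. Hypothetical standalone sketch.
redis = Redis(decode_responses=True)
yesterday = (date.today() - timedelta(days=1)).isoformat()

to_store = defaultdict(set)
for entry in redis.zrevrangebyscore(f'user_agents|{yesterday}', 'inf', '-inf'):
    remote_ip, user_agent = entry.split('|', 1)
    to_store[user_agent].add(remote_ip)

# Sets are not JSON serialisable, so convert them before dumping.
print(json.dumps({ua: sorted(ips) for ua, ips in to_store.items()}, indent=2))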
@@ -111,11 +113,9 @@ class Lookyloo():
         with self_generated_ua_file.open('w') as f:
             json.dump(to_store, f, indent=2)

-    def cache_tree(self, capture_uuid: str) -> None:
+    def _cache_capture(self, capture_uuid: str) -> None:
         '''Generate the pickle, add capture in the indexes'''
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         with open((capture_dir / 'uuid'), 'r') as f:
             uuid = f.read()
@@ -125,7 +125,7 @@ class Lookyloo():
         try:
             ct = CrawledTree(har_files, uuid)
             self._ensure_meta(capture_dir, ct)
-            self.resolve_dns(ct)
+            self._resolve_dns(ct)
             # getting the cache triggers an update of the said cache. We want it there.
             cache = self.capture_cache(capture_uuid)
             if self.is_public_instance:
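The tree built above is persisted with pickle.dump(ct, _p) (next hunk) and read back by load_pickle_tree(). A minimal sketch of that round-trip; the 'tree.pickle' file name and the helper names are assumptions, only the dump/load pattern is taken from the diff:

import pickle
from pathlib import Path

from har2tree import CrawledTree

def dump_pickle_tree(capture_dir: Path, ct: CrawledTree) -> None:
    # Write the fully built tree next to the capture files so it can be reloaded cheaply.
    with (capture_dir / 'tree.pickle').open('wb') as _p:
        pickle.dump(ct, _p)

def load_pickle_tree_sketch(capture_dir: Path):
    pickle_file = capture_dir / 'tree.pickle'
    if not pickle_file.exists():
        return None  # the caller then rebuilds the tree from the HAR files
    with pickle_file.open('rb') as _p:
        return pickle.load(_p)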
@@ -144,6 +144,10 @@ class Lookyloo():
             pickle.dump(ct, _p)

     def _build_cname_chain(self, known_cnames: Dict[str, Optional[str]], hostname) -> List[str]:
+        '''Returns a list of CNAMEs starting from one hostname.
+        The CNAME resolutions are made in `_resolve_dns`. A hostname can have a CNAME entry,
+        and the CNAME entry can have another CNAME entry, and so on multiple times.
+        This method loops over the hostnames until there are no CNAMEs.'''
         cnames: List[str] = []
         to_search = hostname
         while True:
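A runnable sketch of the chain walk the new docstring describes, mirroring the partial loop visible in this hunk and the next one (the exact stop conditions are an assumption):

from typing import Dict, List, Optional

def build_cname_chain(known_cnames: Dict[str, Optional[str]], hostname: str) -> List[str]:
    # known_cnames maps a hostname to its CNAME target (or None when there is none);
    # follow the pointers until the chain stops.
    cnames: List[str] = []
    to_search = hostname
    while True:
        target = known_cnames.get(to_search)
        if not target:
            break
        cnames.append(target)
        to_search = target
    return cnames

# www.example.org -> cdn.example.net -> edge.example.com
known = {'www.example.org': 'cdn.example.net', 'cdn.example.net': 'edge.example.com', 'edge.example.com': None}
assert build_cname_chain(known, 'www.example.org') == ['cdn.example.net', 'edge.example.com']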
@@ -154,7 +158,11 @@ class Lookyloo():
             to_search = known_cnames[to_search]
         return cnames

-    def resolve_dns(self, ct: CrawledTree):
+    def _resolve_dns(self, ct: CrawledTree):
+        '''Resolves all domains of the tree, keeps A (IPv4), AAAA (IPv6), and CNAME entries,
+        and stores them in ips.json and cnames.json, in the capture directory.
+        Updates the nodes of the tree accordingly so the information is available.
+        '''
         cnames_path = ct.root_hartree.har.path.parent / 'cnames.json'
         ips_path = ct.root_hartree.har.path.parent / 'ips.json'
         host_cnames: Dict[str, Optional[str]] = {}
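A rough sketch of the ips.json half of _resolve_dns described above: resolve each hostname once, cache the answers next to the HAR, and reuse them on later runs. Only A/AAAA resolution via the standard library is shown; the real method also records CNAME chains in cnames.json, which needs a DNS library and is left out here:

import json
import socket
from pathlib import Path
from typing import Dict, List

def resolve_ips(hostnames: List[str], capture_dir: Path) -> Dict[str, List[str]]:
    ips_path = capture_dir / 'ips.json'
    host_ips: Dict[str, List[str]] = {}
    if ips_path.exists():
        # Reuse the on-disk cache so each hostname is only resolved once per capture.
        with ips_path.open() as f:
            host_ips = json.load(f)
    for hostname in hostnames:
        if hostname in host_ips:
            continue
        try:
            infos = socket.getaddrinfo(hostname, None)
            host_ips[hostname] = sorted({info[4][0] for info in infos})
        except socket.gaierror:
            host_ips[hostname] = []
    with ips_path.open('w') as f:
        json.dump(host_ips, f)
    return host_ips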
@@ -198,88 +206,72 @@ class Lookyloo():
         return ct

     def get_crawled_tree(self, capture_uuid: str) -> CrawledTree:
+        '''Get the generated tree in ETE Toolkit format.
+        Loads the pickle if it exists, creates it otherwise.'''
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         ct = load_pickle_tree(capture_dir)
         if not ct:
-            self.cache_tree(capture_uuid)
+            self._cache_capture(capture_uuid)
             ct = load_pickle_tree(capture_dir)
         if not ct:
             raise NoValidHarFile(f'Unable to get tree from {capture_dir}')
         return ct

     def add_context(self, capture_uuid: str, urlnode_uuid: str, ressource_hash: str, legitimate: bool, malicious: bool, details: Dict[str, Dict[str, str]]):
+        '''Adds context information to a capture or a URL node'''
         if malicious:
             self.context.add_malicious(ressource_hash, details['malicious'])
         if legitimate:
             self.context.add_legitimate(ressource_hash, details['legitimate'])

     def add_to_legitimate(self, capture_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None):
+        '''Mark a full capture as legitimate.
+        Iterates over all the nodes and marks them all as legitimate too.'''
         ct = self.get_crawled_tree(capture_uuid)
         self.context.mark_as_legitimate(ct, hostnode_uuid, urlnode_uuid)

-    def load_tree(self, capture_uuid: str) -> Tuple[str, str, str, str, Dict[str, str]]:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
-        ct = self.get_crawled_tree(capture_uuid)
-        ct = self.context.contextualize_tree(ct)
-        meta = {}
-        if (capture_dir / 'meta').exists():
-            with open((capture_dir / 'meta'), 'r') as f:
-                meta = json.load(f)
-        return ct.to_json(), ct.start_time.isoformat(), ct.user_agent, ct.root_url, meta
-
     def remove_pickle(self, capture_uuid: str) -> None:
+        '''Remove the pickle from a specific capture.'''
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         remove_pickle_tree(capture_dir)

     def rebuild_cache(self) -> None:
+        '''Flush and rebuild the redis cache. Doesn't remove the pickles.'''
         self.redis.flushdb()
         self._init_existing_dumps()

     def rebuild_all(self) -> None:
-        for capture_dir in self.capture_dirs:
-            remove_pickle_tree(capture_dir)
+        '''Flush and rebuild the redis cache, and delete all the pickles.'''
+        [remove_pickle_tree(capture_dir) for capture_dir in self.capture_dirs]  # type: ignore
         self.rebuild_cache()

     def get_urlnode_from_tree(self, capture_uuid: str, node_uuid: str) -> URLNode:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            raise MissingUUID(f'Unable to find UUID {node_uuid} in {capture_dir}')
+        '''Get a URL node from a tree, by UUID'''
+        ct = self.get_crawled_tree(capture_uuid)
         return ct.root_hartree.get_url_node_by_uuid(node_uuid)

     def get_hostnode_from_tree(self, capture_uuid: str, node_uuid: str) -> HostNode:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            raise MissingUUID(f'Unable to find UUID {node_uuid} in {capture_dir}')
+        '''Get a host node from a tree, by UUID'''
+        ct = self.get_crawled_tree(capture_uuid)
         return ct.root_hartree.get_host_node_by_uuid(node_uuid)

     def get_statistics(self, capture_uuid: str) -> Dict[str, Any]:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_dir}) is cached.')
-            return {}
+        '''Get the statistics of a capture.'''
+        ct = self.get_crawled_tree(capture_uuid)
         return ct.root_hartree.stats

-    def categories_capture(self, capture_uuid: str) -> Dict[str, Any]:
+    def get_meta(self, capture_uuid: str) -> Dict[str, str]:
+        '''Get the meta information from a capture (mostly, details about the User Agent used).'''
+        capture_dir = self.lookup_capture_dir(capture_uuid)
+        meta = {}
+        if (capture_dir / 'meta').exists():
+            with open((capture_dir / 'meta'), 'r') as f:
+                meta = json.load(f)
+        return meta
+
+    def categories_capture(self, capture_uuid: str) -> Dict[str, Any]:
+        '''Get all the categories related to a capture, in MISP Taxonomies format'''
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -288,14 +280,13 @@ class Lookyloo():
         return {}

     def categorize_capture(self, capture_uuid: str, category: str) -> None:
+        '''Add a category (MISP Taxonomy tag) to a capture.'''
         if not get_config('generic', 'enable_categorization'):
             return
         # Make sure the category is mappable to a taxonomy.
         self.taxonomies.revert_machinetag(category)
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -307,11 +298,10 @@ class Lookyloo():
             f.writelines(f'{t}\n' for t in current_categories)

     def uncategorize_capture(self, capture_uuid: str, category: str) -> None:
+        '''Remove a category (MISP Taxonomy tag) from a capture.'''
         if not get_config('generic', 'enable_categorization'):
             return
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         # get existing categories if possible
         if (capture_dir / 'categories').exists():
             with (capture_dir / 'categories').open() as f:
@@ -323,12 +313,13 @@ class Lookyloo():
             f.writelines(f'{t}\n' for t in current_categories)

     def trigger_modules(self, capture_uuid: str, force: bool=False) -> None:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_dir}) is cached.')
+        '''Launch the 3rd party modules on a capture.
+        It uses the cached result *if* the module was triggered the same day.
+        The `force` flag re-triggers the module regardless of the cache.'''
+        try:
+            ct = self.get_crawled_tree(capture_uuid)
+        except LookylooException:
+            self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_uuid}) is cached.')
             return

         if self.pi.available:
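The try/except above only covers both failure modes if MissingUUID (unknown capture UUID, raised by lookup_capture_dir) and NoValidHarFile (tree cannot be built, raised by get_crawled_tree) both inherit from LookylooException. That hierarchy is implied by the new import in the first hunk but not shown in this diff; a hedged sketch of the assumed layout and the resulting caller pattern:

# Assumed exception layout; only the class names appear in this diff.
class LookylooException(Exception):
    pass

class MissingUUID(LookylooException):
    pass

class NoValidHarFile(LookylooException):
    pass

def trigger_modules_sketch(lookyloo, capture_uuid: str) -> None:
    try:
        ct = lookyloo.get_crawled_tree(capture_uuid)  # may raise either subclass
    except LookylooException as e:
        print(f'Tree for {capture_uuid} is not available: {e}')
        return
    # ... hand ct over to the third-party modules ...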
@@ -346,12 +337,11 @@ class Lookyloo():
             self.vt.url_lookup(ct.root_hartree.har.root_url, force)

     def get_modules_responses(self, capture_uuid: str) -> Optional[Dict[str, Any]]:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_dir}) is cached.')
+        '''Get the responses of the modules from the cached responses on the disk'''
+        try:
+            ct = self.get_crawled_tree(capture_uuid)
+        except LookylooException:
+            self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
             return None
         to_return: Dict[str, Any] = {}
         if self.vt.available:
@@ -371,6 +361,7 @@ class Lookyloo():
         return to_return

     def _set_capture_cache(self, capture_dir: Path, force: bool=False, redis_pipeline: Optional[Redis]=None) -> None:
+        '''Populate the redis cache for a capture. Mostly used on the index page.'''
         if force or not self.redis.exists(str(capture_dir)):
             # (re)build cache
             pass
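A hedged sketch of what populating that per-capture redis entry plausibly looks like: one hash keyed by the capture directory path (as the redis.exists(str(capture_dir)) check above suggests), mirrored in the 'lookup_dirs' hash that lookup_capture_dir() reads. Every field except the ones visible elsewhere in this diff ('url', 'title', 'redirects', 'incomplete_redirects') is an assumption:

import json
from pathlib import Path
from typing import Optional

from redis import Redis

def set_capture_cache_sketch(redis: Redis, capture_dir: Path, uuid: str, url: str,
                             title: str, redirects: list, incomplete: bool,
                             redis_pipeline: Optional[Redis]=None) -> None:
    # Reuse the caller's pipeline when one is passed in, so many captures can be cached in one round trip.
    p = redis_pipeline if redis_pipeline is not None else redis.pipeline()
    p.hset(str(capture_dir), mapping={
        'uuid': uuid,
        'url': url,
        'title': title,
        'redirects': json.dumps(redirects),
        'incomplete_redirects': 1 if incomplete else 0,
    })
    p.hset('lookup_dirs', uuid, str(capture_dir))
    if redis_pipeline is None:
        p.execute()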
@@ -455,17 +446,17 @@ class Lookyloo():
         NOTE: it won't remove the correlations until they are rebuilt.
         """
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         self.redis.hset(str(capture_dir), 'no_index', 1)
         (capture_dir / 'no_index').touch()

     @property
     def capture_uuids(self):
+        '''All the capture UUIDs present in the cache.'''
         return self.redis.hkeys('lookup_dirs')

     @property
     def sorted_cache(self):
+        '''Get all the captures in the cache, sorted by timestamp (new -> old).'''
         all_cache: List[Dict[str, Union[str, Path]]] = []
         p = self.redis.pipeline()
         capture_uuids = self.capture_uuids
@@ -497,8 +488,6 @@ class Lookyloo():
         """Get the cache from redis.
         NOTE: Doesn't try to build the pickle"""
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         if self.redis.hget(str(capture_dir), 'incomplete_redirects') == '1':
             # try to rebuild the cache
             self._set_capture_cache(capture_dir, force=True)
@@ -516,6 +505,7 @@ class Lookyloo():
             return {}

     def _init_existing_dumps(self) -> None:
+        '''Initialize the cache for all the captures'''
         p = self.redis.pipeline()
         for capture_dir in self.capture_dirs:
             if capture_dir.exists():
@@ -525,6 +515,7 @@ class Lookyloo():
     @property
     def capture_dirs(self) -> List[Path]:
+        '''Get all the capture directories, sorted from newest to oldest.'''
         for capture_dir in self.capture_dir.iterdir():
             if capture_dir.is_dir() and not capture_dir.iterdir():
                 # Cleanup self.capture_dir of failed runs.
@@ -535,13 +526,15 @@ class Lookyloo():
                 f.write(str(uuid4()))
         return sorted(self.capture_dir.iterdir(), reverse=True)

-    def lookup_capture_dir(self, capture_uuid: str) -> Union[Path, None]:
+    def lookup_capture_dir(self, capture_uuid: str) -> Path:
+        '''Use the cache to get a capture directory from a capture UUID'''
         capture_dir: str = self.redis.hget('lookup_dirs', capture_uuid)  # type: ignore
-        if capture_dir:
-            return Path(capture_dir)
-        return None
+        if not capture_dir:
+            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
+        return Path(capture_dir)

     def enqueue_capture(self, query: MutableMapping[str, Any]) -> str:
+        '''Enqueue a query in the capture queue (used by the API for asynchronous processing)'''
         perma_uuid = str(uuid4())
         p = self.redis.pipeline()
         for key, value in query.items():
@@ -554,6 +547,7 @@ class Lookyloo():
         return perma_uuid

     def process_capture_queue(self) -> Union[bool, None]:
+        '''Process a query from the capture queue'''
         uuid = self.redis.spop('to_capture')
         if not uuid:
             return None
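enqueue_capture() and process_capture_queue() form a small redis work queue: the query is stored as a hash under a fresh UUID, and the UUID is pushed to the 'to_capture' set that the async worker pops from. A standalone sketch of that pattern; only pipeline(), spop('to_capture') and the per-key loop come from the diff, the rest is assumption:

from typing import Any, MutableMapping, Optional
from uuid import uuid4

from redis import Redis

def enqueue_capture_sketch(redis: Redis, query: MutableMapping[str, Any]) -> str:
    perma_uuid = str(uuid4())
    p = redis.pipeline()
    for key, value in query.items():
        p.hset(perma_uuid, key, str(value))  # one hash per queued capture request
    p.sadd('to_capture', perma_uuid)
    p.execute()
    return perma_uuid

def process_capture_queue_sketch(redis: Redis) -> Optional[dict]:
    uuid = redis.spop('to_capture')
    if not uuid:
        return None  # nothing queued
    query = redis.hgetall(uuid)
    redis.delete(uuid)
    return query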
@@ -566,6 +560,7 @@ class Lookyloo():
         return False

     def send_mail(self, capture_uuid: str, email: str='', comment: str='') -> None:
+        '''Send an email notification regarding a specific capture'''
         if not get_config('generic', 'enable_mail_notification'):
             return
@@ -607,6 +602,7 @@ class Lookyloo():
             self.logger.warning(msg.as_string())

     def _ensure_meta(self, capture_dir: Path, tree: CrawledTree) -> None:
+        '''Make sure the meta file is present; it contains information about the User Agent used for the capture.'''
         metafile = capture_dir / 'meta'
         if metafile.exists():
             return
@@ -630,9 +626,8 @@ class Lookyloo():
             json.dump(to_dump, f)

     def _get_raw(self, capture_uuid: str, extension: str='*', all_files: bool=True) -> BytesIO:
+        '''Get file(s) from the capture directory'''
         capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         all_paths = sorted(list(capture_dir.glob(f'*.{extension}')))
         if not all_files:
             # Only get the first one in the list
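When all_files is true, _get_raw() has to hand several files back through a single BytesIO; an in-memory ZIP is one plausible way to do that. A sketch under that assumption (only the glob pattern and the single-file branch come from the diff):

from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

def get_raw_sketch(capture_dir: Path, extension: str = '*', all_files: bool = True) -> BytesIO:
    all_paths = sorted(capture_dir.glob(f'*.{extension}'))
    if not all_files:
        # Only get the first one in the list
        with open(all_paths[0], 'rb') as f:
            return BytesIO(f.read())
    # Bundle every matching file into an in-memory ZIP archive (archive layout is an assumption).
    to_return = BytesIO()
    with ZipFile(to_return, 'w') as myzip:
        for path in all_paths:
            myzip.write(path, arcname=path.name)
    to_return.seek(0)
    return to_return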
@@ -648,15 +643,19 @@ class Lookyloo():
         return to_return

     def get_html(self, capture_uuid: str, all_html: bool=False) -> BytesIO:
+        '''Get rendered HTML'''
         return self._get_raw(capture_uuid, 'html', all_html)

     def get_cookies(self, capture_uuid: str, all_cookies: bool=False) -> BytesIO:
+        '''Get the cookie(s)'''
         return self._get_raw(capture_uuid, 'cookies.json', all_cookies)

     def get_screenshot(self, capture_uuid: str, all_images: bool=False) -> BytesIO:
+        '''Get the screenshot(s) of the rendered page'''
         return self._get_raw(capture_uuid, 'png', all_images)

     def get_screenshot_thumbnail(self, capture_uuid: str, all_images: bool=False, for_datauri=False) -> Union[str, BytesIO]:
+        '''Get the thumbnail of the rendered page'''
         size = 64, 64
         screenshot = Image.open(self._get_raw(capture_uuid, 'png', all_images))
         c_screenshot = screenshot.crop((0, 0, screenshot.width, screenshot.width))
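A sketch of the thumbnail logic: crop the screenshot to a square (the crop to screenshot.width x screenshot.width is visible above), scale it down to 64x64 and, when for_datauri is set, return a base64 data URI usable directly in an <img> tag. The PNG re-encoding details are assumptions:

import base64
from io import BytesIO
from typing import Union

from PIL import Image

def screenshot_thumbnail_sketch(screenshot_bytes: BytesIO, for_datauri: bool = False) -> Union[str, BytesIO]:
    size = 64, 64
    screenshot = Image.open(screenshot_bytes)
    # Full-page screenshots are very tall; keep only the top square before shrinking.
    c_screenshot = screenshot.crop((0, 0, screenshot.width, screenshot.width))
    c_screenshot.thumbnail(size)
    to_return = BytesIO()
    c_screenshot.save(to_return, format='png')
    to_return.seek(0)
    if for_datauri:
        return 'data:image/png;base64,' + base64.b64encode(to_return.getvalue()).decode()
    return to_return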
@@ -669,12 +668,14 @@ class Lookyloo():
         return to_return

     def get_capture(self, capture_uuid: str) -> BytesIO:
+        '''Get all the files related to this capture.'''
         return self._get_raw(capture_uuid)

     def capture(self, url: str, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
                 depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
                 referer: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
                 browser: Optional[str]=None) -> Union[bool, str]:
+        '''Launch a capture'''
         url = url.strip()
         url = refang(url)
         if not url.startswith('http'):
@@ -762,6 +763,7 @@ class Lookyloo():
         return perma_uuid

     def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
+        '''Returns all the captures related to a hash (sha512), used in the web interface.'''
         captures: List[Tuple[str, str]] = []
         total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
         for capture_uuid, url_uuid, url_hostname, _ in details:
@@ -772,17 +774,13 @@ class Lookyloo():
         return captures, domains

     def get_body_hash_full(self, body_hash: str) -> Tuple[Dict[str, List[Dict[str, str]]], BytesIO]:
+        '''Returns a lot of information about the hash (sha512) and the hits in the instance.
+        Also contains the data (base64 encoded)'''
         details = self.indexing.get_body_hash_urls(body_hash)
         body_content = BytesIO()
         # get the body from the first entry in the details list
         for url, entries in details.items():
-            capture_dir = self.lookup_capture_dir(entries[0]['capture'])
-            if not capture_dir:
-                raise MissingUUID(f"Unable to find {entries[0]['capture']}")
-            ct = load_pickle_tree(capture_dir)
-            if not ct:
-                raise MissingUUID(f'Unable to find {capture_dir}')
+            ct = self.get_crawled_tree(entries[0]['capture'])
             urlnode = ct.root_hartree.get_url_node_by_uuid(entries[0]['urlnode'])
             if urlnode.body_hash == body_hash:
                 # the hash we're looking for is the whole file
@@ -798,16 +796,11 @@ class Lookyloo():
         return details, body_content

     def get_url_occurrences(self, url: str):
+        '''Get all the captures and URL nodes where the URL has been seen on this instance.'''
         capture_uuids = self.indexing.get_captures_url(url)
         to_return: Dict[str, Dict] = {cuuid: {} for cuuid in capture_uuids}
         for capture_uuid in capture_uuids:
-            capture_dir = self.lookup_capture_dir(capture_uuid)
-            if not capture_dir:
-                raise MissingUUID(f"Unable to find {capture_uuid}")
-            ct = load_pickle_tree(capture_dir)
-            if not ct:
-                raise MissingUUID(f'Unable to find {capture_dir}')
+            ct = self.get_crawled_tree(capture_uuid)
             to_return[capture_uuid]['start_timestamp'] = ct.root_hartree.start_time.isoformat()
             to_return[capture_uuid]['urlnodes'] = {}
             for urlnode in ct.root_hartree.url_tree.search_nodes(name=url):
@@ -818,16 +811,11 @@ class Lookyloo():
         return to_return

     def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=False):
+        '''Get all the captures and URL nodes where the hostname has been seen on this instance.'''
         capture_uuids = self.indexing.get_captures_hostname(hostname)
         to_return: Dict[str, Dict] = {cuuid: {} for cuuid in capture_uuids}
         for capture_uuid in capture_uuids:
-            capture_dir = self.lookup_capture_dir(capture_uuid)
-            if not capture_dir:
-                raise MissingUUID(f"Unable to find {capture_uuid}")
-            ct = load_pickle_tree(capture_dir)
-            if not ct:
-                raise MissingUUID(f'Unable to find {capture_dir}')
+            ct = self.get_crawled_tree(capture_uuid)
             to_return[capture_uuid]['start_timestamp'] = ct.root_hartree.start_time.isoformat()
             to_return[capture_uuid]['hostnodes'] = []
             if with_urls_occurrences:
@@ -844,6 +832,7 @@ class Lookyloo():
         return to_return

     def get_cookie_name_investigator(self, cookie_name: str):
+        '''Returns all the captures related to a cookie name entry, used in the web interface.'''
         captures = []
         for capture_uuid, url_uuid in self.indexing.get_cookies_names_captures(cookie_name):
             cache = self.capture_cache(capture_uuid)
@@ -854,6 +843,9 @@ class Lookyloo():
         return captures, domains

     def hash_lookup(self, blob_hash: str, url: str, capture_uuid: str) -> Tuple[int, Dict[str, List[Tuple[str, str, str, str, str]]]]:
+        '''Search all the captures where a specific hash was seen.
+        If a URL is given, the results are split depending on whether the hash was seen on the same URL or on another one.
+        The capture UUID avoids duplicates from the same capture.'''
         captures_list: Dict[str, List[Tuple[str, str, str, str, str]]] = {'same_url': [], 'different_url': []}
         total_captures, details = self.indexing.get_body_hash_captures(blob_hash, url, filter_capture_uuid=capture_uuid)
         for h_capture_uuid, url_uuid, url_hostname, same_url in details:
@@ -866,6 +858,8 @@ class Lookyloo():
         return total_captures, captures_list

     def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):
+        '''There are a few different sources to figure out known vs. legitimate content;
+        this method normalizes it for the web interface.'''
         known: Optional[Union[str, List[Any]]] = None
         legitimate: Optional[Tuple[bool, Any]] = None
         if h not in known_content:
@@ -884,6 +878,7 @@ class Lookyloo():
         return known, legitimate

     def get_ressource(self, tree_uuid: str, urlnode_uuid: str, h: Optional[str]) -> Optional[Tuple[str, BytesIO, str]]:
+        '''Get a specific resource from a URL node. If a hash is also given, we want an embedded resource.'''
         url = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid)
         if url.empty_response:
             return None
@@ -901,21 +896,17 @@ class Lookyloo():
             return None

     def misp_export(self, capture_uuid: str) -> Union[MISPEvent, Dict[str, str]]:
+        '''Export a capture in MISP format. You can POST the return of this method
+        directly to a MISP instance and it will create an event.'''
         cache = self.capture_cache(capture_uuid)
         if not cache:
             return {'error': 'UUID missing in cache, try again later.'}

         if cache['incomplete_redirects']:
-            self.cache_tree(capture_uuid)
+            self._cache_capture(capture_uuid)
             cache = self.capture_cache(capture_uuid)

-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find {capture_uuid}')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            raise MissingUUID(f'Unable to find {capture_dir}')
+        ct = self.get_crawled_tree(capture_uuid)

         event = MISPEvent()
         event.info = f'Lookyloo Capture ({cache["url"]})'
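A hedged sketch of the kind of event misp_export() builds: the event.info line is shown above; the attributes added for the landing URL and the redirect chain are assumptions about what a typical export would contain:

from pymisp import MISPEvent

def misp_export_sketch(url: str, redirects: list) -> MISPEvent:
    event = MISPEvent()
    event.info = f'Lookyloo Capture ({url})'
    event.add_attribute('url', url)
    for redirect in redirects:
        event.add_attribute('url', redirect, comment='Redirect from the capture')
    # The returned event can be serialised with event.to_json() and POSTed to a MISP
    # instance (e.g. via PyMISP's add_event()), which is what the docstring above refers to.
    return event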
@@ -961,16 +952,11 @@ class Lookyloo():
         return get_resources_hashes(container)

     def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
-        capture_dir = self.lookup_capture_dir(capture_uuid)
-        if not capture_dir:
-            raise MissingUUID(f'Unable to find {capture_uuid}')
-        ct = load_pickle_tree(capture_dir)
-        if not ct:
-            raise MissingUUID(f'Unable to find {capture_dir}')
+        '''Gather all the information needed to display the Hostnode investigator popup.'''
+        ct = self.get_crawled_tree(capture_uuid)
         hostnode = ct.root_hartree.get_host_node_by_uuid(node_uuid)
         if not hostnode:
-            raise MissingUUID(f'Unable to find UUID {node_uuid} in {capture_dir}')
+            raise MissingUUID(f'Unable to find UUID {node_uuid} in {capture_uuid}')

         cnames_path = ct.root_hartree.har.path.parent / 'cnames.json'
         if cnames_path.exists():
@@ -1064,6 +1050,7 @@ class Lookyloo():
         return hostnode, urls

     def get_stats(self) -> Dict[str, List]:
+        '''Gather statistics about the lookyloo instance'''
         today = date.today()
         calendar_week = today.isocalendar()[1]

View File

@@ -317,7 +317,7 @@ def hide_capture(tree_uuid: str):
 @app.route('/tree/<string:tree_uuid>/cache', methods=['GET'])
 def cache_tree(tree_uuid: str):
-    lookyloo.cache_tree(tree_uuid)
+    lookyloo.get_crawled_tree(tree_uuid)
     return redirect(url_for('index'))
@@ -354,10 +354,14 @@ def tree(tree_uuid: str, urlnode_uuid: Optional[str]=None):
         flash(cache['error'], 'error')

     try:
-        tree_json, start_time, user_agent, root_url, meta = lookyloo.load_tree(tree_uuid)
+        ct = lookyloo.get_crawled_tree(tree_uuid)
+        ct = lookyloo.context.contextualize_tree(ct)
         b64_thumbnail = lookyloo.get_screenshot_thumbnail(tree_uuid, for_datauri=True)
-        return render_template('tree.html', tree_json=tree_json, start_time=start_time,
-                               user_agent=user_agent, root_url=root_url, tree_uuid=tree_uuid,
+        meta = lookyloo.get_meta(tree_uuid)
+        return render_template('tree.html', tree_json=ct.to_json(),
+                               start_time=ct.start_time.isoformat(),
+                               user_agent=ct.user_agent, root_url=ct.root_url,
+                               tree_uuid=tree_uuid,
                                screenshot_thumbnail=b64_thumbnail, page_title=cache['title'],
                                meta=meta, enable_mail_notification=enable_mail_notification,
                                enable_context_by_users=enable_context_by_users,
@@ -702,7 +706,7 @@ def json_redirects(tree_uuid: str):
         return to_return
     if cache['incomplete_redirects']:
         # Trigger tree build, get all redirects
-        lookyloo.cache_tree(tree_uuid)
+        lookyloo.get_crawled_tree(tree_uuid)
         cache = lookyloo.capture_cache(tree_uuid)
         if cache:
             to_return['response']['redirects'] = cache['redirects']