lookyloo/lookyloo/lookyloo.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import base64
from collections import defaultdict, Counter
from datetime import datetime, date, timedelta
from email.message import EmailMessage
from io import BufferedIOBase, BytesIO
import ipaddress
import json
import logging
from pathlib import Path
import pickle
import smtplib
import socket
import sys
from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping, Set, Iterable
from urllib.parse import urlsplit, urljoin
from uuid import uuid4
from zipfile import ZipFile
import operator

from defang import refang  # type: ignore
import dns.resolver
import dns.rdatatype
from har2tree import CrawledTree, Har2TreeError, HarFile, HostNode, URLNode
from PIL import Image  # type: ignore
from pymisp import MISPEvent, MISPAttribute, MISPObject
from pymisp.tools import URLObject, FileObject
import requests
from requests.exceptions import HTTPError
from redis import Redis
from scrapysplashwrapper import crawl
from werkzeug.useragents import UserAgent

from .exceptions import NoValidHarFile, MissingUUID, LookylooException
from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
                      safe_create_dir, get_email_template, load_pickle_tree,
                      remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains,
                      CaptureStatus)
from .modules import VirusTotal, SaneJavaScript, PhishingInitiative, MISP, UniversalWhois
from .capturecache import CaptureCache
from .context import Context
from .indexing import Indexing

class Lookyloo():

    def __init__(self) -> None:
        self.logger = logging.getLogger(f'{self.__class__.__name__}')
        self.logger.setLevel(get_config('generic', 'loglevel'))
        self.indexing = Indexing()
        self.is_public_instance = get_config('generic', 'public_instance')
        self.public_domain = get_config('generic', 'public_domain')
        self.taxonomies = get_taxonomies()

        self.redis: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
        self.capture_dir: Path = get_homedir() / 'scraped'
        if os.environ.get('SPLASH_URL_DOCKER'):
            # In order to have a working default for the docker image, it is easier to use an environment variable
            self.splash_url: str = os.environ['SPLASH_URL_DOCKER']
        else:
            self.splash_url = get_config('generic', 'splash_url')
        self.only_global_lookups: bool = get_config('generic', 'only_global_lookups')

        self._priority = get_config('generic', 'priority')
        safe_create_dir(self.capture_dir)

        # Initialize 3rd party components
        self.pi = PhishingInitiative(get_config('modules', 'PhishingInitiative'))
        if not self.pi.available:
            self.logger.warning('Unable to setup the PhishingInitiative module')

        self.vt = VirusTotal(get_config('modules', 'VirusTotal'))
        if not self.vt.available:
            self.logger.warning('Unable to setup the VirusTotal module')

        self.sanejs = SaneJavaScript(get_config('modules', 'SaneJS'))
        if not self.sanejs.available:
            self.logger.warning('Unable to setup the SaneJS module')

        self.misp = MISP(get_config('modules', 'MISP'))
        if not self.misp.available:
            self.logger.warning('Unable to setup the MISP module')

        self.uwhois = UniversalWhois(get_config('modules', 'UniversalWhois'))
        if not self.uwhois.available:
            self.logger.warning('Unable to setup the UniversalWhois module')

        self.context = Context(self.sanejs)
        self._captures_index: Dict[str, CaptureCache] = {}

        if not self.redis.exists('cache_loaded'):
            self._init_existing_dumps()

    def _get_priority(self, source: str, user: str, authenticated: bool) -> int:
        src_prio: int = self._priority['sources'][source] if source in self._priority['sources'] else -1
        if not authenticated:
            usr_prio = self._priority['users']['_default_anon']
            # reduce priority for anonymous users making lots of captures
            queue_size = self.redis.zscore('queues', f'{source}|{authenticated}|{user}')
            if queue_size is None:
                queue_size = 0
            usr_prio -= int(queue_size / 10)
        else:
            usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth']
        return src_prio + usr_prio
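
    # Illustrative sketch (not part of the class): with a hypothetical priority config such as
    # {'sources': {'web': 10, 'api': 0}, 'users': {'_default_anon': 1, '_default_auth': 5}},
    # an anonymous web submission with 30 captures already in its queue would get:
    #   src_prio = 10, usr_prio = 1 - int(30 / 10) = -2  ->  priority = 8
    # That priority is used as the score in the 'to_capture' sorted set, so zpopmax in
    # process_capture_queue picks the highest-priority capture first.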

    def cache_user_agents(self, user_agent: str, remote_ip: str) -> None:
        '''Cache the user agents of the visitors'''
        today = date.today().isoformat()
        self.redis.zincrby(f'user_agents|{today}', 1, f'{remote_ip}|{user_agent}')

    def build_ua_file(self) -> None:
        '''Build a file in a format compatible with the capture page'''
        yesterday = (date.today() - timedelta(days=1))
        self_generated_ua_file_path = get_homedir() / 'own_user_agents' / str(yesterday.year) / f'{yesterday.month:02}'
        safe_create_dir(self_generated_ua_file_path)
        self_generated_ua_file = self_generated_ua_file_path / f'{yesterday.isoformat()}.json'
        if self_generated_ua_file.exists():
            return
        entries = self.redis.zrevrange(f'user_agents|{yesterday.isoformat()}', 0, -1)
        if not entries:
            return

        to_store: Dict[str, Any] = {'by_frequency': []}
        uas = Counter([entry.split('|', 1)[1] for entry in entries])
        for ua, count in uas.most_common():
            parsed_ua = UserAgent(ua)
            if not parsed_ua.platform or not parsed_ua.browser:
                continue
            if parsed_ua.platform not in to_store:
                to_store[parsed_ua.platform] = {}
            if f'{parsed_ua.browser} {parsed_ua.version}' not in to_store[parsed_ua.platform]:
                to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'] = []
            to_store[parsed_ua.platform][f'{parsed_ua.browser} {parsed_ua.version}'].append(parsed_ua.string)
            to_store['by_frequency'].append({'os': parsed_ua.platform,
                                             'browser': f'{parsed_ua.browser} {parsed_ua.version}',
                                             'useragent': parsed_ua.string})
        with self_generated_ua_file.open('w') as f:
            json.dump(to_store, f, indent=2)

        # Remove the UA / IP mapping.
        self.redis.delete(f'user_agents|{yesterday.isoformat()}')
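
    # Illustrative sketch of the generated JSON (UA strings made up):
    #   {'by_frequency': [{'os': 'Windows', 'browser': 'Chrome 90',
    #                      'useragent': 'Mozilla/5.0 (Windows NT 10.0; ...) ...'}],
    #    'Windows': {'Chrome 90': ['Mozilla/5.0 (Windows NT 10.0; ...) ...']}}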
def _cache_capture(self, capture_uuid: str, /) -> CrawledTree:
2021-03-18 18:47:54 +01:00
'''Generate the pickle, set the cache, add capture in the indexes'''
capture_dir = self._get_capture_dir(capture_uuid)
2020-07-06 18:15:03 +02:00
har_files = sorted(capture_dir.glob('*.har'))
# NOTE: We only index the public captures
index = True
try:
2021-03-12 16:52:12 +01:00
ct = CrawledTree(har_files, capture_uuid)
self._ensure_meta(capture_dir, ct)
2021-01-12 17:22:51 +01:00
self._resolve_dns(ct)
# Force update cache of the capture (takes care of the incomplete redirect key)
self._set_capture_cache(capture_dir, force=True)
cache = self.capture_cache(capture_uuid)
if not cache:
raise LookylooException(f'Broken cache for {capture_dir}')
if self.is_public_instance:
if cache.no_index:
index = False
if index:
self.indexing.index_cookies_capture(ct)
self.indexing.index_body_hashes_capture(ct)
2020-10-27 00:02:18 +01:00
self.indexing.index_url_capture(ct)
categories = list(self.categories_capture(capture_uuid).keys())
self.indexing.index_categories_capture(capture_uuid, categories)
except Har2TreeError as e:
raise NoValidHarFile(e.message)
except RecursionError as e:
raise NoValidHarFile(f'Tree too deep, probably a recursive refresh: {e}.\n Append /export to the URL to get the files.')
2020-07-06 18:15:03 +02:00
with (capture_dir / 'tree.pickle').open('wb') as _p:
            # Some pickles require a pretty high recursion limit, this kind of fixes it.
            # If the capture is really broken (generally a refresh to self), the capture
            # is discarded in the RecursionError above.
            default_recursion_limit = sys.getrecursionlimit()
            sys.setrecursionlimit(int(default_recursion_limit * 1.1))
            pickle.dump(ct, _p)
            sys.setrecursionlimit(default_recursion_limit)
return ct
2020-09-24 18:46:43 +02:00
    def _build_cname_chain(self, known_cnames: Dict[str, Optional[str]], hostname: str) -> List[str]:
        '''Returns a list of CNAMEs starting from one hostname.
        The CNAME resolutions are made in `_resolve_dns`. A hostname can have a CNAME entry,
        and that CNAME entry can have another CNAME entry, and so on, multiple times.
        This method loops over the hostnames until there are no more CNAMEs.'''
cnames: List[str] = []
to_search = hostname
while True:
if known_cnames.get(to_search) is None:
break
2020-09-29 14:24:18 +02:00
# At this point, known_cnames[to_search] must exist and be a str
2020-09-24 18:46:43 +02:00
cnames.append(known_cnames[to_search]) # type: ignore
to_search = known_cnames[to_search]
return cnames
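
    # Illustrative sketch: with known_cnames built by _resolve_dns, e.g.
    #   {'www.example.org': 'cdn.example.net', 'cdn.example.net': 'edge.example-cdn.com',
    #    'edge.example-cdn.com': None}
    # _build_cname_chain(known_cnames, 'www.example.org') returns
    #   ['cdn.example.net', 'edge.example-cdn.com']
    # (the hostnames above are made up for the example).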

    def _resolve_dns(self, ct: CrawledTree):
        '''Resolves all the domains of the tree, keeps A (IPv4), AAAA (IPv6), and CNAME entries,
        and stores them in ips.json and cnames.json in the capture directory.
        Updates the nodes of the tree accordingly so the information is available.
        '''
cnames_path = ct.root_hartree.har.path.parent / 'cnames.json'
2020-09-29 14:24:18 +02:00
ips_path = ct.root_hartree.har.path.parent / 'ips.json'
2020-09-24 18:46:43 +02:00
host_cnames: Dict[str, Optional[str]] = {}
if cnames_path.exists():
with cnames_path.open() as f:
host_cnames = json.load(f)
2020-09-29 14:24:18 +02:00
host_ips: Dict[str, List[str]] = {}
if ips_path.exists():
with ips_path.open() as f:
host_ips = json.load(f)
2020-09-24 18:46:43 +02:00
for node in ct.root_hartree.hostname_tree.traverse():
2020-09-29 14:24:18 +02:00
if node.name not in host_cnames or node.name not in host_ips:
2020-09-24 18:46:43 +02:00
# Resolve and cache
try:
response = dns.resolver.resolve(node.name, search=True)
for answer in response.response.answer:
if answer.rdtype == dns.rdatatype.RdataType.CNAME:
host_cnames[str(answer.name).rstrip('.')] = str(answer[0].target).rstrip('.')
else:
host_cnames[str(answer.name).rstrip('.')] = None
2020-09-29 14:24:18 +02:00
if answer.rdtype in [dns.rdatatype.RdataType.A, dns.rdatatype.RdataType.AAAA]:
host_ips[str(answer.name).rstrip('.')] = list(set(str(b) for b in answer))
2020-09-24 18:46:43 +02:00
except Exception:
host_cnames[node.name] = None
2020-09-29 14:24:18 +02:00
host_ips[node.name] = []
2020-09-24 18:46:43 +02:00
cnames = self._build_cname_chain(host_cnames, node.name)
if cnames:
node.add_feature('cname', cnames)
2020-09-29 14:24:18 +02:00
if cnames[-1] in host_ips:
node.add_feature('resolved_ips', host_ips[cnames[-1]])
elif node.name in host_ips:
node.add_feature('resolved_ips', host_ips[node.name])
2020-09-24 18:46:43 +02:00
with cnames_path.open('w') as f:
json.dump(host_cnames, f)
2020-09-29 14:24:18 +02:00
with ips_path.open('w') as f:
json.dump(host_ips, f)
2020-09-24 18:46:43 +02:00
return ct
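
    # Illustrative sketch of the files written next to the HAR (hostnames and IPs made up):
    #   cnames.json -> {"www.example.org": "cdn.example.net", "cdn.example.net": null}
    #   ips.json    -> {"cdn.example.net": ["192.0.2.10", "2001:db8::10"]}
    # Nodes with a CNAME chain get a 'cname' feature, and 'resolved_ips' comes from the last
    # CNAME in the chain (or from the hostname itself when there is no CNAME).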
def get_crawled_tree(self, capture_uuid: str, /) -> CrawledTree:
2021-01-12 17:22:51 +01:00
'''Get the generated tree in ETE Toolkit format.
Loads the pickle if it exists, creates it otherwise.'''
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
ct = load_pickle_tree(capture_dir)
if not ct:
ct = self._cache_capture(capture_uuid)
if not ct:
raise NoValidHarFile(f'Unable to get tree from {capture_dir}')
return ct
def add_context(self, capture_uuid: str, /, urlnode_uuid: str, *, ressource_hash: str,
legitimate: bool, malicious: bool, details: Dict[str, Dict[str, str]]):
2021-01-12 17:22:51 +01:00
'''Adds context information to a capture or a URL node'''
if malicious:
self.context.add_malicious(ressource_hash, details['malicious'])
if legitimate:
self.context.add_legitimate(ressource_hash, details['legitimate'])
    def add_to_legitimate(self, capture_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None):
        '''Mark a full capture as legitimate.
        Iterates over all the nodes and marks them all as legitimate too.'''
ct = self.get_crawled_tree(capture_uuid)
self.context.mark_as_legitimate(ct, hostnode_uuid, urlnode_uuid)
def remove_pickle(self, capture_uuid: str, /) -> None:
2021-01-12 17:22:51 +01:00
'''Remove the pickle from a specific capture.'''
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
remove_pickle_tree(capture_dir)
2020-05-18 18:32:59 +02:00
def rebuild_cache(self) -> None:
2021-01-12 17:22:51 +01:00
'''Flush and rebuild the redis cache. Doesn't remove the pickles.'''
2020-04-01 17:44:06 +02:00
self.redis.flushdb()
self._init_existing_dumps()
2020-05-18 18:32:59 +02:00
    def rebuild_all(self) -> None:
        '''Flush and rebuild the redis cache, and delete all the pickles.'''
        [remove_pickle_tree(capture_dir) for capture_dir in self.capture_dirs]  # type: ignore
2020-04-01 17:44:06 +02:00
self.rebuild_cache()
def get_urlnode_from_tree(self, capture_uuid: str, /, node_uuid: str) -> URLNode:
2021-01-12 17:22:51 +01:00
'''Get a URL node from a tree, by UUID'''
ct = self.get_crawled_tree(capture_uuid)
2020-05-20 19:11:15 +02:00
return ct.root_hartree.get_url_node_by_uuid(node_uuid)
def get_hostnode_from_tree(self, capture_uuid: str, /, node_uuid: str) -> HostNode:
2021-01-12 17:22:51 +01:00
'''Get a host node from a tree, by UUID'''
ct = self.get_crawled_tree(capture_uuid)
2020-05-20 19:11:15 +02:00
return ct.root_hartree.get_host_node_by_uuid(node_uuid)
def get_statistics(self, capture_uuid: str, /) -> Dict[str, Any]:
2021-01-12 17:22:51 +01:00
'''Get the statistics of a capture.'''
ct = self.get_crawled_tree(capture_uuid)
2021-04-27 17:09:58 +02:00
return ct.root_hartree.stats
2020-05-13 17:31:27 +02:00
    def get_meta(self, capture_uuid: str, /) -> Dict[str, str]:
        '''Get the meta information from a capture (mostly, details about the User Agent used).'''
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
2021-01-12 17:22:51 +01:00
meta = {}
if (capture_dir / 'meta').exists():
with open((capture_dir / 'meta'), 'r') as f:
meta = json.load(f)
return meta
def categories_capture(self, capture_uuid: str, /) -> Dict[str, Any]:
2021-01-12 17:22:51 +01:00
'''Get all the categories related to a capture, in MISP Taxonomies format'''
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
2020-10-28 18:49:15 +01:00
# get existing categories if possible
if (capture_dir / 'categories').exists():
with (capture_dir / 'categories').open() as f:
current_categories = [line.strip() for line in f.readlines()]
return {e: self.taxonomies.revert_machinetag(e) for e in current_categories}
return {}
2020-10-28 18:49:15 +01:00
def categorize_capture(self, capture_uuid: str, /, category: str) -> None:
2021-01-12 17:22:51 +01:00
'''Add a category (MISP Taxonomy tag) to a capture.'''
2020-10-28 18:49:15 +01:00
if not get_config('generic', 'enable_categorization'):
return
# Make sure the category is mappable to a taxonomy.
self.taxonomies.revert_machinetag(category)
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
2020-10-28 18:49:15 +01:00
# get existing categories if possible
if (capture_dir / 'categories').exists():
with (capture_dir / 'categories').open() as f:
current_categories = set(line.strip() for line in f.readlines())
else:
current_categories = set()
current_categories.add(category)
with (capture_dir / 'categories').open('w') as f:
f.writelines(f'{t}\n' for t in current_categories)
def uncategorize_capture(self, capture_uuid: str, /, category: str) -> None:
2021-01-12 17:22:51 +01:00
'''Remove a category (MISP Taxonomy tag) from a capture.'''
2020-10-28 18:49:15 +01:00
if not get_config('generic', 'enable_categorization'):
return
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
2020-10-28 18:49:15 +01:00
# get existing categories if possible
if (capture_dir / 'categories').exists():
with (capture_dir / 'categories').open() as f:
current_categories = set(line.strip() for line in f.readlines())
else:
current_categories = set()
current_categories.remove(category)
with (capture_dir / 'categories').open('w') as f:
f.writelines(f'{t}\n' for t in current_categories)
def trigger_modules(self, capture_uuid: str, /, force: bool=False) -> None:
2021-01-12 17:22:51 +01:00
'''Launch the 3rd party modules on a capture.
It uses the cached result *if* the module was triggered the same day.
The `force` flag re-triggers the module regardless of the cache.'''
try:
ct = self.get_crawled_tree(capture_uuid)
except LookylooException:
self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_uuid}) is cached.')
return
2020-12-03 12:33:35 +01:00
if self.pi.available:
2020-06-09 15:06:35 +02:00
if ct.redirects:
for redirect in ct.redirects:
self.pi.url_lookup(redirect, force)
else:
self.pi.url_lookup(ct.root_hartree.har.root_url, force)
2020-12-03 12:33:35 +01:00
if self.vt.available:
if ct.redirects:
for redirect in ct.redirects:
self.vt.url_lookup(redirect, force)
else:
self.vt.url_lookup(ct.root_hartree.har.root_url, force)
def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
2021-01-12 17:22:51 +01:00
'''Get the responses of the modules from the cached responses on the disk'''
try:
ct = self.get_crawled_tree(capture_uuid)
except LookylooException:
self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
2020-04-20 16:52:46 +02:00
return None
to_return: Dict[str, Any] = {}
2020-12-03 12:33:35 +01:00
if self.vt.available:
to_return['vt'] = {}
if ct.redirects:
for redirect in ct.redirects:
to_return['vt'][redirect] = self.vt.get_url_lookup(redirect)
else:
to_return['vt'][ct.root_hartree.har.root_url] = self.vt.get_url_lookup(ct.root_hartree.har.root_url)
2020-12-03 12:33:35 +01:00
if self.pi.available:
2020-06-09 15:06:35 +02:00
to_return['pi'] = {}
if ct.redirects:
for redirect in ct.redirects:
to_return['pi'][redirect] = self.pi.get_url_lookup(redirect)
else:
to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url)
return to_return
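
    # Illustrative sketch of the structure returned by get_modules_responses (URL made up):
    #   {'vt': {'https://example.com/': {...VirusTotal report...}},
    #    'pi': {'https://example.com/': {...Phishing Initiative lookup...}}}
    # When the capture had redirects, each URL in the redirect chain gets its own key.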

    def _set_capture_cache(self, capture_dir: Path, force: bool=False, redis_pipeline: Optional[Redis]=None) -> None:
        '''Populate the redis cache for a capture. Mostly used on the index page.'''
        if not force and self.redis.exists(str(capture_dir)):
            return
        # (re)build cache
with (capture_dir / 'uuid').open() as f:
uuid = f.read().strip()
har_files = sorted(capture_dir.glob('*.har'))
2020-03-17 15:27:04 +01:00
error_cache: Dict[str, str] = {}
if (capture_dir / 'error.txt').exists():
# Something went wrong
with (capture_dir / 'error.txt').open() as _error:
2020-07-03 18:25:16 +02:00
content = _error.read()
try:
2020-07-06 14:16:17 +02:00
error_to_cache = json.loads(content)
if isinstance(error_to_cache, dict) and error_to_cache.get('details'):
error_to_cache = error_to_cache.get('details')
2020-07-03 18:25:16 +02:00
except json.decoder.JSONDecodeError:
# old format
error_to_cache = content
error_cache['error'] = f'The capture {capture_dir.name} has an error: {error_to_cache}'
2020-07-03 18:25:16 +02:00
2020-07-06 15:33:00 +02:00
fatal_error = False
if har_files:
try:
har = HarFile(har_files[0], uuid)
except Har2TreeError as e:
error_cache['error'] = e.message
fatal_error = True
else:
error_cache['error'] = f'No har files in {capture_dir.name}'
fatal_error = True
if (capture_dir / 'categories').exists():
with (capture_dir / 'categories').open() as _categories:
categories = [c.strip() for c in _categories.readlines()]
else:
categories = []
2020-10-29 23:25:20 +01:00
if not redis_pipeline:
p = self.redis.pipeline()
else:
2021-01-25 13:14:33 +01:00
p = redis_pipeline # type: ignore
2020-10-29 23:25:20 +01:00
p.hset('lookup_dirs', uuid, str(capture_dir))
if error_cache:
if 'HTTP Error' not in error_cache['error']:
self.logger.warning(error_cache['error'])
2021-01-25 13:14:33 +01:00
p.hmset(str(capture_dir), error_cache) # type: ignore
2020-10-29 23:25:20 +01:00
if not fatal_error:
redirects = har.initial_redirects
incomplete_redirects = False
if redirects and har.need_tree_redirects:
# load tree from disk, get redirects
ct = load_pickle_tree(capture_dir)
if ct:
redirects = ct.redirects
else:
# Pickle not available
incomplete_redirects = True
cache: Dict[str, Union[str, int]] = {'uuid': uuid,
'title': har.initial_title,
'timestamp': har.initial_start_time,
'url': har.root_url,
'redirects': json.dumps(redirects),
'categories': json.dumps(categories),
2020-10-29 23:25:20 +01:00
'capture_dir': str(capture_dir),
'incomplete_redirects': 1 if incomplete_redirects else 0}
            if (capture_dir / 'no_index').exists():  # If the folder claims anonymity
                cache['no_index'] = 1
            if (capture_dir / 'parent').exists():  # The capture was initiated from another one
                with (capture_dir / 'parent').open() as f:
                    cache['parent'] = f.read().strip()
2021-01-25 13:14:33 +01:00
p.hmset(str(capture_dir), cache) # type: ignore
2020-10-29 23:25:20 +01:00
if not redis_pipeline:
p.execute()
# If the cache is re-created for some reason, pop from the local cache.
self._captures_index.pop(uuid, None)
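
    # Illustrative sketch of the redis hash written for a capture directory (values made up):
    #   HGETALL <capture_dir> -> {'uuid': '...', 'title': 'Example page', 'timestamp': '...',
    #                             'url': 'https://example.com/', 'redirects': '[...]',
    #                             'categories': '[]', 'capture_dir': '/.../scraped/...',
    #                             'incomplete_redirects': '0'}
    # plus a 'lookup_dirs' hash mapping the capture UUID to that directory.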
def hide_capture(self, capture_uuid: str, /) -> None:
"""Add the capture in the hidden pool (not shown on the front page)
NOTE: it won't remove the correlations until they are rebuilt.
"""
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
self.redis.hset(str(capture_dir), 'no_index', 1)
(capture_dir / 'no_index').touch()
2021-04-14 18:04:44 +02:00
if capture_uuid in self._captures_index:
self._captures_index[capture_uuid].no_index = True
@property
2021-01-14 17:28:59 +01:00
def capture_uuids(self) -> List[str]:
2021-01-12 17:22:51 +01:00
'''All the capture UUIDs present in the cache.'''
2021-01-25 13:14:33 +01:00
return self.redis.hkeys('lookup_dirs')
2021-03-18 18:47:54 +01:00
def sorted_capture_cache(self, capture_uuids: Iterable[str]=[]) -> List[CaptureCache]:
2021-01-12 17:22:51 +01:00
'''Get all the captures in the cache, sorted by timestamp (new -> old).'''
2020-12-11 10:26:42 +01:00
if not capture_uuids:
# Sort all captures
capture_uuids = self.capture_uuids
if not capture_uuids:
# No captures at all on the instance
return []
all_cache: List[CaptureCache] = [self._captures_index[uuid] for uuid in capture_uuids if uuid in self._captures_index]
captures_to_get = set(capture_uuids) - set(self._captures_index.keys())
if captures_to_get:
p = self.redis.pipeline()
for directory in self.redis.hmget('lookup_dirs', *captures_to_get):
if not directory:
continue
2020-10-29 14:06:38 +01:00
p.hgetall(directory)
for c in p.execute():
if not c:
continue
c = CaptureCache(c)
if c.incomplete_redirects:
self._set_capture_cache(c.capture_dir, force=True)
c = self.capture_cache(c.uuid)
if hasattr(c, 'timestamp'):
all_cache.append(c)
self._captures_index[c.uuid] = c
all_cache.sort(key=operator.attrgetter('timestamp'), reverse=True)
return all_cache
2020-10-29 13:29:13 +01:00
def capture_cache(self, capture_uuid: str, /) -> Optional[CaptureCache]:
"""Get the cache from redis.
NOTE: Doesn't try to build the pickle"""
2021-04-03 02:24:38 +02:00
if capture_uuid in self._captures_index:
return self._captures_index[capture_uuid]
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
2021-01-25 13:14:33 +01:00
cached: Dict[str, Any] = self.redis.hgetall(str(capture_dir))
if not cached:
self.logger.warning(f'No cache available for {capture_dir}.')
return None
try:
return CaptureCache(cached)
2021-01-14 17:28:59 +01:00
except LookylooException as e:
self.logger.warning(f'Cache ({capture_dir}) is invalid ({e}): {json.dumps(cached, indent=2)}')
return None
2019-02-01 16:11:16 +01:00
2020-01-06 15:32:38 +01:00
def _init_existing_dumps(self) -> None:
2021-01-12 17:22:51 +01:00
'''Initialize the cache for all the captures'''
2020-10-29 23:25:20 +01:00
p = self.redis.pipeline()
for capture_dir in self.capture_dirs:
if capture_dir.exists():
2020-10-29 23:25:20 +01:00
self._set_capture_cache(capture_dir, redis_pipeline=p)
p.set('cache_loaded', 1)
p.execute()
2019-02-01 16:11:16 +01:00
2019-01-30 14:30:01 +01:00
@property
    def capture_dirs(self) -> List[Path]:
        '''Get all the capture directories, sorted from newest to oldest.'''
for capture_dir in self.capture_dir.iterdir():
if capture_dir.is_dir() and not capture_dir.iterdir():
# Cleanup self.capture_dir of failed runs.
capture_dir.rmdir()
if not (capture_dir / 'uuid').exists():
2019-01-30 14:30:01 +01:00
# Create uuid if missing
with (capture_dir / 'uuid').open('w') as f:
2019-01-30 14:30:01 +01:00
f.write(str(uuid4()))
return sorted(self.capture_dir.iterdir(), reverse=True)
2019-01-30 14:30:01 +01:00
def _get_capture_dir(self, capture_uuid: str, /) -> Path:
2021-01-12 17:22:51 +01:00
'''Use the cache to get a capture directory from a capture UUID'''
2020-10-12 12:15:07 +02:00
capture_dir: str = self.redis.hget('lookup_dirs', capture_uuid) # type: ignore
2021-01-12 17:22:51 +01:00
if not capture_dir:
raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
2021-03-12 16:52:12 +01:00
to_return = Path(capture_dir)
if not to_return.exists():
# The capture was removed, remove the UUID
self.redis.hdel('lookup_dirs', capture_uuid)
self.logger.warning(f'UUID ({capture_uuid}) linked to a missing directory ({capture_dir}). Removed now.')
raise NoValidHarFile(f'UUID ({capture_uuid}) linked to a missing directory ({capture_dir}). Removed now.')
return to_return
2019-01-30 14:30:01 +01:00
def get_capture_status(self, capture_uuid: str, /) -> CaptureStatus:
if self.redis.zrank('to_capture', capture_uuid) is not None:
2021-03-31 19:25:57 +02:00
return CaptureStatus.QUEUED
elif self.redis.hexists('lookup_dirs', capture_uuid):
return CaptureStatus.DONE
elif self.redis.sismember('ongoing', capture_uuid):
return CaptureStatus.ONGOING
return CaptureStatus.UNKNOWN
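
    # Illustrative sketch: the status only relies on redis structures, roughly
    #   ZRANK to_capture <uuid> is not None  -> QUEUED
    #   HEXISTS lookup_dirs <uuid>           -> DONE
    #   SISMEMBER ongoing <uuid>             -> ONGOING
    #   otherwise                            -> UNKNOWN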
def enqueue_capture(self, query: MutableMapping[str, Any], source: str, user: str, authenticated: bool) -> str:
'''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
2019-01-30 14:30:01 +01:00
perma_uuid = str(uuid4())
p = self.redis.pipeline()
for key, value in query.items():
if isinstance(value, bool):
# Yes, empty string because that's False.
query[key] = 1 if value else ''
if isinstance(value, list):
query[key] = json.dumps(value)
2021-01-25 13:14:33 +01:00
p.hmset(perma_uuid, query) # type: ignore
2021-05-18 23:58:56 +02:00
priority = self._get_priority(source, user, authenticated)
p.zadd('to_capture', {perma_uuid: priority})
p.zincrby('queues', 1, f'{source}|{authenticated}|{user}')
p.set(f'{perma_uuid}_mgmt', f'{source}|{authenticated}|{user}')
2019-01-30 14:30:01 +01:00
p.execute()
return perma_uuid
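
    # Illustrative usage sketch (values made up):
    #   uuid = lookyloo.enqueue_capture({'url': 'https://example.com', 'listing': True},
    #                                   source='web', user='1.2.3.4', authenticated=False)
    # The query is stored in a redis hash keyed by the UUID, the UUID is added to the
    # 'to_capture' sorted set with its priority as score, and '<uuid>_mgmt' keeps the queue
    # identifier ('source|authenticated|user') so process_capture_queue can decrement it.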
def process_capture_queue(self) -> Union[bool, None]:
2021-01-12 17:22:51 +01:00
'''Process a query from the capture queue'''
if not self.redis.exists('to_capture'):
return None
status, message = self.splash_status()
if not status:
self.logger.critical(f'Splash is not running, unable to process the capture queue: {message}')
return None
2021-05-18 23:58:56 +02:00
value = self.redis.zpopmax('to_capture')
if not value or not value[0]:
2019-04-05 16:12:54 +02:00
return None
2021-05-18 23:58:56 +02:00
uuid, score = value[0]
queue = self.redis.get(f'{uuid}_mgmt')
2021-03-31 19:25:57 +02:00
self.redis.sadd('ongoing', uuid)
2021-05-18 23:58:56 +02:00
lazy_cleanup = self.redis.pipeline()
lazy_cleanup.delete(f'{uuid}_mgmt')
lazy_cleanup.zincrby('queues', -1, queue)
2021-01-25 13:14:33 +01:00
to_capture: Dict[str, Union[str, int, float]] = self.redis.hgetall(uuid)
to_capture['perma_uuid'] = uuid
if 'cookies' in to_capture:
to_capture['cookies_pseudofile'] = to_capture.pop('cookies')
2021-03-31 19:25:57 +02:00
status = self._capture(**to_capture) # type: ignore
2021-05-18 23:58:56 +02:00
lazy_cleanup.srem('ongoing', uuid)
lazy_cleanup.delete(uuid)
        # make sure to expire the key if nothing was processed for a while (= queues empty)
        lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
2021-03-31 19:25:57 +02:00
if status:
self.logger.info(f'Processed {to_capture["url"]}')
2019-04-05 16:12:54 +02:00
return True
2021-03-31 19:25:57 +02:00
self.logger.warning(f'Unable to capture {to_capture["url"]}')
2019-04-05 16:12:54 +02:00
return False
2019-01-30 14:30:01 +01:00
def send_mail(self, capture_uuid: str, /, email: str='', comment: str='') -> None:
2021-01-12 17:22:51 +01:00
'''Send an email notification regarding a specific capture'''
if not get_config('generic', 'enable_mail_notification'):
2020-05-11 19:01:02 +02:00
return
redirects = ''
initial_url = ''
cache = self.capture_cache(capture_uuid)
if cache:
initial_url = cache.url
if cache.redirects:
redirects = "Redirects:\n"
redirects += '\n'.join(cache.redirects)
else:
redirects = "No redirects."
email_config = get_config('generic', 'email')
2020-05-11 19:01:02 +02:00
msg = EmailMessage()
msg['From'] = email_config['from']
if email:
msg['Reply-To'] = email
2020-05-11 19:01:02 +02:00
msg['To'] = email_config['to']
msg['Subject'] = email_config['subject']
body = get_email_template()
body = body.format(
recipient=msg['To'].addresses[0].display_name,
domain=self.public_domain,
2020-05-11 19:01:02 +02:00
uuid=capture_uuid,
initial_url=initial_url,
redirects=redirects,
2020-05-11 19:01:02 +02:00
comment=comment,
sender=msg['From'].addresses[0].display_name,
)
msg.set_content(body)
try:
s = smtplib.SMTP(email_config['smtp_host'], email_config['smtp_port'])
s.send_message(msg)
s.quit()
except Exception as e:
2020-06-29 18:00:53 +02:00
self.logger.exception(e)
2020-08-20 15:05:27 +02:00
self.logger.warning(msg.as_string())
2020-05-11 19:01:02 +02:00
def _ensure_meta(self, capture_dir: Path, tree: CrawledTree) -> None:
        '''Make sure the meta file is present; it contains information about the User Agent used for the capture.'''
metafile = capture_dir / 'meta'
if metafile.exists():
return
ua = UserAgent(tree.root_hartree.user_agent)
to_dump = {}
2020-10-12 12:15:07 +02:00
if ua.platform:
to_dump['os'] = ua.platform
if ua.browser:
if ua.version:
to_dump['browser'] = f'{ua.browser} {ua.version}'
else:
2020-10-12 12:15:07 +02:00
to_dump['browser'] = ua.browser
if ua.language:
to_dump['language'] = ua.language
if not to_dump:
# UA not recognized
self.logger.info(f'Unable to recognize the User agent: {ua}')
2020-10-01 11:48:00 +02:00
to_dump['user_agent'] = ua.string
with metafile.open('w') as f:
json.dump(to_dump, f)
def _get_raw(self, capture_uuid: str, /, extension: str='*', all_files: bool=True) -> BytesIO:
2021-01-12 17:22:51 +01:00
'''Get file(s) from the capture directory'''
2021-03-07 23:57:46 +01:00
try:
2021-03-18 18:47:54 +01:00
capture_dir = self._get_capture_dir(capture_uuid)
2021-03-07 23:57:46 +01:00
        except MissingUUID:
            return BytesIO(f'Capture {capture_uuid} not available, try again later.'.encode())
        except NoValidHarFile:
            return BytesIO(f'No capture {capture_uuid} on the system.'.encode())
all_paths = sorted(list(capture_dir.glob(f'*.{extension}')))
if not all_files:
# Only get the first one in the list
with open(all_paths[0], 'rb') as f:
return BytesIO(f.read())
to_return = BytesIO()
with ZipFile(to_return, 'w') as myzip:
for path in all_paths:
if path.name.endswith('pickle'):
continue
myzip.write(path, arcname=f'{capture_dir.name}/{path.name}')
to_return.seek(0)
return to_return
def get_html(self, capture_uuid: str, /, all_html: bool=False) -> BytesIO:
2021-01-12 17:22:51 +01:00
'''Get rendered HTML'''
return self._get_raw(capture_uuid, 'html', all_html)
def get_cookies(self, capture_uuid: str, /, all_cookies: bool=False) -> BytesIO:
2021-01-12 17:22:51 +01:00
'''Get the cookie(s)'''
return self._get_raw(capture_uuid, 'cookies.json', all_cookies)
2020-05-26 17:45:04 +02:00
def get_screenshot(self, capture_uuid: str, /) -> BytesIO:
2021-01-12 17:22:51 +01:00
'''Get the screenshot(s) of the rendered page'''
return self._get_raw(capture_uuid, 'png', all_files=False)
def get_screenshot_thumbnail(self, capture_uuid: str, /, for_datauri: bool=False, width: int=64) -> Union[str, BytesIO]:
'''Get the thumbnail of the rendered page. Always crop to a square.'''
to_return = BytesIO()
size = width, width
try:
2021-01-13 15:16:36 +01:00
s = self.get_screenshot(capture_uuid)
orig_screenshot = Image.open(s)
to_thumbnail = orig_screenshot.crop((0, 0, orig_screenshot.width, orig_screenshot.width))
except Image.DecompressionBombError as e:
# The image is most probably too big: https://pillow.readthedocs.io/en/stable/reference/Image.html
self.logger.warning(f'Unable to generate the screenshot thumbnail of {capture_uuid}: image too big ({e}).')
error_img: Path = get_homedir() / 'website' / 'web' / 'static' / 'error_screenshot.png'
to_thumbnail = Image.open(error_img)
to_thumbnail.thumbnail(size)
to_thumbnail.save(to_return, 'png')
to_return.seek(0)
if for_datauri:
return base64.b64encode(to_return.getvalue()).decode()
else:
return to_return
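
    # Illustrative usage sketch: with for_datauri=True the method returns a base64-encoded
    # string ready to be embedded by the caller, e.g.
    #   b64 = lookyloo.get_screenshot_thumbnail(capture_uuid, for_datauri=True, width=64)
    #   # caller side: f'<img src="data:image/png;base64,{b64}">'
    # Otherwise a BytesIO containing the PNG thumbnail is returned.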
def get_capture(self, capture_uuid: str, /) -> BytesIO:
2021-01-12 17:22:51 +01:00
'''Get all the files related to this capture.'''
return self._get_raw(capture_uuid)
2019-01-30 14:30:01 +01:00
def get_urls_rendered_page(self, capture_uuid: str, /):
ct = self.get_crawled_tree(capture_uuid)
return sorted(set(ct.root_hartree.rendered_node.urls_in_rendered_page)
- set(ct.root_hartree.all_url_requests.keys()))
def splash_status(self) -> Tuple[bool, str]:
try:
splash_status = requests.get(urljoin(self.splash_url, '_ping'))
splash_status.raise_for_status()
json_status = splash_status.json()
if json_status['status'] == 'ok':
return True, 'Splash is up'
else:
return False, str(json_status)
except HTTPError as http_err:
return False, f'HTTP error occurred: {http_err}'
except Exception as err:
return False, f'Other error occurred: {err}'
def _capture(self, url: str, *, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
referer: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
browser: Optional[str]=None, parent: Optional[str]=None) -> Union[bool, str]:
2021-01-12 17:22:51 +01:00
'''Launch a capture'''
2020-03-19 14:05:19 +01:00
url = url.strip()
2020-03-19 11:05:29 +01:00
url = refang(url)
2019-01-30 14:30:01 +01:00
if not url.startswith('http'):
url = f'http://{url}'
if self.only_global_lookups:
splitted_url = urlsplit(url)
if splitted_url.netloc:
2020-01-06 15:32:38 +01:00
if splitted_url.hostname:
if splitted_url.hostname.split('.')[-1] != 'onion':
try:
ip = socket.gethostbyname(splitted_url.hostname)
except socket.gaierror:
self.logger.info('Name or service not known')
return False
if not ipaddress.ip_address(ip).is_global:
return False
else:
return False
2020-01-24 10:17:41 +01:00
cookies = load_cookies(cookies_pseudofile)
        if not user_agent:
            # Catches the case where the UA is broken on the UI, and on the async submission.
            ua: str = get_config('generic', 'default_user_agent')
        else:
            ua = user_agent
2020-06-29 18:00:53 +02:00
2020-10-01 11:48:00 +02:00
if int(depth) > int(get_config('generic', 'max_depth')):
self.logger.warning(f'Not allowed to capture on a depth higher than {get_config("generic", "max_depth")}: {depth}')
2020-10-01 11:48:00 +02:00
depth = int(get_config('generic', 'max_depth'))
if not perma_uuid:
perma_uuid = str(uuid4())
2021-03-08 13:57:55 +01:00
self.logger.info(f'Capturing {url}')
try:
items = crawl(self.splash_url, url, cookies=cookies, depth=depth, user_agent=ua,
referer=referer, log_enabled=True, log_level=get_config('generic', 'splash_loglevel'))
except Exception as e:
self.logger.critical(f'Something went terribly wrong when capturing {url}.')
raise e
2019-01-30 14:30:01 +01:00
if not items:
# broken
2021-03-08 13:57:55 +01:00
self.logger.critical(f'Something went terribly wrong when capturing {url}.')
2019-04-05 16:12:54 +02:00
return False
2019-01-30 14:30:01 +01:00
width = len(str(len(items)))
dirpath = self.capture_dir / datetime.now().isoformat()
2020-04-01 14:33:35 +02:00
safe_create_dir(dirpath)
if os or browser:
meta = {}
if os:
meta['os'] = os
if browser:
meta['browser'] = browser
with (dirpath / 'meta').open('w') as _meta:
json.dump(meta, _meta)
# Write UUID
with (dirpath / 'uuid').open('w') as _uuid:
_uuid.write(perma_uuid)
# Write no_index marker (optional)
if not listing:
(dirpath / 'no_index').touch()
# Write parent UUID (optional)
if parent:
with (dirpath / 'parent').open('w') as _parent:
_parent.write(parent)
2020-07-03 18:25:16 +02:00
for i, item in enumerate(items):
if 'error' in item:
with (dirpath / 'error.txt').open('w') as _error:
2020-07-03 18:25:16 +02:00
json.dump(item['error'], _error)
# The capture went fine
2019-01-30 14:30:01 +01:00
harfile = item['har']
png = base64.b64decode(item['png'])
html = item['html']
2020-03-18 21:14:48 +01:00
last_redirect = item['last_redirected_url']
with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as _har:
json.dump(harfile, _har)
with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as _img:
_img.write(png)
with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as _html:
_html.write(html)
2020-03-18 21:14:48 +01:00
with (dirpath / '{0:0{width}}.last_redirect.txt'.format(i, width=width)).open('w') as _redir:
_redir.write(last_redirect)
if 'childFrames' in item:
child_frames = item['childFrames']
with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as _iframes:
json.dump(child_frames, _iframes)
if 'cookies' in item:
cookies = item['cookies']
with (dirpath / '{0:0{width}}.cookies.json'.format(i, width=width)).open('w') as _cookies:
json.dump(cookies, _cookies)
self._set_capture_cache(dirpath)
2019-01-30 14:30:01 +01:00
return perma_uuid
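
    # Illustrative sketch of a capture directory written by _capture:
    #   <capture_dir>/uuid, plus optional meta / no_index / parent markers
    #   <capture_dir>/0.har, 0.png, 0.html, 0.last_redirect.txt,
    #                 0.frames.json and 0.cookies.json when Splash returned them
    # The numeric prefix is zero-padded to the width of the total number of crawled pages.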
def get_body_hash_investigator(self, body_hash: str, /) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
2021-01-12 17:22:51 +01:00
'''Returns all the captures related to a hash (sha512), used in the web interface.'''
total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
2021-03-18 18:47:54 +01:00
cached_captures = self.sorted_capture_cache([d[0] for d in details])
captures = [(cache.uuid, cache.title) for cache in cached_captures]
domains = self.indexing.get_body_hash_domains(body_hash)
return captures, domains
def get_body_hash_full(self, body_hash: str, /) -> Tuple[Dict[str, List[Dict[str, str]]], BytesIO]:
2021-01-12 17:22:51 +01:00
'''Returns a lot of information about the hash (sha512) and the hits in the instance.
Also contains the data (base64 encoded)'''
2020-10-23 20:51:15 +02:00
details = self.indexing.get_body_hash_urls(body_hash)
body_content = BytesIO()
# get the body from the first entry in the details list
for url, entries in details.items():
2021-01-12 17:22:51 +01:00
ct = self.get_crawled_tree(entries[0]['capture'])
2020-10-23 20:51:15 +02:00
urlnode = ct.root_hartree.get_url_node_by_uuid(entries[0]['urlnode'])
if urlnode.body_hash == body_hash:
# the hash we're looking for is the whole file
body_content = urlnode.body
else:
# The hash is an embedded resource
for mimetype, blobs in urlnode.body_hash.embedded_ressources.items():
for h, b in blobs:
if h == body_hash:
body_content = b
break
break
return details, body_content
def get_latest_url_capture(self, url: str, /) -> Optional[CaptureCache]:
'''Get the most recent capture with this URL'''
captures = self.sorted_capture_cache(self.indexing.get_captures_url(url))
if captures:
return captures[0]
return None
def get_url_occurrences(self, url: str, /, limit: int=20) -> List[Dict]:
2021-03-16 13:35:59 +01:00
'''Get the most recent captures and URL nodes where the URL has been seen.'''
2021-03-18 18:47:54 +01:00
captures = self.sorted_capture_cache(self.indexing.get_captures_url(url))
2021-03-16 13:35:59 +01:00
to_return: List[Dict] = []
for capture in captures[:limit]:
ct = self.get_crawled_tree(capture.uuid)
to_append: Dict[str, Union[str, Dict]] = {'capture_uuid': capture.uuid,
'start_timestamp': capture.timestamp.isoformat(),
'title': capture.title}
2021-03-16 13:35:59 +01:00
urlnodes: Dict[str, Dict[str, str]] = {}
2020-10-27 00:02:18 +01:00
for urlnode in ct.root_hartree.url_tree.search_nodes(name=url):
2021-03-16 13:35:59 +01:00
urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
'hostnode_uuid': urlnode.hostnode_uuid}
2020-10-27 00:02:18 +01:00
if hasattr(urlnode, 'body_hash'):
2021-03-16 13:35:59 +01:00
urlnodes[urlnode.uuid]['hash'] = urlnode.body_hash
to_append['urlnodes'] = urlnodes
to_return.append(to_append)
2020-10-27 00:02:18 +01:00
return to_return
def get_hostname_occurrences(self, hostname: str, /, with_urls_occurrences: bool=False, limit: int=20) -> List[Dict]:
2021-03-16 13:35:59 +01:00
'''Get the most recent captures and URL nodes where the hostname has been seen.'''
2021-03-18 18:47:54 +01:00
captures = self.sorted_capture_cache(self.indexing.get_captures_hostname(hostname))
2021-03-16 13:35:59 +01:00
to_return: List[Dict] = []
for capture in captures[:limit]:
ct = self.get_crawled_tree(capture.uuid)
to_append: Dict[str, Union[str, List, Dict]] = {'capture_uuid': capture.uuid,
'start_timestamp': capture.timestamp.isoformat(),
'title': capture.title}
2021-03-16 13:35:59 +01:00
hostnodes: List[str] = []
2020-10-27 01:52:28 +01:00
if with_urls_occurrences:
2021-03-16 13:35:59 +01:00
urlnodes: Dict[str, Dict[str, str]] = {}
2020-10-27 01:42:00 +01:00
for hostnode in ct.root_hartree.hostname_tree.search_nodes(name=hostname):
2021-03-16 13:35:59 +01:00
hostnodes.append(hostnode.uuid)
2020-10-27 01:42:00 +01:00
if with_urls_occurrences:
for urlnode in hostnode.urls:
2021-03-16 13:35:59 +01:00
urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
'url': urlnode.name,
'hostnode_uuid': urlnode.hostnode_uuid}
2020-10-27 01:42:00 +01:00
if hasattr(urlnode, 'body_hash'):
2021-03-16 13:35:59 +01:00
urlnodes[urlnode.uuid]['hash'] = urlnode.body_hash
to_append['hostnodes'] = hostnodes
if with_urls_occurrences:
to_append['urlnodes'] = urlnodes
to_return.append(to_append)
2020-10-27 00:02:18 +01:00
return to_return
def get_cookie_name_investigator(self, cookie_name: str, /) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float, List[Tuple[str, float]]]]]:
2021-01-12 17:22:51 +01:00
'''Returns all the captures related to a cookie name entry, used in the web interface.'''
2021-03-18 18:47:54 +01:00
cached_captures = self.sorted_capture_cache([entry[0] for entry in self.indexing.get_cookies_names_captures(cookie_name)])
captures = [(cache.uuid, cache.title) for cache in cached_captures]
domains = [(domain, freq, self.indexing.cookies_names_domains_values(cookie_name, domain))
for domain, freq in self.indexing.get_cookie_domains(cookie_name)]
return captures, domains
2020-09-01 17:54:54 +02:00
def hash_lookup(self, blob_hash: str, url: str, capture_uuid: str) -> Tuple[int, Dict[str, List[Tuple[str, str, str, str, str]]]]:
        '''Search all the captures where a specific hash was seen.
        If a URL is given, it splits the results depending on whether the hash was seen on the same URL or another one.
        The capture UUID avoids duplicates from the same capture.'''
2020-08-10 20:11:26 +02:00
captures_list: Dict[str, List[Tuple[str, str, str, str, str]]] = {'same_url': [], 'different_url': []}
2020-09-04 18:40:51 +02:00
total_captures, details = self.indexing.get_body_hash_captures(blob_hash, url, filter_capture_uuid=capture_uuid)
for h_capture_uuid, url_uuid, url_hostname, same_url in details:
cache = self.capture_cache(h_capture_uuid)
if cache:
if same_url:
captures_list['same_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
else:
captures_list['different_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
return total_captures, captures_list
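
    # Illustrative sketch of the return value (UUIDs, title and hostname made up):
    #   (3, {'same_url': [('<capture_uuid>', '<urlnode_uuid>', 'Example page',
    #                      '2021-05-01T12:00:00', 'www.example.org'), ...],
    #        'different_url': [...]})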
def _normalize_known_content(self, h: str, /, known_content: Dict[str, Any], url: URLNode) -> Tuple[Optional[Union[str, List[Any]]], Optional[Tuple[bool, Any]]]:
        '''There are a few different sources to figure out known vs. legitimate content;
        this method normalizes them for the web interface.'''
known: Optional[Union[str, List[Any]]] = None
legitimate: Optional[Tuple[bool, Any]] = None
if h not in known_content:
return known, legitimate
if known_content[h]['type'] in ['generic', 'sanejs']:
known = known_content[h]['details']
elif known_content[h]['type'] == 'legitimate_on_domain':
legit = False
if url.hostname in known_content[h]['details']:
legit = True
legitimate = (legit, known_content[h]['details'])
elif known_content[h]['type'] == 'malicious':
legitimate = (False, known_content[h]['details'])
return known, legitimate
def get_ressource(self, tree_uuid: str, /, urlnode_uuid: str, h: Optional[str]) -> Optional[Tuple[str, BytesIO, str]]:
        '''Get a specific resource from a URL node. If a hash is also given, we want an embedded resource.'''
2021-04-20 17:32:17 +02:00
try:
url = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid)
except IndexError:
# unable to find the uuid, the cache is probably in a weird state.
return None
if url.empty_response:
return None
if not h or h == url.body_hash:
# we want the body
return url.filename if url.filename else 'file.bin', url.body, url.mimetype
# We want an embedded ressource
if h not in url.resources_hashes:
return None
for mimetype, blobs in url.embedded_ressources.items():
for ressource_h, blob in blobs:
if ressource_h == h:
return 'embedded_ressource.bin', blob, mimetype
return None
def __misp_add_ips_to_URLObject(self, obj: URLObject, hostname_tree: HostNode) -> None:
hosts = obj.get_attributes_by_relation('host')
if hosts:
hostnodes = hostname_tree.search_nodes(name=hosts[0].value)
if hostnodes and hasattr(hostnodes[0], 'resolved_ips'):
obj.add_attributes('ip', *hostnodes[0].resolved_ips)
2021-02-02 15:23:25 +01:00
def __misp_add_vt_to_URLObject(self, obj: MISPObject) -> Optional[MISPObject]:
urls = obj.get_attributes_by_relation('url')
url = urls[0]
self.vt.url_lookup(url.value)
report = self.vt.get_url_lookup(url.value)
if not report:
return None
vt_obj = MISPObject('virustotal-report', standalone=False)
vt_obj.add_attribute('first-submission', value=datetime.fromtimestamp(report['attributes']['first_submission_date']), disable_correlation=True)
vt_obj.add_attribute('last-submission', value=datetime.fromtimestamp(report['attributes']['last_submission_date']), disable_correlation=True)
vt_obj.add_attribute('permalink', value=f"https://www.virustotal.com/gui/url/{report['id']}/detection", disable_correlation=True)
obj.add_reference(vt_obj, 'analysed-with')
return vt_obj
def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> Union[List[MISPEvent], Dict[str, str]]:
2021-01-12 17:22:51 +01:00
'''Export a capture in MISP format. You can POST the return of this method
directly to a MISP instance and it will create an event.'''
cache = self.capture_cache(capture_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
if cache.incomplete_redirects:
2021-01-12 17:22:51 +01:00
self._cache_capture(capture_uuid)
cache = self.capture_cache(capture_uuid)
if not cache:
return {'error': 'UUID missing in cache, try again later.'}
2020-12-08 14:59:34 +01:00
2021-01-12 17:22:51 +01:00
ct = self.get_crawled_tree(capture_uuid)
2020-12-08 14:59:34 +01:00
event = MISPEvent()
event.info = f'Lookyloo Capture ({cache.url})'
2021-01-28 19:28:54 +01:00
lookyloo_link: MISPAttribute = event.add_attribute('link', f'https://{self.public_domain}/tree/{capture_uuid}') # type: ignore
if not self.is_public_instance:
lookyloo_link.distribution = 0
2020-12-08 14:59:34 +01:00
initial_url = URLObject(cache.url)
2021-02-02 12:46:48 +01:00
initial_url.comment = 'Submitted URL'
self.__misp_add_ips_to_URLObject(initial_url, ct.root_hartree.hostname_tree)
2021-02-02 12:46:48 +01:00
redirects: List[URLObject] = []
2021-02-02 12:46:48 +01:00
for nb, url in enumerate(cache.redirects):
if url == cache.url:
continue
obj = URLObject(url)
2021-02-02 12:46:48 +01:00
obj.comment = f'Redirect {nb}'
self.__misp_add_ips_to_URLObject(obj, ct.root_hartree.hostname_tree)
redirects.append(obj)
if redirects:
redirects[-1].comment = f'Last redirect ({nb})'
2020-12-08 14:59:34 +01:00
if redirects:
2021-01-28 18:37:44 +01:00
prec_object = initial_url
for u_object in redirects:
prec_object.add_reference(u_object, 'redirects-to')
prec_object = u_object
2020-12-08 14:59:34 +01:00
2021-01-28 18:37:44 +01:00
initial_obj = event.add_object(initial_url)
initial_obj.add_reference(lookyloo_link, 'captured-by', 'Capture on lookyloo')
for u_object in redirects:
event.add_object(u_object)
final_redirect = event.objects[-1]
2021-02-02 15:23:25 +01:00
2021-01-28 19:28:54 +01:00
screenshot: MISPAttribute = event.add_attribute('attachment', 'screenshot_landing_page.png', data=self.get_screenshot(capture_uuid), disable_correlation=True) # type: ignore
2020-12-08 14:59:34 +01:00
try:
fo = FileObject(pseudofile=ct.root_hartree.rendered_node.body, filename=ct.root_hartree.rendered_node.filename)
2020-12-08 14:59:34 +01:00
fo.comment = 'Content received for the final redirect (before rendering)'
fo.add_reference(final_redirect, 'loaded-by', 'URL loading that content')
2021-01-28 18:37:44 +01:00
fo.add_reference(screenshot, 'rendered-as', 'Screenshot of the page')
2020-12-08 14:59:34 +01:00
event.add_object(fo)
except Har2TreeError:
pass
except AttributeError:
# No `body` in rendered node
pass
if self.vt.available:
for e_obj in event.objects:
if e_obj.name != 'url':
continue
vt_obj = self.__misp_add_vt_to_URLObject(e_obj)
if vt_obj:
event.add_object(vt_obj)
if with_parent and cache.parent:
parent = self.misp_export(cache.parent, with_parent)
if isinstance(parent, dict):
# Something bad happened
return parent
event.extends_uuid = parent[-1].uuid
parent.append(event)
return parent
return [event]
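
    # Illustrative usage sketch (assumes a PyMISP instance on the caller side):
    #   events = lookyloo.misp_export(capture_uuid, with_parent=True)
    #   if isinstance(events, dict):
    #       ...  # {'error': ...}, nothing to push
    #   else:
    #       for event in events:
    #           misp.add_event(event)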
def get_hashes(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
2020-10-09 18:05:04 +02:00
"""Return hashes of resources.
Only tree_uuid: All the hashes
        tree_uuid and hostnode_uuid: hashes of all the resources in that hostnode (including embedded resources)
tree_uuid, hostnode_uuid, and urlnode_uuid: hash of the URL node body, and embedded resources
"""
container: Union[CrawledTree, HostNode, URLNode]
if urlnode_uuid:
container = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid)
elif hostnode_uuid:
container = self.get_hostnode_from_tree(tree_uuid, hostnode_uuid)
else:
container = self.get_crawled_tree(tree_uuid)
return get_resources_hashes(container)
def get_hostnode_investigator(self, capture_uuid: str, /, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
        '''Gather all the information needed to display the Hostnode investigator popup.'''
ct = self.get_crawled_tree(capture_uuid)
2020-06-04 18:23:36 +02:00
hostnode = ct.root_hartree.get_host_node_by_uuid(node_uuid)
if not hostnode:
            raise MissingUUID(f'Unable to find UUID {node_uuid} in capture {capture_uuid}')
2020-06-04 18:23:36 +02:00
known_content = self.context.find_known_content(hostnode)
2020-06-04 18:23:36 +02:00
urls: List[Dict[str, Any]] = []
for url in hostnode.urls:
# For the popup, we need:
# * https vs http
# * everything after the domain
# * the full URL
to_append: Dict[str, Any] = {
'encrypted': url.name.startswith('https'),
'url_path': url.name.split('/', 3)[-1],
2020-08-24 15:31:53 +02:00
'url_object': url,
2020-06-04 18:23:36 +02:00
}
if not url.empty_response:
# Index lookup
# %%% Full body %%%
freq = self.indexing.body_hash_fequency(url.body_hash)
to_append['body_hash_details'] = freq
2020-07-15 01:35:55 +02:00
if freq and 'hash_freq' in freq and freq['hash_freq'] and freq['hash_freq'] > 1:
to_append['body_hash_details']['other_captures'] = self.hash_lookup(url.body_hash, url.name, capture_uuid)
# %%% Embedded ressources %%%
2020-07-15 01:35:55 +02:00
if hasattr(url, 'embedded_ressources') and url.embedded_ressources:
to_append['embedded_ressources'] = {}
for mimetype, blobs in url.embedded_ressources.items():
for h, blob in blobs:
if h in to_append['embedded_ressources']:
# Skip duplicates
continue
2020-07-11 02:10:56 +02:00
freq_embedded = self.indexing.body_hash_fequency(h)
to_append['embedded_ressources'][h] = freq_embedded
to_append['embedded_ressources'][h]['body_size'] = blob.getbuffer().nbytes
to_append['embedded_ressources'][h]['type'] = mimetype
2020-07-11 02:10:56 +02:00
if freq_embedded['hash_freq'] > 1:
to_append['embedded_ressources'][h]['other_captures'] = self.hash_lookup(h, url.name, capture_uuid)
2020-08-25 18:00:16 +02:00
for h in to_append['embedded_ressources'].keys():
known, legitimate = self._normalize_known_content(h, known_content, url)
if known:
to_append['embedded_ressources'][h]['known_content'] = known
elif legitimate:
to_append['embedded_ressources'][h]['legitimacy'] = legitimate
known, legitimate = self._normalize_known_content(url.body_hash, known_content, url)
if known:
to_append['known_content'] = known
elif legitimate:
to_append['legitimacy'] = legitimate
2020-06-04 18:23:36 +02:00
# Optional: Cookies sent to server in request -> map to nodes who set the cookie in response
if hasattr(url, 'cookies_sent'):
2020-06-11 15:32:43 +02:00
to_display_sent: Dict[str, Set[Iterable[Optional[str]]]] = defaultdict(set)
2020-06-04 18:23:36 +02:00
for cookie, contexts in url.cookies_sent.items():
if not contexts:
2020-06-11 15:13:31 +02:00
# Locally created?
2020-06-11 15:32:43 +02:00
to_display_sent[cookie].add(('Unknown origin', ))
2020-06-04 18:23:36 +02:00
continue
for context in contexts:
2020-06-11 15:32:43 +02:00
to_display_sent[cookie].add((context['setter'].hostname, context['setter'].hostnode_uuid))
to_append['cookies_sent'] = to_display_sent
2020-06-04 18:23:36 +02:00
# Optional: Cookies received from server in response -> map to nodes who send the cookie in request
if hasattr(url, 'cookies_received'):
2020-06-11 15:32:43 +02:00
to_display_received: Dict[str, Dict[str, Set[Iterable[Optional[str]]]]] = {'3rd_party': defaultdict(set), 'sent': defaultdict(set), 'not_sent': defaultdict(set)}
2020-06-04 18:23:36 +02:00
for domain, c_received, is_3rd_party in url.cookies_received:
2020-06-11 15:13:31 +02:00
if c_received not in ct.root_hartree.cookies_sent:
# This cookie is never sent.
if is_3rd_party:
2020-06-11 15:32:43 +02:00
to_display_received['3rd_party'][c_received].add((domain, ))
2020-06-11 15:13:31 +02:00
else:
2020-06-11 15:32:43 +02:00
to_display_received['not_sent'][c_received].add((domain, ))
2020-06-11 15:13:31 +02:00
continue
2020-06-04 18:23:36 +02:00
for url_node in ct.root_hartree.cookies_sent[c_received]:
2020-06-11 15:13:31 +02:00
if is_3rd_party:
2020-06-11 15:32:43 +02:00
to_display_received['3rd_party'][c_received].add((url_node.hostname, url_node.hostnode_uuid))
2020-06-11 15:13:31 +02:00
else:
2020-06-11 15:32:43 +02:00
to_display_received['sent'][c_received].add((url_node.hostname, url_node.hostnode_uuid))
to_append['cookies_received'] = to_display_received
2020-06-04 18:23:36 +02:00
urls.append(to_append)
return hostnode, urls
2020-11-24 16:46:01 +01:00
2020-11-27 16:27:29 +01:00
def get_stats(self) -> Dict[str, List]:
2021-01-12 17:22:51 +01:00
'''Gather statistics about the lookyloo instance'''
2020-11-24 16:46:01 +01:00
today = date.today()
calendar_week = today.isocalendar()[1]
2020-11-27 16:27:29 +01:00
stats_dict = {'submissions': 0, 'submissions_with_redirects': 0, 'redirects': 0}
2020-11-27 16:27:29 +01:00
stats: Dict[int, Dict[int, Dict[str, Any]]] = {}
weeks_stats: Dict[int, Dict] = {}
2021-03-18 18:47:54 +01:00
for cache in self.sorted_capture_cache():
date_submission: datetime = cache.timestamp
if date_submission.year not in stats:
stats[date_submission.year] = {}
if date_submission.month not in stats[date_submission.year]:
stats[date_submission.year][date_submission.month] = defaultdict(dict, **stats_dict)
stats[date_submission.year][date_submission.month]['uniq_urls'] = set()
stats[date_submission.year][date_submission.month]['submissions'] += 1
stats[date_submission.year][date_submission.month]['uniq_urls'].add(cache.url)
if len(cache.redirects) > 0:
stats[date_submission.year][date_submission.month]['submissions_with_redirects'] += 1
stats[date_submission.year][date_submission.month]['redirects'] += len(cache.redirects)
stats[date_submission.year][date_submission.month]['uniq_urls'].update(cache.redirects)
if ((date_submission.year == today.year and calendar_week - 1 <= date_submission.isocalendar()[1] <= calendar_week)
or (calendar_week == 1 and date_submission.year == today.year - 1 and date_submission.isocalendar()[1] in [52, 53])):
if date_submission.isocalendar()[1] not in weeks_stats:
weeks_stats[date_submission.isocalendar()[1]] = defaultdict(dict, **stats_dict)
weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'] = set()
weeks_stats[date_submission.isocalendar()[1]]['submissions'] += 1
weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'].add(cache.url)
if len(cache.redirects) > 0:
weeks_stats[date_submission.isocalendar()[1]]['submissions_with_redirects'] += 1
weeks_stats[date_submission.isocalendar()[1]]['redirects'] += len(cache.redirects)
weeks_stats[date_submission.isocalendar()[1]]['uniq_urls'].update(cache.redirects)
2020-11-27 16:27:29 +01:00
statistics: Dict[str, List] = {'weeks': [], 'years': []}
for week_number in sorted(weeks_stats.keys()):
week_stat = weeks_stats[week_number]
urls = week_stat.pop('uniq_urls')
week_stat['week_number'] = week_number
week_stat['uniq_urls'] = len(urls)
week_stat['uniq_domains'] = len(uniq_domains(urls))
statistics['weeks'].append(week_stat)
for year in sorted(stats.keys()):
year_stats: Dict[str, Union[int, List]] = {'year': year, 'months': [], 'yearly_submissions': 0, 'yearly_redirects': 0}
2020-11-27 16:27:29 +01:00
for month in sorted(stats[year].keys()):
month_stats = stats[year][month]
urls = month_stats.pop('uniq_urls')
month_stats['month_number'] = month
month_stats['uniq_urls'] = len(urls)
month_stats['uniq_domains'] = len(uniq_domains(urls))
year_stats['months'].append(month_stats) # type: ignore
year_stats['yearly_submissions'] += month_stats['submissions']
2020-11-27 16:27:29 +01:00
year_stats['yearly_redirects'] += month_stats['redirects']
statistics['years'].append(year_stats)
2020-11-24 16:46:01 +01:00
return statistics
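
# Illustrative sketch of the structure returned by get_stats (numbers made up):
#   {'weeks': [{'week_number': 12, 'submissions': 42, 'submissions_with_redirects': 20,
#               'redirects': 51, 'uniq_urls': 60, 'uniq_domains': 25}, ...],
#    'years': [{'year': 2021, 'yearly_submissions': 1000, 'yearly_redirects': 1500,
#               'months': [{'month_number': 1, 'submissions': 80, ...}, ...]}]}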