chg: Refactoring, add get_hashes

pull/97/head
Raphaël Vinot 2020-10-09 18:05:04 +02:00
parent 0c765005c4
commit 90a9ff9bb5
10 changed files with 879 additions and 789 deletions

View File

@ -3,6 +3,7 @@
from pathlib import Path
import logging
from typing import Optional
from lookyloo.abstractmanager import AbstractManager
from lookyloo.helpers import get_homedir, set_running, unset_running, shutdown_requested
@ -14,7 +15,7 @@ logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
class AsyncScraper(AbstractManager):
def __init__(self, storage_directory: Path=None, loglevel: int=logging.INFO):
def __init__(self, storage_directory: Optional[Path]=None, loglevel: int=logging.INFO):
super().__init__(loglevel)
if not storage_directory:
self.storage_directory = get_homedir() / 'scraped'

View File

@ -5,31 +5,32 @@ from lookyloo.helpers import get_homedir, check_running
from subprocess import Popen
import time
from pathlib import Path
from typing import Optional, List, Union
import argparse
def launch_cache(storage_directory: Path=None):
def launch_cache(storage_directory: Optional[Path]=None):
if not storage_directory:
storage_directory = get_homedir()
if not check_running('cache'):
Popen(["./run_redis.sh"], cwd=(storage_directory / 'cache'))
def shutdown_cache(storage_directory: Path=None):
def shutdown_cache(storage_directory: Optional[Path]=None):
if not storage_directory:
storage_directory = get_homedir()
Popen(["./shutdown_redis.sh"], cwd=(storage_directory / 'cache'))
def launch_indexing(storage_directory: Path=None):
def launch_indexing(storage_directory: Optional[Path]=None):
if not storage_directory:
storage_directory = get_homedir()
if not check_running('indexing'):
Popen(["./run_redis.sh"], cwd=(storage_directory / 'indexing'))
def shutdown_indexing(storage_directory: Path=None):
def shutdown_indexing(storage_directory: Optional[Path]=None):
if not storage_directory:
storage_directory = get_homedir()
Popen(["./shutdown_redis.sh"], cwd=(storage_directory / 'indexing'))
@ -40,12 +41,12 @@ def launch_all():
launch_indexing()
def check_all(stop=False):
backends = [['cache', False], ['indexing', False]]
def check_all(stop: bool=False):
backends: List[List[Union[str, bool]]] = [['cache', False], ['indexing', False]]
while True:
for b in backends:
try:
b[1] = check_running(b[0])
b[1] = check_running(b[0]) # type: ignore
except Exception:
b[1] = False
if stop:

lookyloo/context.py (new file, 320 lines)
View File

@ -0,0 +1,320 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
from pathlib import Path
from urllib.parse import urlsplit
from typing import Optional, Union, Set, Any, Dict, List
from redis import Redis
from har2tree import CrawledTree, HostNode, URLNode
from .helpers import (get_config, get_socket_path, load_known_content, get_resources_hashes,
get_homedir, serialize_to_json)
from .modules import SaneJavaScript
class Context():
def __init__(self, sanejs: Optional[SaneJavaScript] = None):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True)
self.sanejs = sanejs
self._cache_known_content()
def clear_context(self):
self.redis.flushdb()
def _cache_known_content(self) -> None:
for dirname in ['known_content', 'known_content_user']:
for filename, file_content in load_known_content(dirname).items():
p = self.redis.pipeline()
if filename == 'generic':
# 1px images, files with spaces, empty => non-relevant stuff
for k, type_content in file_content.items():
p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']})
elif filename == 'malicious':
# User defined as malicious
for h, details in file_content.items():
p.sadd('bh|malicious', h)
if 'target' in details and details['target']:
p.sadd(f'{h}|target', *details['target'])
if 'tag' in details and details['tag']:
p.sadd(f'{h}|tag', *details['tag'])
elif filename == 'legitimate':
# User defined as legitimate
for h, details in file_content.items():
if 'domain' in details and details['domain']:
p.sadd(f'bh|{h}|legitimate', *details['domain'])
elif 'description' in details:
p.hset('known_content', h, details['description'])
else:
# Full captures marked as legitimate
for h, details in file_content.items():
p.sadd(f'bh|{h}|legitimate', *details['hostnames'])
p.execute()
def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode, str]) -> Dict[str, Any]:
"""Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)"""
if isinstance(har2tree_container, str):
to_lookup: Set[str] = {har2tree_container, }
else:
to_lookup = get_resources_hashes(har2tree_container)
known_content_table: Dict[str, Any] = {}
if not to_lookup:
return known_content_table
# get generic known content
known_in_generic = zip(to_lookup, self.redis.hmget('known_content', to_lookup))
for h, details in known_in_generic:
if not details:
continue
known_content_table[h] = {'type': 'generic', 'details': details}
to_lookup = to_lookup - set(known_content_table.keys())
if not to_lookup:
return known_content_table
# get known malicious
for h in to_lookup:
if self.redis.sismember('bh|malicious', h):
known_content_table[h] = {'type': 'malicious', 'details': {}}
targets = self.redis.smembers(f'{h}|target')
tags = self.redis.smembers(f'{h}|tag')
if targets:
known_content_table[h]['details']['target'] = targets
if tags:
known_content_table[h]['details']['tag'] = tags
to_lookup = to_lookup - set(known_content_table.keys())
if not to_lookup:
return known_content_table
# get known legitimate with domain
for h in to_lookup:
domains = self.redis.smembers(f'bh|{h}|legitimate')
if not domains:
continue
known_content_table[h] = {'type': 'legitimate_on_domain', 'details': domains}
to_lookup = to_lookup - set(known_content_table.keys())
if not to_lookup:
return known_content_table
if to_lookup and self.sanejs and self.sanejs.available:
# Query sanejs on the remaining ones
try:
for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
libname, version, path = entry[0].split("|")
known_content_table[h] = {'type': 'sanejs',
'details': (libname, version, path, len(entry))}
except json.decoder.JSONDecodeError as e:
self.logger.warning(f'Something went wrong with sanejs: {e}')
return known_content_table
def store_known_legitimate_tree(self, tree: CrawledTree):
known_content = self.find_known_content(tree)
capture_file: Path = get_homedir() / 'known_content_user' / f'{urlsplit(tree.root_url).hostname}.json'
if capture_file.exists():
with open(capture_file) as f:
to_store = json.load(f)
else:
to_store = {}
for urlnode in tree.root_hartree.url_tree.traverse():
for h in urlnode.resources_hashes:
if h in known_content and known_content[h]['type'] != 'malicious':
# when we mark a tree as legitimate, we may get a hash that was marked
# as malicious beforehand but turns out to be legitimate on that specific domain.
continue
mimetype = ''
if h != urlnode.body_hash:
# this is the hash of an embedded content so it won't have a filename but has a different mimetype
# FIXME: this is ugly.
for ressource_mimetype, blobs in urlnode.embedded_ressources.items():
for ressource_h, b in blobs:
if ressource_h == h:
mimetype = ressource_mimetype.split(';')[0]
break
if mimetype:
break
else:
if urlnode.mimetype:
mimetype = urlnode.mimetype.split(';')[0]
if h not in to_store:
to_store[h] = {'filenames': set(), 'description': '', 'hostnames': set(), 'mimetype': mimetype}
else:
to_store[h]['filenames'] = set(to_store[h]['filenames'])
to_store[h]['hostnames'] = set(to_store[h]['hostnames'])
to_store[h]['hostnames'].add(urlnode.hostname)
if urlnode.url_split.path:
filename = Path(urlnode.url_split.path).name
if filename:
to_store[h]['filenames'].add(filename)
with open(capture_file, 'w') as f:
json.dump(to_store, f, indent=2, default=serialize_to_json)
def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> None:
if hostnode_uuid:
urlnodes = tree.root_hartree.get_host_node_by_uuid(hostnode_uuid).urls
elif urlnode_uuid:
urlnodes = [tree.root_hartree.get_url_node_by_uuid(urlnode_uuid)]
else:
urlnodes = tree.root_hartree.url_tree.traverse()
self.store_known_legitimate_tree(tree)
known_content = self.find_known_content(tree)
pipeline = self.redis.pipeline()
for urlnode in urlnodes:
# Note: we can have multiple hashes on the same urlnode (see embedded resources).
# They are expected to be on the same domain as the urlnode. This code works as expected.
for h in urlnode.resources_hashes:
if h in known_content and known_content[h]['type'] != 'malicious':
# when we mark a tree as legitimate, we may get a hash that was marked
# as malicious beforehand but turns out to be legitimate on that specific domain.
continue
pipeline.sadd(f'bh|{h}|legitimate', urlnode.hostname)
pipeline.execute()
def contextualize_tree(self, tree: CrawledTree) -> CrawledTree:
"""Iterate through all the URL nodes in the tree, add context to Host nodes accordingly
* malicious: At least one URLnode in the Hostnode is marked as malicious
* legitimate: All the URLnodes in the Hostnode are marked as legitimate
* empty: All the URLnodes in the Hostnode have an empty body in their response
"""
hostnodes_with_malicious_content = set()
known_content = self.find_known_content(tree)
for urlnode in tree.root_hartree.url_tree.traverse():
if urlnode.empty_response:
continue
malicious = self.is_malicious(urlnode, known_content)
if malicious is True:
urlnode.add_feature('malicious', True)
hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
elif malicious is False:
# Marked as legitimate
urlnode.add_feature('legitimate', True)
else:
# malicious is None => we cannot say.
pass
for hostnode in tree.root_hartree.hostname_tree.traverse():
if hostnode.uuid in hostnodes_with_malicious_content:
hostnode.add_feature('malicious', True)
elif all(urlnode.empty_response for urlnode in hostnode.urls):
hostnode.add_feature('all_empty', True)
else:
legit = [True for urlnode in hostnode.urls if hasattr(urlnode, 'legitimate')]
if len(legit) == len(hostnode.urls):
hostnode.add_feature('legitimate', True)
return tree
def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None:
self.redis.sadd(f'bh|{body_hash}|legitimate', legitimate_hostname)
def store_known_malicious_ressource(self, ressource_hash: str, details: Dict[str, str]):
known_malicious_ressource_file = get_homedir() / 'known_content_user' / 'malicious.json'
if known_malicious_ressource_file.exists():
with open(known_malicious_ressource_file) as f:
to_store = json.load(f)
else:
to_store = {}
if ressource_hash not in to_store:
to_store[ressource_hash] = {'target': set(), 'tag': set()}
else:
to_store[ressource_hash]['target'] = set(to_store[ressource_hash]['target'])
to_store[ressource_hash]['tag'] = set(to_store[ressource_hash]['tag'])
if 'target' in details:
to_store[ressource_hash]['target'].add(details['target'])
if 'type' in details:
to_store[ressource_hash]['tag'].add(details['type'])
with open(known_malicious_ressource_file, 'w') as f:
json.dump(to_store, f, indent=2, default=serialize_to_json)
def add_malicious(self, ressource_hash: str, details: Dict[str, str]):
self.store_known_malicious_ressource(ressource_hash, details)
p = self.redis.pipeline()
p.sadd('bh|malicious', ressource_hash)
if 'target' in details:
p.sadd(f'{ressource_hash}|target', details['target'])
if 'type' in details:
p.sadd(f'{ressource_hash}|tag', details['type'])
p.execute()
def store_known_legitimate_ressource(self, ressource_hash: str, details: Dict[str, str]):
known_legitimate_ressource_file = get_homedir() / 'known_content_user' / 'legitimate.json'
if known_legitimate_ressource_file.exists():
with open(known_legitimate_ressource_file) as f:
to_store = json.load(f)
else:
to_store = {}
if ressource_hash not in to_store:
to_store[ressource_hash] = {'domain': set(), 'description': ''}
else:
to_store[ressource_hash]['domain'] = set(to_store[ressource_hash]['domain'])
if 'domain' in details:
to_store[ressource_hash]['domain'].add(details['domain'])
if 'description' in details:
to_store[ressource_hash]['description'] = details['description']
with open(known_legitimate_ressource_file, 'w') as f:
json.dump(to_store, f, indent=2, default=serialize_to_json)
def add_legitimate(self, ressource_hash: str, details: Dict[str, str]):
self.store_known_legitimate_ressource(ressource_hash, details)
if 'domain' in details:
self.redis.sadd(f'bh|{ressource_hash}|legitimate', details['domain'])
elif 'description' in details:
# Library
self.redis.hset('known_content', ressource_hash, details['description'])
# Query DB
def is_legitimate(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optional[bool]:
"""
A resource is considered legitimate if it is generic, marked as legitimate, or known on SaneJS, and is loaded from the expected domain.
3 cases:
* True if *all* the contents are known legitimate
* False if *any* content is malicious
* None in all other cases
"""
status: List[Optional[bool]] = []
for h in urlnode.resources_hashes:
# Note: we can have multiple hashes on the same urlnode (see embedded resources).
if h not in known_hashes:
# We do not return here, because we want to return False if
# *any* of the contents is malicious
status.append(None) # Unknown
elif known_hashes[h]['type'] == 'malicious':
return False
elif known_hashes[h]['type'] in ['generic', 'sanejs']:
status.append(True)
elif known_hashes[h]['type'] == 'legitimate_on_domain':
if urlnode.hostname in known_hashes[h]['details']:
status.append(True)
else:
return False
if status and all(status):
return True # All the contents are known legitimate
return None
def is_malicious(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optional[bool]:
"""3 cases:
* True if *any* content is malicious
* False if *all* the contents are known legitimate
* None in all other cases
"""
legitimate = self.is_legitimate(urlnode, known_hashes)
if legitimate:
return False
elif legitimate is False:
return True
return None
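The Context class above is the contextualization logic split out of lookyloo/lookyloo.py: it caches the known_content / known_content_user JSON files into Redis (db=1 on the 'indexing' socket, under keys such as known_content, bh|malicious, bh|<hash>|legitimate, <hash>|target and <hash>|tag), then answers legitimate/malicious questions per resource hash. A minimal usage sketch, assuming 'tree' is a har2tree CrawledTree already loaded by Lookyloo and the indexing Redis instance is running:

from lookyloo.context import Context

context = Context()                        # optionally pass a SaneJavaScript instance to enable SaneJS lookups
known = context.find_known_content(tree)   # hash -> {'type': ..., 'details': ...}
tree = context.contextualize_tree(tree)    # adds 'malicious' / 'legitimate' / 'all_empty' features to the nodes
context.mark_as_legitimate(tree)           # records every resource of this capture as legitimate on its hostname

# Flag a single resource as malicious (hypothetical hash and details, for illustration only):
context.add_malicious('c0ffee0123', {'target': 'example-bank', 'type': 'phishing kit'})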

View File

@ -2,21 +2,22 @@
# -*- coding: utf-8 -*-
import os
import logging
from typing import List, Optional, Dict, Union, Any
from io import BufferedIOBase
from pathlib import Path
from .exceptions import MissingEnv, CreateDirectoryException, ConfigError
from redis import Redis
from redis.exceptions import ConnectionError
from datetime import datetime, timedelta
import time
from glob import glob
import json
import traceback
from urllib.parse import urlparse
import pickle
from har2tree import CrawledTree
from typing import List, Optional, Dict, Union, Any, Set
from io import BufferedIOBase
from pathlib import Path
from datetime import datetime, timedelta
from glob import glob
from urllib.parse import urlparse
from functools import lru_cache
from har2tree import CrawledTree, HostNode, URLNode
from redis import Redis
from redis.exceptions import ConnectionError
from publicsuffix2 import PublicSuffixList, fetch # type: ignore
from bs4 import BeautifulSoup # type: ignore
try:
import cloudscraper # type: ignore
@ -24,10 +25,48 @@ try:
except ImportError:
HAS_CF = False
from .exceptions import MissingEnv, CreateDirectoryException, ConfigError
configs: Dict[str, Dict[str, Any]] = {}
logger = logging.getLogger('Lookyloo - Helpers')
# This method is used in json.dump or json.dumps calls as the default parameter:
# json.dumps(..., default=serialize_to_json)
def serialize_to_json(obj: Union[Set]) -> Union[List]:
if isinstance(obj, set):
return list(obj)
def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
if isinstance(har2tree_container, CrawledTree):
urlnodes = har2tree_container.root_hartree.url_tree.traverse()
elif isinstance(har2tree_container, HostNode):
urlnodes = har2tree_container.urls
elif isinstance(har2tree_container, URLNode):
urlnodes = [har2tree_container]
else:
raise Exception(f'har2tree_container cannot be {type(har2tree_container)}')
all_ressources_hashes: Set[str] = set()
for urlnode in urlnodes:
if hasattr(urlnode, 'resources_hashes'):
all_ressources_hashes.update(urlnode.resources_hashes)
return all_ressources_hashes
@lru_cache
def get_public_suffix_list():
"""Initialize Public Suffix List"""
try:
psl_file = fetch()
psl = PublicSuffixList(psl_file=psl_file)
except Exception:
psl = PublicSuffixList()
return psl
@lru_cache
def get_homedir() -> Path:
if not os.environ.get('LOOKYLOO_HOME'):
# Try to open a .env file in the home directory if it exists.
@ -47,11 +86,13 @@ Run the following command (assuming you run the code from the cloned repository
return Path(os.environ['LOOKYLOO_HOME'])
@lru_cache
def get_email_template() -> str:
with (get_homedir() / 'config' / 'email.tmpl').open() as f:
return f.read()
@lru_cache
def load_configs(path_to_config_files: Optional[Union[str, Path]]=None):
global configs
if configs:
@ -74,6 +115,7 @@ def load_configs(path_to_config_files: Optional[Union[str, Path]]=None):
configs[path.stem] = json.load(_c)
@lru_cache
def get_config(config_type: str, entry: str) -> Any:
"""Get an entry from the given config_type file. Automatic fallback to the sample file"""
global configs
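Two of the helpers added here are worth calling out: get_resources_hashes collects every resource hash from a CrawledTree, HostNode or URLNode, and serialize_to_json lets json.dump write the sets used in the known-content files. A small sketch, assuming 'tree' is an already loaded CrawledTree:

import json
from lookyloo.helpers import get_resources_hashes, serialize_to_json

hashes = get_resources_hashes(tree)   # Set[str] with every resource hash of the capture
# Sets are not JSON serializable by default; the helper converts them to lists on the fly.
print(json.dumps({'hashes': hashes}, indent=2, default=serialize_to_json))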

lookyloo/indexing.py (new file, 137 lines)
View File

@ -0,0 +1,137 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from urllib.parse import urlsplit
from typing import List, Tuple, Set, Dict, Optional
from redis import Redis
from har2tree import CrawledTree
from .helpers import get_socket_path, get_public_suffix_list
class Indexing():
def __init__(self) -> None:
self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), decode_responses=True)
def clear_indexes(self):
self.redis.flushdb()
# ###### Cookies ######
@property
def cookies_names(self) -> List[Tuple[str, float]]:
return self.redis.zrevrange('cookies_names', 0, -1, withscores=True)
def cookies_names_number_domains(self, cookie_name: str) -> int:
return self.redis.zcard(f'cn|{cookie_name}')
def cookies_names_domains_values(self, cookie_name: str, domain: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'cn|{cookie_name}|{domain}', 0, -1, withscores=True)
def get_cookie_domains(self, cookie_name: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'cn|{cookie_name}', 0, -1, withscores=True)
def get_cookies_names_captures(self, cookie_name: str) -> List[Tuple[str, str]]:
return [uuids.split('|') for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')]
def index_cookies_capture(self, crawled_tree: CrawledTree) -> None:
if self.redis.sismember('indexed_cookies', crawled_tree.uuid):
# Do not reindex
return
self.redis.sadd('indexed_cookies', crawled_tree.uuid)
pipeline = self.redis.pipeline()
already_loaded: Set[Tuple[str, str]] = set()
for urlnode in crawled_tree.root_hartree.url_tree.traverse():
if hasattr(urlnode, 'cookies_received'):
for domain, cookie, _ in urlnode.cookies_received:
name, value = cookie.split('=', 1)
if (name, domain) in already_loaded:
# Only add cookie name once / capture
continue
already_loaded.add((name, domain))
pipeline.zincrby('cookies_names', 1, name)
pipeline.zincrby(f'cn|{name}', 1, domain)
pipeline.sadd(f'cn|{name}|captures', f'{crawled_tree.uuid}|{urlnode.uuid}')
pipeline.zincrby(f'cn|{name}|{domain}', 1, value)
pipeline.sadd('lookyloo_domains', domain)
pipeline.sadd(domain, name)
pipeline.execute()
def aggregate_domain_cookies(self):
psl = get_public_suffix_list()
pipeline = self.redis.pipeline()
for cn, cn_freq in self.cookies_names:
for domain, d_freq in self.get_cookie_domains(cn):
tld = psl.get_tld(domain)
main_domain_part = domain.strip(f'.{tld}').split('.')[-1]
pipeline.zincrby('aggregate_domains_cn', cn_freq, f'{main_domain_part}|{cn}')
pipeline.zincrby('aggregate_cn_domains', d_freq, f'{cn}|{main_domain_part}')
pipeline.execute()
aggregate_domains_cn = self.redis.zrevrange('aggregate_domains_cn', 0, -1, withscores=True)
aggregate_cn_domains = self.redis.zrevrange('aggregate_cn_domains', 0, -1, withscores=True)
self.redis.delete('aggregate_domains_cn')
self.redis.delete('aggregate_cn_domains')
return {'domains': aggregate_domains_cn, 'cookies': aggregate_cn_domains}
# ###### Body hashes ######
@property
def ressources(self) -> List[Tuple[str, float]]:
return self.redis.zrevrange('body_hashes', 0, 200, withscores=True)
def ressources_number_domains(self, h: str) -> int:
return self.redis.zcard(f'bh|{h}')
def body_hash_fequency(self, body_hash: str) -> Dict[str, float]:
return {'hash_freq': int(self.redis.zscore('body_hashes', body_hash)),
'hash_domains_freq': int(self.redis.zcard(f'bh|{body_hash}'))}
def index_body_hashes_capture(self, crawled_tree: CrawledTree) -> None:
if self.redis.sismember('indexed_body_hashes', crawled_tree.uuid):
# Do not reindex
return
self.redis.sadd('indexed_body_hashes', crawled_tree.uuid)
pipeline = self.redis.pipeline()
for urlnode in crawled_tree.root_hartree.url_tree.traverse():
for h in urlnode.resources_hashes:
pipeline.zincrby('body_hashes', 1, h)
pipeline.zincrby(f'bh|{h}', 1, urlnode.hostname)
# set of all captures with this hash
pipeline.sadd(f'bh|{h}|captures', crawled_tree.uuid)
# ZSet of all urlnode_UUIDs|full_url
pipeline.zincrby(f'bh|{h}|captures|{crawled_tree.uuid}', 1, f'{urlnode.uuid}|{urlnode.hostnode_uuid}|{urlnode.name}')
pipeline.execute()
def get_hash_uuids(self, body_hash: str) -> Tuple[str, str, str]:
capture_uuid = self.redis.srandmember(f'bh|{body_hash}|captures')
entry = self.redis.zrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, 1)[0]
urlnode_uuid, hostnode_uuid, url = entry.split('|', 2)
return capture_uuid, urlnode_uuid, hostnode_uuid
def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None,
filter_capture_uuid: Optional[str]=None,
limit: int=20) -> Tuple[int, List[Tuple[str, str, str, bool]]]:
to_return: List[Tuple[str, str, str, bool]] = []
all_captures = self.redis.smembers(f'bh|{body_hash}|captures')
len_captures = len(all_captures)
for capture_uuid in list(all_captures)[:limit]:
if capture_uuid == filter_capture_uuid:
# Used to skip hits in current capture
len_captures -= 1
continue
for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
url_uuid, hostnode_uuid, url = entry.split('|', 2)
if filter_url:
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, url == filter_url))
else:
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, False))
return len_captures, to_return
def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True)
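The new lookyloo/indexing.py is the capture indexer extracted from lookyloo/lookyloo.py: it fills Redis sorted sets for cookie names (cookies_names, cn|<name>, cn|<name>|<domain>) and for resource hashes (body_hashes, bh|<hash>, bh|<hash>|captures). A hedged usage sketch, assuming 'tree' is a loaded CrawledTree and 'resource_hash' one of its hashes:

from lookyloo.indexing import Indexing

indexing = Indexing()
indexing.index_cookies_capture(tree)        # no-op if the capture UUID was already indexed
indexing.index_body_hashes_capture(tree)

freq = indexing.body_hash_fequency(resource_hash)             # {'hash_freq': ..., 'hash_domains_freq': ...}
total, hits = indexing.get_body_hash_captures(resource_hash)  # other captures containing that hash
domains = indexing.get_body_hash_domains(resource_hash)       # hostnames serving it, most frequent first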

View File

@ -4,7 +4,6 @@
import os
import base64
from collections import defaultdict, Counter
from datetime import datetime, date, timedelta
from email.message import EmailMessage
from io import BufferedIOBase, BytesIO
@ -22,470 +21,19 @@ from zipfile import ZipFile
import dns.resolver
import dns.rdatatype
import publicsuffix2 # type: ignore
from defang import refang # type: ignore
from har2tree import CrawledTree, Har2TreeError, HarFile, HostNode, URLNode
from redis import Redis
from scrapysplashwrapper import crawl
from werkzeug.useragents import UserAgent
from .exceptions import NoValidHarFile, MissingUUID
from .helpers import get_homedir, get_socket_path, load_cookies, get_config, safe_create_dir, get_email_template, load_pickle_tree, remove_pickle_tree, load_known_content
from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
safe_create_dir, get_email_template, load_pickle_tree,
remove_pickle_tree, get_resources_hashes)
from .modules import VirusTotal, SaneJavaScript, PhishingInitiative
def dump_to_json(obj: Union[Set]) -> Union[List]:
if isinstance(obj, set):
return list(obj)
class Indexing():
def __init__(self) -> None:
self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), decode_responses=True)
def clear_indexes(self):
self.redis.flushdb()
# ###### Cookies ######
@property
def cookies_names(self) -> List[Tuple[str, float]]:
return self.redis.zrevrange('cookies_names', 0, -1, withscores=True)
def cookies_names_number_domains(self, cookie_name: str) -> int:
return self.redis.zcard(f'cn|{cookie_name}')
def cookies_names_domains_values(self, cookie_name: str, domain: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'cn|{cookie_name}|{domain}', 0, -1, withscores=True)
def get_cookie_domains(self, cookie_name: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'cn|{cookie_name}', 0, -1, withscores=True)
def get_cookies_names_captures(self, cookie_name: str) -> List[Tuple[str, str]]:
return [uuids.split('|') for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')]
def index_cookies_capture(self, crawled_tree: CrawledTree) -> None:
if self.redis.sismember('indexed_cookies', crawled_tree.uuid):
# Do not reindex
return
self.redis.sadd('indexed_cookies', crawled_tree.uuid)
pipeline = self.redis.pipeline()
already_loaded: Set[Tuple[str, str]] = set()
for urlnode in crawled_tree.root_hartree.url_tree.traverse():
if hasattr(urlnode, 'cookies_received'):
for domain, cookie, _ in urlnode.cookies_received:
name, value = cookie.split('=', 1)
if (name, domain) in already_loaded:
# Only add cookie name once / capture
continue
already_loaded.add((name, domain))
pipeline.zincrby('cookies_names', 1, name)
pipeline.zincrby(f'cn|{name}', 1, domain)
pipeline.sadd(f'cn|{name}|captures', f'{crawled_tree.uuid}|{urlnode.uuid}')
pipeline.zincrby(f'cn|{name}|{domain}', 1, value)
pipeline.sadd('lookyloo_domains', domain)
pipeline.sadd(domain, name)
pipeline.execute()
def aggregate_domain_cookies(self):
psl = publicsuffix2.PublicSuffixList()
pipeline = self.redis.pipeline()
for cn, cn_freq in self.cookies_names:
for domain, d_freq in self.get_cookie_domains(cn):
tld = psl.get_tld(domain)
main_domain_part = domain.strip(f'.{tld}').split('.')[-1]
pipeline.zincrby('aggregate_domains_cn', cn_freq, f'{main_domain_part}|{cn}')
pipeline.zincrby('aggregate_cn_domains', d_freq, f'{cn}|{main_domain_part}')
pipeline.execute()
aggregate_domains_cn = self.redis.zrevrange('aggregate_domains_cn', 0, -1, withscores=True)
aggregate_cn_domains = self.redis.zrevrange('aggregate_cn_domains', 0, -1, withscores=True)
self.redis.delete('aggregate_domains_cn')
self.redis.delete('aggregate_cn_domains')
return {'domains': aggregate_domains_cn, 'cookies': aggregate_cn_domains}
# ###### Body hashes ######
@property
def ressources(self) -> List[Tuple[str, float]]:
return self.redis.zrevrange('body_hashes', 0, 200, withscores=True)
def ressources_number_domains(self, h: str) -> int:
return self.redis.zcard(f'bh|{h}')
def body_hash_fequency(self, body_hash: str) -> Dict[str, float]:
return {'hash_freq': int(self.redis.zscore('body_hashes', body_hash)),
'hash_domains_freq': int(self.redis.zcard(f'bh|{body_hash}'))}
def index_body_hashes_capture(self, crawled_tree: CrawledTree) -> None:
if self.redis.sismember('indexed_body_hashes', crawled_tree.uuid):
# Do not reindex
return
self.redis.sadd('indexed_body_hashes', crawled_tree.uuid)
pipeline = self.redis.pipeline()
for urlnode in crawled_tree.root_hartree.url_tree.traverse():
for h in urlnode.resources_hashes:
pipeline.zincrby('body_hashes', 1, h)
pipeline.zincrby(f'bh|{h}', 1, urlnode.hostname)
# set of all captures with this hash
pipeline.sadd(f'bh|{h}|captures', crawled_tree.uuid)
# ZSet of all urlnode_UUIDs|full_url
pipeline.zincrby(f'bh|{h}|captures|{crawled_tree.uuid}', 1, f'{urlnode.uuid}|{urlnode.hostnode_uuid}|{urlnode.name}')
pipeline.execute()
def get_hash_uuids(self, body_hash: str) -> Tuple[str, str, str]:
capture_uuid = self.redis.srandmember(f'bh|{body_hash}|captures')
entry = self.redis.zrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, 1)[0]
urlnode_uuid, hostnode_uuid, url = entry.split('|', 2)
return capture_uuid, urlnode_uuid, hostnode_uuid
def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None,
filter_capture_uuid: Optional[str]=None,
limit: int=20) -> Tuple[int, List[Tuple[str, str, str, bool]]]:
to_return: List[Tuple[str, str, str, bool]] = []
all_captures = self.redis.smembers(f'bh|{body_hash}|captures')
len_captures = len(all_captures)
for capture_uuid in list(all_captures)[:limit]:
if capture_uuid == filter_capture_uuid:
# Used to skip hits in current capture
len_captures -= 1
continue
for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
url_uuid, hostnode_uuid, url = entry.split('|', 2)
if filter_url:
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, url == filter_url))
else:
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, False))
return len_captures, to_return
def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True)
class Context():
def __init__(self, sanejs: Optional[SaneJavaScript] = None):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True)
self.sanejs = sanejs
self._cache_known_content()
def clear_context(self):
self.redis.flushdb()
def _get_resources_hashes(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
if isinstance(har2tree_container, CrawledTree):
urlnodes = har2tree_container.root_hartree.url_tree.traverse()
elif isinstance(har2tree_container, HostNode):
urlnodes = har2tree_container.urls
elif isinstance(har2tree_container, URLNode):
urlnodes = [har2tree_container]
else:
raise Exception(f'har2tree_container cannot be {type(har2tree_container)}')
all_ressources_hashes: Set[str] = set()
for urlnode in urlnodes:
if hasattr(urlnode, 'resources_hashes'):
all_ressources_hashes.update(urlnode.resources_hashes)
return all_ressources_hashes
def _cache_known_content(self) -> None:
for dirname in ['known_content', 'known_content_user']:
for filename, file_content in load_known_content(dirname).items():
p = self.redis.pipeline()
if filename == 'generic':
# 1px images, files with spaces, empty => non-relevant stuff
for k, type_content in file_content.items():
p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']})
elif filename == 'malicious':
# User defined as malicious
for h, details in file_content.items():
p.sadd('bh|malicious', h)
if 'target' in details and details['target']:
p.sadd(f'{h}|target', *details['target'])
if 'tag' in details and details['tag']:
p.sadd(f'{h}|tag', *details['tag'])
elif filename == 'legitimate':
# User defined as legitimate
for h, details in file_content.items():
if 'domain' in details and details['domain']:
p.sadd(f'bh|{h}|legitimate', *details['domain'])
elif 'description' in details:
p.hset('known_content', h, details['description'])
else:
# Full captures marked as legitimate
for h, details in file_content.items():
p.sadd(f'bh|{h}|legitimate', *details['hostnames'])
p.execute()
def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode, str]) -> Dict[str, Any]:
"""Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)"""
if isinstance(har2tree_container, str):
to_lookup: Set[str] = {har2tree_container, }
else:
to_lookup = self._get_resources_hashes(har2tree_container)
known_content_table: Dict[str, Any] = {}
if not to_lookup:
return known_content_table
# get generic known content
known_in_generic = zip(to_lookup, self.redis.hmget('known_content', to_lookup))
for h, details in known_in_generic:
if not details:
continue
known_content_table[h] = {'type': 'generic', 'details': details}
to_lookup = to_lookup - set(known_content_table.keys())
if not to_lookup:
return known_content_table
# get known malicious
for h in to_lookup:
if self.redis.sismember('bh|malicious', h):
known_content_table[h] = {'type': 'malicious', 'details': {}}
targets = self.redis.smembers(f'{h}|target')
tags = self.redis.smembers(f'{h}|tag')
if targets:
known_content_table[h]['details']['target'] = targets
if tags:
known_content_table[h]['details']['tag'] = tags
to_lookup = to_lookup - set(known_content_table.keys())
if not to_lookup:
return known_content_table
# get known legitimate with domain
for h in to_lookup:
domains = self.redis.smembers(f'bh|{h}|legitimate')
if not domains:
continue
known_content_table[h] = {'type': 'legitimate_on_domain', 'details': domains}
to_lookup = to_lookup - set(known_content_table.keys())
if not to_lookup:
return known_content_table
if to_lookup and self.sanejs and self.sanejs.available:
# Query sanejs on the remaining ones
try:
for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
libname, version, path = entry[0].split("|")
known_content_table[h] = {'type': 'sanejs',
'details': (libname, version, path, len(entry))}
except json.decoder.JSONDecodeError as e:
self.logger.warning(f'Something went wrong with sanejs: {e}')
return known_content_table
def store_known_legitimate_tree(self, tree: CrawledTree):
known_content = self.find_known_content(tree)
capture_file: Path = get_homedir() / 'known_content_user' / f'{urlsplit(tree.root_url).hostname}.json'
if capture_file.exists():
with open(capture_file) as f:
to_store = json.load(f)
else:
to_store = {}
for urlnode in tree.root_hartree.url_tree.traverse():
for h in urlnode.resources_hashes:
if h in known_content and known_content[h]['type'] != 'malicious':
# when we mark a tree as legitimate, we may get a hash that was marked
# as malicious beforehand but turns out to be legitimate on that specific domain.
continue
mimetype = ''
if h != urlnode.body_hash:
# this is the hash of an embedded content so it won't have a filename but has a different mimetype
# FIXME: this is ugly.
for ressource_mimetype, blobs in urlnode.embedded_ressources.items():
for ressource_h, b in blobs:
if ressource_h == h:
mimetype = ressource_mimetype.split(';')[0]
break
if mimetype:
break
else:
if urlnode.mimetype:
mimetype = urlnode.mimetype.split(';')[0]
if h not in to_store:
to_store[h] = {'filenames': set(), 'description': '', 'hostnames': set(), 'mimetype': mimetype}
else:
to_store[h]['filenames'] = set(to_store[h]['filenames'])
to_store[h]['hostnames'] = set(to_store[h]['hostnames'])
to_store[h]['hostnames'].add(urlnode.hostname)
if urlnode.url_split.path:
filename = Path(urlnode.url_split.path).name
if filename:
to_store[h]['filenames'].add(filename)
with open(capture_file, 'w') as f:
json.dump(to_store, f, indent=2, default=dump_to_json)
def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> None:
if hostnode_uuid:
urlnodes = tree.root_hartree.get_host_node_by_uuid(hostnode_uuid).urls
elif urlnode_uuid:
urlnodes = [tree.root_hartree.get_url_node_by_uuid(urlnode_uuid)]
else:
urlnodes = tree.root_hartree.url_tree.traverse()
self.store_known_legitimate_tree(tree)
known_content = self.find_known_content(tree)
pipeline = self.redis.pipeline()
for urlnode in urlnodes:
# Note: we can have multiple hashes on the same urlnode (see embedded resources).
# They are expected to be on the same domain as the urlnode. This code works as expected.
for h in urlnode.resources_hashes:
if h in known_content and known_content[h]['type'] != 'malicious':
# when we mark a tree as legitimate, we may get a hash that was marked
# as malicious beforehand but turns out to be legitimate on that specific domain.
continue
pipeline.sadd(f'bh|{h}|legitimate', urlnode.hostname)
pipeline.execute()
def contextualize_tree(self, tree: CrawledTree) -> CrawledTree:
"""Iterate through all the URL nodes in the tree, add context to Host nodes accordingly
* malicious: At least one URLnode in the Hostnode is marked as malicious
* legitimate: All the URLnodes in the Hostnode are marked as legitimate
* empty: All the URLnodes in the Hostnode have an empty body in their response
"""
hostnodes_with_malicious_content = set()
known_content = self.find_known_content(tree)
for urlnode in tree.root_hartree.url_tree.traverse():
if urlnode.empty_response:
continue
malicious = self.is_malicious(urlnode, known_content)
if malicious is True:
urlnode.add_feature('malicious', True)
hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
elif malicious is False:
# Marked as legitimate
urlnode.add_feature('legitimate', True)
else:
# malicious is None => we cannot say.
pass
for hostnode in tree.root_hartree.hostname_tree.traverse():
if hostnode.uuid in hostnodes_with_malicious_content:
hostnode.add_feature('malicious', True)
elif all(urlnode.empty_response for urlnode in hostnode.urls):
hostnode.add_feature('all_empty', True)
else:
legit = [True for urlnode in hostnode.urls if hasattr(urlnode, 'legitimate')]
if len(legit) == len(hostnode.urls):
hostnode.add_feature('legitimate', True)
return tree
def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None:
self.redis.sadd(f'bh|{body_hash}|legitimate', legitimate_hostname)
def store_known_malicious_ressource(self, ressource_hash: str, details: Dict[str, str]):
known_malicious_ressource_file = get_homedir() / 'known_content_user' / 'malicious.json'
if known_malicious_ressource_file.exists():
with open(known_malicious_ressource_file) as f:
to_store = json.load(f)
else:
to_store = {}
if ressource_hash not in to_store:
to_store[ressource_hash] = {'target': set(), 'tag': set()}
else:
to_store[ressource_hash]['target'] = set(to_store[ressource_hash]['target'])
to_store[ressource_hash]['tag'] = set(to_store[ressource_hash]['tag'])
if 'target' in details:
to_store[ressource_hash]['target'].add(details['target'])
if 'type' in details:
to_store[ressource_hash]['tag'].add(details['type'])
with open(known_malicious_ressource_file, 'w') as f:
json.dump(to_store, f, indent=2, default=dump_to_json)
def add_malicious(self, ressource_hash: str, details: Dict[str, str]):
self.store_known_malicious_ressource(ressource_hash, details)
p = self.redis.pipeline()
p.sadd('bh|malicious', ressource_hash)
if 'target' in details:
p.sadd(f'{ressource_hash}|target', details['target'])
if 'type' in details:
p.sadd(f'{ressource_hash}|tag', details['type'])
p.execute()
def store_known_legitimate_ressource(self, ressource_hash: str, details: Dict[str, str]):
known_legitimate_ressource_file = get_homedir() / 'known_content_user' / 'legitimate.json'
if known_legitimate_ressource_file.exists():
with open(known_legitimate_ressource_file) as f:
to_store = json.load(f)
else:
to_store = {}
if ressource_hash not in to_store:
to_store[ressource_hash] = {'domain': set(), 'description': ''}
else:
to_store[ressource_hash]['domain'] = set(to_store[ressource_hash]['domain'])
if 'domain' in details:
to_store[ressource_hash]['domain'].add(details['domain'])
if 'description' in details:
to_store[ressource_hash]['description'] = details['description']
with open(known_legitimate_ressource_file, 'w') as f:
json.dump(to_store, f, indent=2, default=dump_to_json)
def add_legitimate(self, ressource_hash: str, details: Dict[str, str]):
self.store_known_legitimate_ressource(ressource_hash, details)
if 'domain' in details:
self.redis.sadd(f'bh|{ressource_hash}|legitimate', details['domain'])
elif 'description' in details:
# Library
self.redis.hset('known_content', ressource_hash, details['description'])
# Query DB
def is_legitimate(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optional[bool]:
"""
A resource is considered legitimate if it is generic, marked as legitimate, or known on SaneJS, and is loaded from the expected domain.
3 cases:
* True if *all* the contents are known legitimate
* False if *any* content is malicious
* None in all other cases
"""
status: List[Optional[bool]] = []
for h in urlnode.resources_hashes:
# Note: we can have multiple hashes on the same urlnode (see embedded resources).
if h not in known_hashes:
# We do not return here, because we want to return False if
# *any* of the contents is malicious
status.append(None) # Unknown
elif known_hashes[h]['type'] == 'malicious':
return False
elif known_hashes[h]['type'] in ['generic', 'sanejs']:
status.append(True)
elif known_hashes[h]['type'] == 'legitimate_on_domain':
if urlnode.hostname in known_hashes[h]['details']:
status.append(True)
else:
return False
if status and all(status):
return True # All the contents are known legitimate
return None
def is_malicious(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optional[bool]:
"""3 cases:
* True if *any* content is malicious
* False if *all* the contents are known legitimate
* None in all other cases
"""
legitimate = self.is_legitimate(urlnode, known_hashes)
if legitimate:
return False
elif legitimate is False:
return True
return None
from .context import Context
from .indexing import Indexing
class Lookyloo():
@ -1166,6 +714,21 @@ class Lookyloo():
return 'embedded_ressource.bin', blob
return None
def get_hashes(self, tree_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
"""Return hashes of resources.
Only tree_uuid: All the hashes
tree_uuid and hostnode_uuid: hashes of all the resources in that hostnode (including embedded resources)
tree_uuid, hostnode_uuid, and urlnode_uuid: hash of the URL node body, and embedded resources
"""
container: Union[CrawledTree, HostNode, URLNode]
if urlnode_uuid:
container = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid)
elif hostnode_uuid:
container = self.get_hostnode_from_tree(tree_uuid, hostnode_uuid)
else:
container = self.get_crawled_tree(tree_uuid)
return get_resources_hashes(container)
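The routes added to the web interface below build on this new get_hashes method. A minimal sketch of how it might be called, assuming an instantiated Lookyloo object and UUIDs of an existing capture and of its nodes:

all_hashes = lookyloo.get_hashes(capture_uuid)                                # every resource hash of the capture
host_hashes = lookyloo.get_hashes(capture_uuid, hostnode_uuid=hostnode_uuid)  # hashes of one host node
url_hashes = lookyloo.get_hashes(capture_uuid, urlnode_uuid=urlnode_uuid)     # body + embedded resources of one URL node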
def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
capture_dir = self.lookup_capture_dir(capture_uuid)
if not capture_dir:

poetry.lock (generated, 21 lines)
View File

@ -1203,37 +1203,37 @@ certifi = [
{file = "certifi-2020.6.20.tar.gz", hash = "sha256:5930595817496dd21bb8dc35dad090f1c2cd0adfaf21204bf6732ca5d8ee34d3"},
]
cffi = [
{file = "cffi-1.14.3-2-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:3eeeb0405fd145e714f7633a5173318bd88d8bbfc3dd0a5751f8c4f70ae629bc"},
{file = "cffi-1.14.3-2-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:cb763ceceae04803adcc4e2d80d611ef201c73da32d8f2722e9d0ab0c7f10768"},
{file = "cffi-1.14.3-2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:44f60519595eaca110f248e5017363d751b12782a6f2bd6a7041cba275215f5d"},
{file = "cffi-1.14.3-2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:c53af463f4a40de78c58b8b2710ade243c81cbca641e34debf3396a9640d6ec1"},
{file = "cffi-1.14.3-2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:33c6cdc071ba5cd6d96769c8969a0531be2d08c2628a0143a10a7dcffa9719ca"},
{file = "cffi-1.14.3-2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c11579638288e53fc94ad60022ff1b67865363e730ee41ad5e6f0a17188b327a"},
{file = "cffi-1.14.3-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:485d029815771b9fe4fa7e1c304352fe57df6939afe835dfd0182c7c13d5e92e"},
{file = "cffi-1.14.3-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:3cb3e1b9ec43256c4e0f8d2837267a70b0e1ca8c4f456685508ae6106b1f504c"},
{file = "cffi-1.14.3-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:f0620511387790860b249b9241c2f13c3a80e21a73e0b861a2df24e9d6f56730"},
{file = "cffi-1.14.3-cp27-cp27m-win32.whl", hash = "sha256:005f2bfe11b6745d726dbb07ace4d53f057de66e336ff92d61b8c7e9c8f4777d"},
{file = "cffi-1.14.3-cp27-cp27m-win_amd64.whl", hash = "sha256:2f9674623ca39c9ebe38afa3da402e9326c245f0f5ceff0623dccdac15023e05"},
{file = "cffi-1.14.3-cp27-cp27mu-manylinux1_i686.whl", hash = "sha256:09e96138280241bd355cd585148dec04dbbedb4f46128f340d696eaafc82dd7b"},
{file = "cffi-1.14.3-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:3363e77a6176afb8823b6e06db78c46dbc4c7813b00a41300a4873b6ba63b171"},
{file = "cffi-1.14.3-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:52bf29af05344c95136df71716bb60508bbd217691697b4307dcae681612db9f"},
{file = "cffi-1.14.3-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:0ef488305fdce2580c8b2708f22d7785ae222d9825d3094ab073e22e93dfe51f"},
{file = "cffi-1.14.3-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:0b1ad452cc824665ddc682400b62c9e4f5b64736a2ba99110712fdee5f2505c4"},
{file = "cffi-1.14.3-cp35-cp35m-win32.whl", hash = "sha256:85ba797e1de5b48aa5a8427b6ba62cf69607c18c5d4eb747604b7302f1ec382d"},
{file = "cffi-1.14.3-cp35-cp35m-win_amd64.whl", hash = "sha256:e66399cf0fc07de4dce4f588fc25bfe84a6d1285cc544e67987d22663393926d"},
{file = "cffi-1.14.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:c687778dda01832555e0af205375d649fa47afeaeeb50a201711f9a9573323b8"},
{file = "cffi-1.14.3-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:15f351bed09897fbda218e4db5a3d5c06328862f6198d4fb385f3e14e19decb3"},
{file = "cffi-1.14.3-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:4d7c26bfc1ea9f92084a1d75e11999e97b62d63128bcc90c3624d07813c52808"},
{file = "cffi-1.14.3-cp36-cp36m-manylinux2014_aarch64.whl", hash = "sha256:23e5d2040367322824605bc29ae8ee9175200b92cb5483ac7d466927a9b3d537"},
{file = "cffi-1.14.3-cp36-cp36m-win32.whl", hash = "sha256:a624fae282e81ad2e4871bdb767e2c914d0539708c0f078b5b355258293c98b0"},
{file = "cffi-1.14.3-cp36-cp36m-win_amd64.whl", hash = "sha256:de31b5164d44ef4943db155b3e8e17929707cac1e5bd2f363e67a56e3af4af6e"},
{file = "cffi-1.14.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:03d3d238cc6c636a01cf55b9b2e1b6531a7f2f4103fabb5a744231582e68ecc7"},
{file = "cffi-1.14.3-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:f92cdecb618e5fa4658aeb97d5eb3d2f47aa94ac6477c6daf0f306c5a3b9e6b1"},
{file = "cffi-1.14.3-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:22399ff4870fb4c7ef19fff6eeb20a8bbf15571913c181c78cb361024d574579"},
{file = "cffi-1.14.3-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:f4eae045e6ab2bb54ca279733fe4eb85f1effda392666308250714e01907f394"},
{file = "cffi-1.14.3-cp37-cp37m-win32.whl", hash = "sha256:b0358e6fefc74a16f745afa366acc89f979040e0cbc4eec55ab26ad1f6a9bfbc"},
{file = "cffi-1.14.3-cp37-cp37m-win_amd64.whl", hash = "sha256:6642f15ad963b5092d65aed022d033c77763515fdc07095208f15d3563003869"},
{file = "cffi-1.14.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c2a33558fdbee3df370399fe1712d72464ce39c66436270f3664c03f94971aff"},
{file = "cffi-1.14.3-cp38-cp38-manylinux1_i686.whl", hash = "sha256:2791f68edc5749024b4722500e86303a10d342527e1e3bcac47f35fbd25b764e"},
{file = "cffi-1.14.3-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:529c4ed2e10437c205f38f3691a68be66c39197d01062618c55f74294a4a4828"},
{file = "cffi-1.14.3-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:8f0f1e499e4000c4c347a124fa6a27d37608ced4fe9f7d45070563b7c4c370c9"},
{file = "cffi-1.14.3-cp38-cp38-win32.whl", hash = "sha256:3b8eaf915ddc0709779889c472e553f0d3e8b7bdf62dab764c8921b09bf94522"},
{file = "cffi-1.14.3-cp38-cp38-win_amd64.whl", hash = "sha256:bbd2f4dfee1079f76943767fce837ade3087b578aeb9f69aec7857d5bf25db15"},
{file = "cffi-1.14.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5d9a7dc7cf8b1101af2602fe238911bcc1ac36d239e0a577831f5dac993856e9"},
{file = "cffi-1.14.3-cp39-cp39-manylinux1_i686.whl", hash = "sha256:cc75f58cdaf043fe6a7a6c04b3b5a0e694c6a9e24050967747251fb80d7bce0d"},
{file = "cffi-1.14.3-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:bf39a9e19ce7298f1bd6a9758fa99707e9e5b1ebe5e90f2c3913a47bc548747c"},
{file = "cffi-1.14.3-cp39-cp39-win32.whl", hash = "sha256:d80998ed59176e8cba74028762fbd9b9153b9afc71ea118e63bbf5d4d0f9552b"},
@ -1384,25 +1384,21 @@ lxml = [
{file = "lxml-4.5.2-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:bea760a63ce9bba566c23f726d72b3c0250e2fa2569909e2d83cda1534c79443"},
{file = "lxml-4.5.2-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:c3f511a3c58676147c277eff0224c061dd5a6a8e1373572ac817ac6324f1b1e0"},
{file = "lxml-4.5.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:59daa84aef650b11bccd18f99f64bfe44b9f14a08a28259959d33676554065a1"},
{file = "lxml-4.5.2-cp35-cp35m-manylinux2014_aarch64.whl", hash = "sha256:c9d317efde4bafbc1561509bfa8a23c5cab66c44d49ab5b63ff690f5159b2304"},
{file = "lxml-4.5.2-cp35-cp35m-win32.whl", hash = "sha256:9dc9006dcc47e00a8a6a029eb035c8f696ad38e40a27d073a003d7d1443f5d88"},
{file = "lxml-4.5.2-cp35-cp35m-win_amd64.whl", hash = "sha256:08fc93257dcfe9542c0a6883a25ba4971d78297f63d7a5a26ffa34861ca78730"},
{file = "lxml-4.5.2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:121b665b04083a1e85ff1f5243d4a93aa1aaba281bc12ea334d5a187278ceaf1"},
{file = "lxml-4.5.2-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:5591c4164755778e29e69b86e425880f852464a21c7bb53c7ea453bbe2633bbe"},
{file = "lxml-4.5.2-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:cc411ad324a4486b142c41d9b2b6a722c534096963688d879ea6fa8a35028258"},
{file = "lxml-4.5.2-cp36-cp36m-manylinux2014_aarch64.whl", hash = "sha256:1fa21263c3aba2b76fd7c45713d4428dbcc7644d73dcf0650e9d344e433741b3"},
{file = "lxml-4.5.2-cp36-cp36m-win32.whl", hash = "sha256:786aad2aa20de3dbff21aab86b2fb6a7be68064cbbc0219bde414d3a30aa47ae"},
{file = "lxml-4.5.2-cp36-cp36m-win_amd64.whl", hash = "sha256:e1cacf4796b20865789083252186ce9dc6cc59eca0c2e79cca332bdff24ac481"},
{file = "lxml-4.5.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:80a38b188d20c0524fe8959c8ce770a8fdf0e617c6912d23fc97c68301bb9aba"},
{file = "lxml-4.5.2-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:ecc930ae559ea8a43377e8b60ca6f8d61ac532fc57efb915d899de4a67928efd"},
{file = "lxml-4.5.2-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:a76979f728dd845655026ab991df25d26379a1a8fc1e9e68e25c7eda43004bed"},
{file = "lxml-4.5.2-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:cfd7c5dd3c35c19cec59c63df9571c67c6d6e5c92e0fe63517920e97f61106d1"},
{file = "lxml-4.5.2-cp37-cp37m-win32.whl", hash = "sha256:5a9c8d11aa2c8f8b6043d845927a51eb9102eb558e3f936df494e96393f5fd3e"},
{file = "lxml-4.5.2-cp37-cp37m-win_amd64.whl", hash = "sha256:4b4a111bcf4b9c948e020fd207f915c24a6de3f1adc7682a2d92660eb4e84f1a"},
{file = "lxml-4.5.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:5dd20538a60c4cc9a077d3b715bb42307239fcd25ef1ca7286775f95e9e9a46d"},
{file = "lxml-4.5.2-cp38-cp38-manylinux1_i686.whl", hash = "sha256:2b30aa2bcff8e958cd85d907d5109820b01ac511eae5b460803430a7404e34d7"},
{file = "lxml-4.5.2-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:aa8eba3db3d8761db161003e2d0586608092e217151d7458206e243be5a43843"},
{file = "lxml-4.5.2-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:8f0ec6b9b3832e0bd1d57af41f9238ea7709bbd7271f639024f2fc9d3bb01293"},
{file = "lxml-4.5.2-cp38-cp38-win32.whl", hash = "sha256:107781b213cf7201ec3806555657ccda67b1fccc4261fb889ef7fc56976db81f"},
{file = "lxml-4.5.2-cp38-cp38-win_amd64.whl", hash = "sha256:f161af26f596131b63b236372e4ce40f3167c1b5b5d459b29d2514bd8c9dc9ee"},
{file = "lxml-4.5.2.tar.gz", hash = "sha256:cdc13a1682b2a6241080745b1953719e7fe0850b40a5c71ca574f090a1391df6"},
@ -1435,11 +1431,6 @@ markupsafe = [
{file = "MarkupSafe-1.1.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:ba59edeaa2fc6114428f1637ffff42da1e311e29382d81b339c1817d37ec93c6"},
{file = "MarkupSafe-1.1.1-cp37-cp37m-win32.whl", hash = "sha256:b00c1de48212e4cc9603895652c5c410df699856a2853135b3967591e4beebc2"},
{file = "MarkupSafe-1.1.1-cp37-cp37m-win_amd64.whl", hash = "sha256:9bf40443012702a1d2070043cb6291650a0841ece432556f784f004937f0f32c"},
{file = "MarkupSafe-1.1.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6788b695d50a51edb699cb55e35487e430fa21f1ed838122d722e0ff0ac5ba15"},
{file = "MarkupSafe-1.1.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:cdb132fc825c38e1aeec2c8aa9338310d29d337bebbd7baa06889d09a60a1fa2"},
{file = "MarkupSafe-1.1.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:13d3144e1e340870b25e7b10b98d779608c02016d5184cfb9927a9f10c689f42"},
{file = "MarkupSafe-1.1.1-cp38-cp38-win32.whl", hash = "sha256:596510de112c685489095da617b5bcbbac7dd6384aeebeda4df6025d0256a81b"},
{file = "MarkupSafe-1.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:e8313f01ba26fbbe36c7be1966a7b7424942f670f38e666995b88d012765b9be"},
{file = "MarkupSafe-1.1.1.tar.gz", hash = "sha256:29872e92839765e546828bb7754a68c418d927cd064fd4708fab9fe9c8bb116b"},
]
multidict = [

View File

@ -49,6 +49,8 @@ blur_screenshot = get_config('generic', 'enable_default_blur_screenshot')
logging.basicConfig(level=get_config('generic', 'loglevel'))
# ##### Global methods passed to jinja
# Method to make sizes in bytes human readable
# Source: https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
def sizeof_fmt(num, suffix='B'):
@ -71,6 +73,8 @@ def http_status_description(code: int):
app.jinja_env.globals.update(http_status_description=http_status_description)
# ##### Generic/configuration methods #####
@app.after_request
def after_request(response):
ua = request.headers.get('User-Agent')
@ -90,67 +94,16 @@ def get_pw(username: str) -> Optional[str]:
return None
@app.route('/rebuild_all')
@auth.login_required
def rebuild_all():
lookyloo.rebuild_all()
return redirect(url_for('index'))
# ##### Hostnode level methods #####
@app.route('/tree/<string:tree_uuid>/host/<string:node_uuid>/hashes', methods=['GET'])
def hashes_hostnode(tree_uuid: str, node_uuid: str):
hashes = lookyloo.get_hashes(tree_uuid, hostnode_uuid=node_uuid)
return send_file(BytesIO('\n'.join(hashes).encode()),
mimetype='text/plain', as_attachment=True, attachment_filename='hashes.txt')
@app.route('/rebuild_cache')
@auth.login_required
def rebuild_cache():
lookyloo.rebuild_cache()
return redirect(url_for('index'))
@app.route('/tree/<string:tree_uuid>/rebuild')
@auth.login_required
def rebuild_tree(tree_uuid: str):
try:
lookyloo.remove_pickle(tree_uuid)
return redirect(url_for('tree', tree_uuid=tree_uuid))
except Exception:
return redirect(url_for('index'))
@app.route('/submit', methods=['POST', 'GET'])
def submit():
to_query = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_scrape(to_query)
return Response(perma_uuid, mimetype='text/text')
@app.route('/scrape', methods=['GET', 'POST'])
def scrape_web():
if request.form.get('url'):
# check if the post request has the file part
if 'cookies' in request.files and request.files['cookies'].filename:
cookie_file = request.files['cookies'].stream
else:
cookie_file = None
url = request.form.get('url')
if url:
depth: int = request.form.get('depth') if request.form.get('depth') else 1 # type: ignore
listing: bool = request.form.get('listing') if request.form.get('listing') else False # type: ignore
perma_uuid = lookyloo.scrape(url=url, cookies_pseudofile=cookie_file,
depth=depth, listing=listing,
user_agent=request.form.get('user_agent'),
referer=request.form.get('referer'), # type: ignore
os=request.form.get('os'), browser=request.form.get('browser'))
return redirect(url_for('tree', tree_uuid=perma_uuid))
user_agents: Dict[str, Any] = {}
if get_config('generic', 'use_user_agents_users'):
lookyloo.build_ua_file()
# NOTE: For now, just generate the file, so we have an idea of the size
# user_agents = get_user_agents('own_user_agents')
if not user_agents:
user_agents = get_user_agents()
user_agents.pop('by_frequency')
return render_template('scrape.html', user_agents=user_agents)
@app.route('/tree/<string:tree_uuid>/hostname/<string:node_uuid>/text', methods=['GET'])
@app.route('/tree/<string:tree_uuid>/host/<string:node_uuid>/text', methods=['GET'])
def hostnode_details_text(tree_uuid: str, node_uuid: str):
hostnode = lookyloo.get_hostnode_from_tree(tree_uuid, node_uuid)
urls = []
@ -166,7 +119,7 @@ def hostnode_details_text(tree_uuid: str, node_uuid: str):
as_attachment=True, attachment_filename='file.md')
@app.route('/tree/<string:tree_uuid>/hostname_popup/<string:node_uuid>', methods=['GET'])
@app.route('/tree/<string:tree_uuid>/host/<string:node_uuid>', methods=['GET'])
def hostnode_popup(tree_uuid: str, node_uuid: str):
keys_response = {
'js': "/static/javascript.png",
@ -208,6 +161,317 @@ def hostnode_popup(tree_uuid: str, node_uuid: str):
enable_context_by_users=enable_context_by_users)
# ##### Tree level Methods #####
@app.route('/tree/<string:tree_uuid>/rebuild')
@auth.login_required
def rebuild_tree(tree_uuid: str):
try:
lookyloo.remove_pickle(tree_uuid)
return redirect(url_for('tree', tree_uuid=tree_uuid))
except Exception:
return redirect(url_for('index'))
@app.route('/tree/<string:tree_uuid>/trigger_modules/', defaults={'force': False})
@app.route('/tree/<string:tree_uuid>/trigger_modules/<int:force>', methods=['GET'])
def trigger_modules(tree_uuid: str, force: int):
lookyloo.trigger_modules(tree_uuid, True if force else False)
return redirect(url_for('modules', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>/stats', methods=['GET'])
def stats(tree_uuid: str):
stats = lookyloo.get_statistics(tree_uuid)
return render_template('statistics.html', uuid=tree_uuid, stats=stats)
@app.route('/tree/<string:tree_uuid>/modules', methods=['GET'])
def modules(tree_uuid: str):
modules_responses = lookyloo.get_modules_responses(tree_uuid)
if not modules_responses:
return redirect(url_for('tree', tree_uuid=tree_uuid))
vt_short_result: Dict[str, Dict[str, Any]] = {}
if 'vt' in modules_responses:
# VirusTotal cleanup
vt = modules_responses.pop('vt')
# Get malicious entries
for url, full_report in vt.items():
vt_short_result[url] = {
'permaurl': f'https://www.virustotal.com/gui/url/{full_report["id"]}/detection',
'malicious': []
}
for vendor, result in full_report['attributes']['last_analysis_results'].items():
if result['category'] == 'malicious':
vt_short_result[url]['malicious'].append((vendor, result['result']))
pi_short_result: Dict[str, str] = {}
if 'pi' in modules_responses:
pi = modules_responses.pop('pi')
for url, full_report in pi.items():
if not full_report:
continue
pi_short_result[url] = full_report['results'][0]['tag_label']
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result, pi=pi_short_result)
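
# For clarity, a sketch of the reshaped structures the template receives, based on the
# loops above (the URL, vendor name, and labels below are made up):
#
# vt_short_result = {
#     'http://example.com/': {
#         'permaurl': 'https://www.virustotal.com/gui/url/<id>/detection',
#         'malicious': [('SomeVendor', 'phishing site')],
#     }
# }
# pi_short_result = {'http://example.com/': 'malicious'}
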
@app.route('/tree/<string:tree_uuid>/redirects', methods=['GET'])
def redirects(tree_uuid: str):
cache = lookyloo.capture_cache(tree_uuid)
if not cache:
return Response('Not available.', mimetype='text/text')
if not cache['redirects']:
return Response('No redirects.', mimetype='text/text')
if cache['url'] == cache['redirects'][0]:
to_return = BytesIO('\n'.join(cache['redirects']).encode())
else:
to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())
return send_file(to_return, mimetype='text/text',
as_attachment=True, attachment_filename='redirects.txt')
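
# Sketch of what the downloaded redirects.txt contains, given the logic above: the
# initial URL is prepended unless it already equals the first redirect, so a
# hypothetical chain comes back as one URL per line:
#
#   http://example.com/
#   https://example.com/
#   https://www.example.com/landing
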
@app.route('/tree/<string:tree_uuid>/image', methods=['GET'])
def image(tree_uuid: str):
to_return = lookyloo.get_screenshot(tree_uuid)
return send_file(to_return, mimetype='image/png',
as_attachment=True, attachment_filename='image.png')
@app.route('/tree/<string:tree_uuid>/html', methods=['GET'])
def html(tree_uuid: str):
to_return = lookyloo.get_html(tree_uuid)
return send_file(to_return, mimetype='text/html',
as_attachment=True, attachment_filename='page.html')
@app.route('/tree/<string:tree_uuid>/cookies', methods=['GET'])
def cookies(tree_uuid: str):
to_return = lookyloo.get_cookies(tree_uuid)
return send_file(to_return, mimetype='application/json',
as_attachment=True, attachment_filename='cookies.json')
@app.route('/tree/<string:tree_uuid>/hashes', methods=['GET'])
def hashes_tree(tree_uuid: str):
hashes = lookyloo.get_hashes(tree_uuid)
return send_file(BytesIO('\n'.join(hashes).encode()),
                     mimetype='text/plain', as_attachment=True, attachment_filename='hashes.txt')
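
# Illustrative client for the new per-capture hashes export: the endpoint returns one
# body hash per line, as produced by get_hashes(). The instance URL is an assumption.
import requests

def get_capture_hashes(tree_uuid: str, instance: str = 'http://127.0.0.1:5100') -> set:
    r = requests.get(f'{instance}/tree/{tree_uuid}/hashes')
    return {line for line in r.text.splitlines() if line}
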
@app.route('/tree/<string:tree_uuid>/export', methods=['GET'])
def export(tree_uuid: str):
to_return = lookyloo.get_capture(tree_uuid)
return send_file(to_return, mimetype='application/zip',
as_attachment=True, attachment_filename='capture.zip')
@app.route('/tree/<string:tree_uuid>/hide', methods=['GET'])
@auth.login_required
def hide_capture(tree_uuid: str):
lookyloo.hide_capture(tree_uuid)
return redirect(url_for('tree', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>/cache', methods=['GET'])
def cache_tree(tree_uuid: str):
lookyloo.cache_tree(tree_uuid)
return redirect(url_for('index'))
@app.route('/tree/<string:tree_uuid>/send_mail', methods=['POST', 'GET'])
def send_mail(tree_uuid: str):
email: str = request.form.get('email') if request.form.get('email') else '' # type: ignore
if '@' not in email:
# skip clearly incorrect emails
email = ''
comment: str = request.form.get('comment') if request.form.get('comment') else '' # type: ignore
lookyloo.send_mail(tree_uuid, email, comment)
return redirect(url_for('tree', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>', methods=['GET'])
@app.route('/tree/<string:tree_uuid>/<string:urlnode_uuid>', methods=['GET'])
def tree(tree_uuid: str, urlnode_uuid: Optional[str]=None):
if tree_uuid == 'False':
flash("Unable to process your request. The domain may not exist, or splash isn't started", 'error')
return redirect(url_for('index'))
try:
cache = lookyloo.capture_cache(tree_uuid)
except MissingUUID:
flash(f'Unable to find this UUID ({tree_uuid}). The capture may still be ongoing, try again later.', 'error')
return redirect(url_for('index'))
if not cache:
flash('Invalid cache.', 'error')
return redirect(url_for('index'))
if 'error' in cache:
flash(cache['error'], 'error')
try:
if get_config('generic', 'enable_mail_notification'):
enable_mail_notification = True
else:
enable_mail_notification = False
if get_config('generic', 'enable_context_by_users'):
enable_context_by_users = True
else:
enable_context_by_users = False
tree_json, start_time, user_agent, root_url, meta = lookyloo.load_tree(tree_uuid)
return render_template('tree.html', tree_json=tree_json, start_time=start_time,
user_agent=user_agent, root_url=root_url, tree_uuid=tree_uuid,
meta=meta, enable_mail_notification=enable_mail_notification,
enable_context_by_users=enable_context_by_users,
blur_screenshot=blur_screenshot,
urlnode_uuid=urlnode_uuid, has_redirects=True if cache['redirects'] else False)
except NoValidHarFile as e:
return render_template('error.html', error_message=e)
@app.route('/tree/<string:tree_uuid>/mark_as_legitimate', methods=['POST'])
@auth.login_required
def mark_as_legitimate(tree_uuid: str):
if request.data:
legitimate_entries = request.get_json(force=True)
lookyloo.add_to_legitimate(tree_uuid, **legitimate_entries)
else:
lookyloo.add_to_legitimate(tree_uuid)
return jsonify({'message': 'Legitimate entry added.'})
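
# Illustrative calls for the endpoint above (it also requires the admin login, omitted
# here): with an empty body the whole capture is marked legitimate; with a JSON body the
# keys are forwarded to add_to_legitimate() as keyword arguments. The key name below is
# an assumption for the single-node case, as is the base URL.
import requests
base = 'http://127.0.0.1:5100'  # assumed local instance
# requests.post(f'{base}/tree/{tree_uuid}/mark_as_legitimate')  # whole tree
# requests.post(f'{base}/tree/{tree_uuid}/mark_as_legitimate',
#               json={'urlnode_uuid': '<node uuid>'})           # a single node
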
# ##### helpers #####
def index_generic(show_hidden: bool=False):
titles = []
if time_delta_on_index:
# We want to filter the captures on the index
cut_time = datetime.now() - timedelta(**time_delta_on_index)
else:
cut_time = None # type: ignore
for capture_uuid in lookyloo.capture_uuids:
cached = lookyloo.capture_cache(capture_uuid)
if not cached:
continue
if show_hidden:
if 'no_index' not in cached:
# Only display the hidden ones
continue
elif 'no_index' in cached:
continue
if 'timestamp' not in cached:
# this is a buggy capture, skip
continue
if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:
continue
titles.append((cached['uuid'], cached['title'], cached['timestamp'], cached['url'],
cached['redirects'], True if cached['incomplete_redirects'] == '1' else False))
titles = sorted(titles, key=lambda x: (x[2], x[3]), reverse=True)
return render_template('index.html', titles=titles)
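
# The cut-off above relies on the 'time_delta_on_index' config entry being keyword
# arguments for timedelta, and on cached timestamps ending with a literal 'Z' that is
# stripped with [:-1] before fromisoformat(). A hypothetical config value and the
# resulting comparison:
from datetime import datetime, timedelta

time_delta_on_index = {'weeks': 1}  # e.g. only list captures from the last week
cut_time = datetime.now() - timedelta(**time_delta_on_index)
is_recent_enough = datetime.fromisoformat('2020-10-09T18:05:04.000000Z'[:-1]) >= cut_time
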
# ##### Index level methods #####
@app.route('/', methods=['GET'])
def index():
if request.method == 'HEAD':
# Just returns ack if the webserver is running
return 'Ack'
update_user_agents()
return index_generic()
@app.route('/hidden', methods=['GET'])
@auth.login_required
def index_hidden():
return index_generic(show_hidden=True)
@app.route('/cookies', methods=['GET'])
def cookies_lookup():
i = Indexing()
cookies_names = [(name, freq, i.cookies_names_number_domains(name)) for name, freq in i.cookies_names]
return render_template('cookies.html', cookies_names=cookies_names)
@app.route('/ressources', methods=['GET'])
def ressources():
i = Indexing()
ressources = []
for h, freq in i.ressources:
domain_freq = i.ressources_number_domains(h)
context = lookyloo.context.find_known_content(h)
capture_uuid, url_uuid, hostnode_uuid = i.get_hash_uuids(h)
ressources.append((h, freq, domain_freq, context.get(h), capture_uuid, url_uuid, hostnode_uuid))
return render_template('ressources.html', ressources=ressources)
@app.route('/rebuild_all')
@auth.login_required
def rebuild_all():
lookyloo.rebuild_all()
return redirect(url_for('index'))
@app.route('/rebuild_cache')
@auth.login_required
def rebuild_cache():
lookyloo.rebuild_cache()
return redirect(url_for('index'))
@app.route('/submit', methods=['POST', 'GET'])
def submit():
to_query = request.get_json(force=True)
perma_uuid = lookyloo.enqueue_scrape(to_query)
return Response(perma_uuid, mimetype='text/text')
@app.route('/scrape', methods=['GET', 'POST'])
def scrape_web():
if request.form.get('url'):
# check if the post request has the file part
if 'cookies' in request.files and request.files['cookies'].filename:
cookie_file = request.files['cookies'].stream
else:
cookie_file = None
url = request.form.get('url')
if url:
depth: int = request.form.get('depth') if request.form.get('depth') else 1 # type: ignore
listing: bool = request.form.get('listing') if request.form.get('listing') else False # type: ignore
perma_uuid = lookyloo.scrape(url=url, cookies_pseudofile=cookie_file,
depth=depth, listing=listing,
user_agent=request.form.get('user_agent'),
referer=request.form.get('referer'), # type: ignore
os=request.form.get('os'), browser=request.form.get('browser'))
return redirect(url_for('tree', tree_uuid=perma_uuid))
user_agents: Dict[str, Any] = {}
if get_config('generic', 'use_user_agents_users'):
lookyloo.build_ua_file()
# NOTE: For now, just generate the file, so we have an idea of the size
# user_agents = get_user_agents('own_user_agents')
if not user_agents:
user_agents = get_user_agents()
user_agents.pop('by_frequency')
return render_template('scrape.html', user_agents=user_agents)
@app.route('/cookies/<string:cookie_name>', methods=['GET'])
def cookies_name_detail(cookie_name: str):
captures, domains = lookyloo.get_cookie_name_investigator(cookie_name)
return render_template('cookie_name.html', cookie_name=cookie_name, domains=domains, captures=captures)
@app.route('/body_hashes/<string:body_hash>', methods=['GET'])
def body_hash_details(body_hash: str):
captures, domains = lookyloo.get_body_hash_investigator(body_hash)
return render_template('body_hash.html', body_hash=body_hash, domains=domains, captures=captures)
# ##### Methods related to a specific URLNode #####
@app.route('/tree/<string:tree_uuid>/url/<string:node_uuid>/request_cookies', methods=['GET'])
def urlnode_request_cookies(tree_uuid: str, node_uuid: str):
urlnode = lookyloo.get_urlnode_from_tree(tree_uuid, node_uuid)
@@ -284,245 +548,16 @@ def get_ressource(tree_uuid: str, node_uuid: str):
as_attachment=True, attachment_filename='file.zip')
@app.route('/tree/<string:tree_uuid>/trigger_modules/', defaults={'force': False})
@app.route('/tree/<string:tree_uuid>/trigger_modules/<int:force>', methods=['GET'])
def trigger_modules(tree_uuid: str, force: int):
lookyloo.trigger_modules(tree_uuid, True if force else False)
return redirect(url_for('modules', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>/url/<string:node_uuid>/hashes', methods=['GET'])
def hashes_urlnode(tree_uuid: str, node_uuid: str):
hashes = lookyloo.get_hashes(tree_uuid, urlnode_uuid=node_uuid)
return send_file(BytesIO('\n'.join(hashes).encode()),
                     mimetype='text/plain', as_attachment=True, attachment_filename='hashes.txt')
@app.route('/tree/<string:tree_uuid>/stats', methods=['GET'])
def stats(tree_uuid: str):
stats = lookyloo.get_statistics(tree_uuid)
return render_template('statistics.html', uuid=tree_uuid, stats=stats)
@app.route('/tree/<string:tree_uuid>/modules', methods=['GET'])
def modules(tree_uuid: str):
modules_responses = lookyloo.get_modules_responses(tree_uuid)
if not modules_responses:
return redirect(url_for('tree', tree_uuid=tree_uuid))
vt_short_result: Dict[str, Dict[str, Any]] = {}
if 'vt' in modules_responses:
# VirusTotal cleanup
vt = modules_responses.pop('vt')
# Get malicious entries
for url, full_report in vt.items():
vt_short_result[url] = {
'permaurl': f'https://www.virustotal.com/gui/url/{full_report["id"]}/detection',
'malicious': []
}
for vendor, result in full_report['attributes']['last_analysis_results'].items():
if result['category'] == 'malicious':
vt_short_result[url]['malicious'].append((vendor, result['result']))
pi_short_result: Dict[str, str] = {}
if 'pi' in modules_responses:
pi = modules_responses.pop('pi')
for url, full_report in pi.items():
if not full_report:
continue
pi_short_result[url] = full_report['results'][0]['tag_label']
return render_template('modules.html', uuid=tree_uuid, vt=vt_short_result, pi=pi_short_result)
@app.route('/tree/<string:tree_uuid>/image', methods=['GET'])
def image(tree_uuid: str):
to_return = lookyloo.get_screenshot(tree_uuid)
return send_file(to_return, mimetype='image/png',
as_attachment=True, attachment_filename='image.png')
@app.route('/tree/<string:tree_uuid>/html', methods=['GET'])
def html(tree_uuid: str):
to_return = lookyloo.get_html(tree_uuid)
return send_file(to_return, mimetype='text/html',
as_attachment=True, attachment_filename='page.html')
@app.route('/tree/<string:tree_uuid>/cookies', methods=['GET'])
def cookies(tree_uuid: str):
to_return = lookyloo.get_cookies(tree_uuid)
return send_file(to_return, mimetype='application/json',
as_attachment=True, attachment_filename='cookies.json')
@app.route('/tree/<string:tree_uuid>/export', methods=['GET'])
def export(tree_uuid: str):
to_return = lookyloo.get_capture(tree_uuid)
return send_file(to_return, mimetype='application/zip',
as_attachment=True, attachment_filename='capture.zip')
@app.route('/tree/<string:tree_uuid>/hide', methods=['GET'])
@app.route('/tree/<string:tree_uuid>/url/<string:node_uuid>/add_context', methods=['POST'])
@auth.login_required
def hide_capture(tree_uuid: str):
lookyloo.hide_capture(tree_uuid)
return redirect(url_for('tree', tree_uuid=tree_uuid))
@app.route('/redirects/<string:tree_uuid>', methods=['GET'])
def redirects(tree_uuid: str):
cache = lookyloo.capture_cache(tree_uuid)
if not cache:
return Response('Not available.', mimetype='text/text')
if not cache['redirects']:
return Response('No redirects.', mimetype='text/text')
if cache['url'] == cache['redirects'][0]:
to_return = BytesIO('\n'.join(cache['redirects']).encode())
else:
to_return = BytesIO('\n'.join([cache['url']] + cache['redirects']).encode())
return send_file(to_return, mimetype='text/text',
as_attachment=True, attachment_filename='redirects.txt')
@app.route('/cache_tree/<string:tree_uuid>', methods=['GET'])
def cache_tree(tree_uuid: str):
lookyloo.cache_tree(tree_uuid)
return redirect(url_for('index'))
@app.route('/tree/<string:tree_uuid>/send_mail', methods=['POST', 'GET'])
def send_mail(tree_uuid: str):
email: str = request.form.get('email') if request.form.get('email') else '' # type: ignore
if '@' not in email:
# skip clearly incorrect emails
email = ''
comment: str = request.form.get('comment') if request.form.get('comment') else '' # type: ignore
lookyloo.send_mail(tree_uuid, email, comment)
return redirect(url_for('tree', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>', methods=['GET'])
@app.route('/tree/<string:tree_uuid>/<string:urlnode_uuid>', methods=['GET'])
def tree(tree_uuid: str, urlnode_uuid: Optional[str]=None):
if tree_uuid == 'False':
flash("Unable to process your request. The domain may not exist, or splash isn't started", 'error')
return redirect(url_for('index'))
try:
cache = lookyloo.capture_cache(tree_uuid)
except MissingUUID:
flash(f'Unable to find this UUID ({tree_uuid}). The capture may still be ongoing, try again later.', 'error')
return redirect(url_for('index'))
if not cache:
flash('Invalid cache.', 'error')
return redirect(url_for('index'))
if 'error' in cache:
flash(cache['error'], 'error')
try:
if get_config('generic', 'enable_mail_notification'):
enable_mail_notification = True
else:
enable_mail_notification = False
if get_config('generic', 'enable_context_by_users'):
enable_context_by_users = True
else:
enable_context_by_users = False
tree_json, start_time, user_agent, root_url, meta = lookyloo.load_tree(tree_uuid)
return render_template('tree.html', tree_json=tree_json, start_time=start_time,
user_agent=user_agent, root_url=root_url, tree_uuid=tree_uuid,
meta=meta, enable_mail_notification=enable_mail_notification,
enable_context_by_users=enable_context_by_users,
blur_screenshot=blur_screenshot,
urlnode_uuid=urlnode_uuid, has_redirects=True if cache['redirects'] else False)
except NoValidHarFile as e:
return render_template('error.html', error_message=e)
def index_generic(show_hidden: bool=False):
titles = []
if time_delta_on_index:
# We want to filter the captures on the index
cut_time = datetime.now() - timedelta(**time_delta_on_index)
else:
cut_time = None # type: ignore
for capture_uuid in lookyloo.capture_uuids:
cached = lookyloo.capture_cache(capture_uuid)
if not cached:
continue
if show_hidden:
if 'no_index' not in cached:
# Only display the hidden ones
continue
elif 'no_index' in cached:
continue
if 'timestamp' not in cached:
# this is a buggy capture, skip
continue
if cut_time and datetime.fromisoformat(cached['timestamp'][:-1]) < cut_time:
continue
titles.append((cached['uuid'], cached['title'], cached['timestamp'], cached['url'],
cached['redirects'], True if cached['incomplete_redirects'] == '1' else False))
titles = sorted(titles, key=lambda x: (x[2], x[3]), reverse=True)
return render_template('index.html', titles=titles)
@app.route('/', methods=['GET'])
def index():
if request.method == 'HEAD':
# Just returns ack if the webserver is running
return 'Ack'
update_user_agents()
return index_generic()
@app.route('/hidden', methods=['GET'])
@auth.login_required
def index_hidden():
return index_generic(show_hidden=True)
@app.route('/cookies', methods=['GET'])
def cookies_lookup():
i = Indexing()
cookies_names = [(name, freq, i.cookies_names_number_domains(name)) for name, freq in i.cookies_names]
return render_template('cookies.html', cookies_names=cookies_names)
@app.route('/ressources', methods=['GET'])
def ressources():
i = Indexing()
ressources = []
for h, freq in i.ressources:
domain_freq = i.ressources_number_domains(h)
context = lookyloo.context.find_known_content(h)
capture_uuid, url_uuid, hostnode_uuid = i.get_hash_uuids(h)
ressources.append((h, freq, domain_freq, context.get(h), capture_uuid, url_uuid, hostnode_uuid))
return render_template('ressources.html', ressources=ressources)
@app.route('/cookies/<string:cookie_name>', methods=['GET'])
def cookies_name_detail(cookie_name: str):
captures, domains = lookyloo.get_cookie_name_investigator(cookie_name)
return render_template('cookie_name.html', cookie_name=cookie_name, domains=domains, captures=captures)
@app.route('/body_hashes/<string:body_hash>', methods=['GET'])
def body_hash_details(body_hash: str):
captures, domains = lookyloo.get_body_hash_investigator(body_hash)
return render_template('body_hash.html', body_hash=body_hash, domains=domains, captures=captures)
@app.route('/tree/<string:tree_uuid>/mark_as_legitimate', methods=['POST'])
@auth.login_required
def mark_as_legitimate(tree_uuid: str):
if request.data:
legitimate_entries = request.get_json(force=True)
lookyloo.add_to_legitimate(tree_uuid, **legitimate_entries)
else:
lookyloo.add_to_legitimate(tree_uuid)
return jsonify({'message': 'Legitimate entry added.'})
@app.route('/tree/<string:tree_uuid>/add_context/<string:urlnode_uuid>', methods=['POST'])
@auth.login_required
def add_context(tree_uuid: str, urlnode_uuid: str):
def add_context(tree_uuid: str, node_uuid: str):
context_data = request.form
ressource_hash: str = context_data.get('hash_to_contextualize') # type: ignore
hostnode_uuid: str = context_data.get('hostnode_uuid') # type: ignore
@@ -544,7 +579,7 @@ def add_context(tree_uuid: str, urlnode_uuid: str):
if context_data.get('legitimate_description'):
legitimate_details['description'] = context_data['legitimate_description']
details['legitimate'] = legitimate_details
lookyloo.add_context(tree_uuid, urlnode_uuid, ressource_hash, legitimate, malicious, details)
lookyloo.add_context(tree_uuid, node_uuid, ressource_hash, legitimate, malicious, details)
if callback_str == 'hostnode_popup':
return redirect(url_for('hostnode_popup', tree_uuid=tree_uuid, node_uuid=hostnode_uuid))
elif callback_str == 'ressources':
View File

@@ -69,7 +69,7 @@ function openTreeInNewTab(capture_uuid, hostnode_uuid=null) {
}
function open_hostnode_popup(hostnode_uuid) {
let win = window.open(`/tree/${treeUUID}/hostname_popup/${hostnode_uuid}`, '_blank', 'width=1024,height=768,left=200,top=100');
let win = window.open(`/tree/${treeUUID}/host/${hostnode_uuid}`, '_blank', 'width=1024,height=768,left=200,top=100');
if (win == null) {
alert("The browser didn't allow Lookyloo to open a pop-up. There should be an icon on the right of your URL bar to allow it.");
}
View File

@@ -19,7 +19,7 @@
</button>
<div class="collapse" id="context_response_{{ urlnode_uuid }}">
<div class="card card-body">
<form role="form" action="{{ url_for('add_context', tree_uuid=tree_uuid, urlnode_uuid=urlnode_uuid) }}" method=post enctype=multipart/form-data>
<form role="form" action="{{ url_for('add_context', tree_uuid=tree_uuid, node_uuid=urlnode_uuid) }}" method=post enctype=multipart/form-data>
<div class="form-group">
<div class="form-check">
<input class="form-check-input" type="checkbox" name="legitimate" id="legitimate">