chg: Major refactoring, move capture code to external script.

pull/251/head
Raphaël Vinot 2021-08-25 13:36:48 +02:00
parent d8416f0f47
commit bf700e7a7b
6 changed files with 342 additions and 311 deletions

View File

@ -1,10 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
import ipaddress
import json
import logging
import socket
from io import BufferedIOBase
from datetime import datetime
from pathlib import Path
from typing import Union, Dict, Optional, Tuple, List
from urllib.parse import urlsplit
from uuid import uuid4
from defang import refang # type: ignore
from redis import Redis
from scrapysplashwrapper import crawl
from lookyloo.abstractmanager import AbstractManager
from lookyloo.helpers import shutdown_requested
from lookyloo.helpers import (shutdown_requested, splash_status, get_socket_path,
load_cookies, safe_create_dir, get_config, get_splash_url,
get_captures_dir)
from lookyloo.lookyloo import Lookyloo
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
@ -17,10 +34,159 @@ class AsyncCapture(AbstractManager):
super().__init__(loglevel)
self.lookyloo = Lookyloo()
self.script_name = 'async_capture'
self.only_global_lookups: bool = get_config('generic', 'only_global_lookups')
self.capture_dir: Path = get_captures_dir()
self.splash_url: str = get_splash_url()
self.redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
def process_capture_queue(self) -> Union[bool, None]:
'''Process a query from the capture queue'''
if not self.redis.exists('to_capture'):
return None
status, message = splash_status()
if not status:
self.logger.critical(f'Splash is not running, unable to process the capture queue: {message}')
return None
value: Optional[List[Tuple[str, int]]] = self.redis.zpopmax('to_capture') # type: ignore
if not value or not value[0]:
return None
uuid, score = value[0]
queue: Optional[str] = self.redis.get(f'{uuid}_mgmt')
self.redis.sadd('ongoing', uuid)
lazy_cleanup = self.redis.pipeline()
lazy_cleanup.delete(f'{uuid}_mgmt')
if queue:
# queue shouldn't be none, but if it is, just ignore.
lazy_cleanup.zincrby('queues', -1, queue)
to_capture: Dict[str, str] = self.redis.hgetall(uuid)
to_capture['perma_uuid'] = uuid
if 'cookies' in to_capture:
to_capture['cookies_pseudofile'] = to_capture.pop('cookies')
status = self._capture(**to_capture) # type: ignore
lazy_cleanup.srem('ongoing', uuid)
lazy_cleanup.delete(uuid)
# make sure to expire the key if nothing was process for a while (= queues empty)
lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
if status:
self.logger.info(f'Processed {to_capture["url"]}')
return True
self.logger.warning(f'Unable to capture {to_capture["url"]}')
return False
def _capture(self, url: str, *, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
referer: str='', proxy: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
browser: Optional[str]=None, parent: Optional[str]=None) -> Union[bool, str]:
'''Launch a capture'''
url = url.strip()
url = refang(url)
if not url.startswith('http'):
url = f'http://{url}'
if self.only_global_lookups:
splitted_url = urlsplit(url)
if splitted_url.netloc:
if splitted_url.hostname:
if splitted_url.hostname.split('.')[-1] != 'onion':
try:
ip = socket.gethostbyname(splitted_url.hostname)
except socket.gaierror:
self.logger.info('Name or service not known')
return False
if not ipaddress.ip_address(ip).is_global:
return False
else:
return False
cookies = load_cookies(cookies_pseudofile)
if not user_agent:
# Catch case where the UA is broken on the UI, and the async submission.
ua: str = get_config('generic', 'default_user_agent')
else:
ua = user_agent
if int(depth) > int(get_config('generic', 'max_depth')):
self.logger.warning(f'Not allowed to capture on a depth higher than {get_config("generic", "max_depth")}: {depth}')
depth = int(get_config('generic', 'max_depth'))
if not perma_uuid:
perma_uuid = str(uuid4())
self.logger.info(f'Capturing {url}')
try:
items = crawl(self.splash_url, url, cookies=cookies, depth=depth, user_agent=ua,
referer=referer, proxy=proxy, log_enabled=True, log_level=get_config('generic', 'splash_loglevel'))
except Exception as e:
self.logger.critical(f'Something went terribly wrong when capturing {url}.')
raise e
if not items:
# broken
self.logger.critical(f'Something went terribly wrong when capturing {url}.')
return False
width = len(str(len(items)))
dirpath = self.capture_dir / datetime.now().isoformat()
safe_create_dir(dirpath)
if os or browser:
meta = {}
if os:
meta['os'] = os
if browser:
meta['browser'] = browser
with (dirpath / 'meta').open('w') as _meta:
json.dump(meta, _meta)
# Write UUID
with (dirpath / 'uuid').open('w') as _uuid:
_uuid.write(perma_uuid)
# Write no_index marker (optional)
if not listing:
(dirpath / 'no_index').touch()
# Write parent UUID (optional)
if parent:
with (dirpath / 'parent').open('w') as _parent:
_parent.write(parent)
for i, item in enumerate(items):
if 'error' in item:
with (dirpath / 'error.txt').open('w') as _error:
json.dump(item['error'], _error)
# The capture went fine
harfile = item['har']
png = base64.b64decode(item['png'])
html = item['html']
last_redirect = item['last_redirected_url']
with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as _har:
json.dump(harfile, _har)
with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as _img:
_img.write(png)
with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as _html:
_html.write(html)
with (dirpath / '{0:0{width}}.last_redirect.txt'.format(i, width=width)).open('w') as _redir:
_redir.write(last_redirect)
if 'childFrames' in item:
child_frames = item['childFrames']
with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as _iframes:
json.dump(child_frames, _iframes)
if 'cookies' in item:
cookies = item['cookies']
with (dirpath / '{0:0{width}}.cookies.json'.format(i, width=width)).open('w') as _cookies:
json.dump(cookies, _cookies)
self.redis.hset('lookup_dirs', perma_uuid, str(dirpath))
return perma_uuid
def _to_run_forever(self):
while True:
url = self.lookyloo.process_capture_queue()
url = self.process_capture_queue()
if url is None or shutdown_requested():
break

View File

@ -22,6 +22,9 @@ def main():
print('Start background indexer...')
Popen(['background_indexer'])
print('done.')
print('Start background processing...')
Popen(['processing'])
print('done.')
print('Start website...')
Popen(['start_website'])
print('done.')

View File

@ -7,7 +7,8 @@ import json
import traceback
import pickle
import pkg_resources
from typing import List, Optional, Dict, Union, Any, Set
from typing import List, Optional, Dict, Union, Any, Set, Tuple
from urllib.parse import urljoin
from io import BufferedIOBase
from pathlib import Path
from datetime import datetime, timedelta
@ -19,6 +20,8 @@ from enum import IntEnum, unique
from har2tree import CrawledTree, HostNode, URLNode
from redis import Redis
from redis.exceptions import ConnectionError
import requests
from requests.exceptions import HTTPError
from publicsuffix2 import PublicSuffixList, fetch # type: ignore
from bs4 import BeautifulSoup # type: ignore
from pytaxonomies import Taxonomies
@ -377,3 +380,38 @@ def reload_uuids_index() -> None:
p.delete('lookup_dirs')
p.hset('lookup_dirs', mapping=recent_uuids) # type: ignore
p.execute()
def get_capture_status(capture_uuid: str, /) -> CaptureStatus:
r = Redis(unix_socket_path=get_socket_path('cache'))
if r.zrank('to_capture', capture_uuid) is not None:
return CaptureStatus.QUEUED
elif r.hexists('lookup_dirs', capture_uuid):
return CaptureStatus.DONE
elif r.sismember('ongoing', capture_uuid):
return CaptureStatus.ONGOING
return CaptureStatus.UNKNOWN
@lru_cache(64)
def get_splash_url() -> str:
if os.environ.get('SPLASH_URL_DOCKER'):
# In order to have a working default for the docker image, it is easier to use an environment variable
return os.environ['SPLASH_URL_DOCKER']
else:
return get_config('generic', 'splash_url')
def splash_status() -> Tuple[bool, str]:
try:
splash_status = requests.get(urljoin(get_splash_url(), '_ping'))
splash_status.raise_for_status()
json_status = splash_status.json()
if json_status['status'] == 'ok':
return True, 'Splash is up'
else:
return False, str(json_status)
except HTTPError as http_err:
return False, f'HTTP error occurred: {http_err}'
except Exception as err:
return False, f'Other error occurred: {err}'

View File

@ -1,46 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import base64
from collections import defaultdict
from datetime import datetime, date
from email.message import EmailMessage
from io import BufferedIOBase, BytesIO
import ipaddress
from io import BytesIO
import json
import logging
from pathlib import Path
import pickle
import smtplib
import socket
import sys
from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping, Set, Iterable
from urllib.parse import urlsplit, urljoin
from uuid import uuid4
from zipfile import ZipFile
import operator
import time
from defang import refang # type: ignore
import dns.resolver
import dns.rdatatype
from har2tree import CrawledTree, Har2TreeError, HarFile, HostNode, URLNode
from PIL import Image # type: ignore
from pymisp import MISPEvent, MISPAttribute, MISPObject
from pymisp.tools import URLObject, FileObject
import requests
from requests.exceptions import HTTPError
from redis import Redis, ConnectionPool
from redis.connection import UnixDomainSocketConnection
from scrapysplashwrapper import crawl
from werkzeug.useragents import UserAgent
from .exceptions import NoValidHarFile, MissingUUID, LookylooException, MissingCaptureDirectory
from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
safe_create_dir, get_email_template, load_pickle_tree,
from .helpers import (get_homedir, get_socket_path, get_config, get_email_template, load_pickle_tree,
remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains,
CaptureStatus, try_make_file, get_captures_dir)
try_make_file, get_captures_dir, get_splash_url)
from .modules import VirusTotal, SaneJavaScript, PhishingInitiative, MISP, UniversalWhois, UrlScan
from .capturecache import CaptureCache
from .context import Context
@ -60,12 +51,7 @@ class Lookyloo():
self.redis_pool: ConnectionPool = ConnectionPool(connection_class=UnixDomainSocketConnection,
path=get_socket_path('cache'), decode_responses=True)
self.capture_dir: Path = get_captures_dir()
if os.environ.get('SPLASH_URL_DOCKER'):
# In order to have a working default for the docker image, it is easier to use an environment variable
self.splash_url: str = os.environ['SPLASH_URL_DOCKER']
else:
self.splash_url = get_config('generic', 'splash_url')
self.only_global_lookups: bool = get_config('generic', 'only_global_lookups')
self.splash_url: str = get_splash_url()
self._priority = get_config('generic', 'priority')
@ -101,11 +87,6 @@ class Lookyloo():
def redis(self):
return Redis(connection_pool=self.redis_pool)
def cache_user_agents(self, user_agent: str, remote_ip: str) -> None:
'''Cache the useragents of the visitors'''
today = date.today().isoformat()
self.redis.zincrby(f'user_agents|{today}', 1, f'{remote_ip}|{user_agent}')
def _get_capture_dir(self, capture_uuid: str, /) -> Path:
'''Use the cache to get a capture directory from a capture UUID'''
capture_dir: Optional[Union[str, Path]]
@ -135,6 +116,31 @@ class Lookyloo():
def _cache_capture(self, capture_uuid: str, /) -> CrawledTree:
'''Generate the pickle, set the cache, add capture in the indexes'''
def _ensure_meta(capture_dir: Path, tree: CrawledTree) -> None:
'''Make sure the meta file is present, it contains information about the User Agent used for the capture.'''
metafile = capture_dir / 'meta'
if metafile.exists():
return
ua = UserAgent(tree.root_hartree.user_agent)
to_dump = {}
if ua.platform:
to_dump['os'] = ua.platform
if ua.browser:
if ua.version:
to_dump['browser'] = f'{ua.browser} {ua.version}'
else:
to_dump['browser'] = ua.browser
if ua.language:
to_dump['language'] = ua.language
if not to_dump:
# UA not recognized
self.logger.info(f'Unable to recognize the User agent: {ua}')
to_dump['user_agent'] = ua.string
with metafile.open('w') as f:
json.dump(to_dump, f)
capture_dir = self._get_capture_dir(capture_uuid)
har_files = sorted(capture_dir.glob('*.har'))
lock_file = capture_dir / 'lock'
@ -160,7 +166,7 @@ class Lookyloo():
index = True
try:
ct = CrawledTree(har_files, capture_uuid)
self._ensure_meta(capture_dir, ct)
_ensure_meta(capture_dir, ct)
self._resolve_dns(ct)
self.context.contextualize_tree(ct)
cache = self.capture_cache(capture_uuid)
@ -271,26 +277,27 @@ class Lookyloo():
self._captures_index.pop(uuid, None)
return cache
def _build_cname_chain(self, known_cnames: Dict[str, Optional[str]], hostname) -> List[str]:
'''Returns a list of CNAMEs starting from one hostname.
The CNAMEs resolutions are made in `_resolve_dns`. A hostname can have a CNAME entry
and the CNAME entry can have an other CNAME entry, and so on multiple times.
This method loops over the hostnames until there are no CNAMES.'''
cnames: List[str] = []
to_search = hostname
while True:
if known_cnames.get(to_search) is None:
break
# At this point, known_cnames[to_search] must exist and be a str
cnames.append(known_cnames[to_search]) # type: ignore
to_search = known_cnames[to_search]
return cnames
def _resolve_dns(self, ct: CrawledTree):
'''Resolves all domains of the tree, keeps A (IPv4), AAAA (IPv6), and CNAME entries
and store them in ips.json and cnames.json, in the capture directory.
Updates the nodes of the tree accordingly so the information is available.
'''
def _build_cname_chain(known_cnames: Dict[str, Optional[str]], hostname) -> List[str]:
'''Returns a list of CNAMEs starting from one hostname.
The CNAMEs resolutions are made in `_resolve_dns`. A hostname can have a CNAME entry
and the CNAME entry can have an other CNAME entry, and so on multiple times.
This method loops over the hostnames until there are no CNAMES.'''
cnames: List[str] = []
to_search = hostname
while True:
if known_cnames.get(to_search) is None:
break
# At this point, known_cnames[to_search] must exist and be a str
cnames.append(known_cnames[to_search]) # type: ignore
to_search = known_cnames[to_search]
return cnames
cnames_path = ct.root_hartree.har.path.parent / 'cnames.json'
ips_path = ct.root_hartree.har.path.parent / 'ips.json'
host_cnames: Dict[str, Optional[str]] = {}
@ -319,7 +326,7 @@ class Lookyloo():
except Exception:
host_cnames[node.name] = None
host_ips[node.name] = []
cnames = self._build_cname_chain(host_cnames, node.name)
cnames = _build_cname_chain(host_cnames, node.name)
if cnames:
node.add_feature('cname', cnames)
if cnames[-1] in host_ips:
@ -494,24 +501,6 @@ class Lookyloo():
to_return['urlscan']['result'] = result
return to_return
def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]:
if not self.misp.available:
return None
try:
ct = self.get_crawled_tree(capture_uuid)
except LookylooException:
self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
return None
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
to_return: Dict[str, Set[str]] = defaultdict(set)
for node in nodes_to_lookup:
hits = self.misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
for event_id, values in hits.items():
if not isinstance(values, set):
continue
to_return[event_id].update(values)
return to_return
def hide_capture(self, capture_uuid: str, /) -> None:
"""Add the capture in the hidden pool (not shown on the front page)
NOTE: it won't remove the correlations until they are rebuilt.
@ -594,31 +583,23 @@ class Lookyloo():
raise NoValidHarFile(f'Unable to get tree from {capture_dir}')
return ct
def get_capture_status(self, capture_uuid: str, /) -> CaptureStatus:
redis = self.redis # use a single connection
if redis.zrank('to_capture', capture_uuid) is not None:
return CaptureStatus.QUEUED
elif redis.hexists('lookup_dirs', capture_uuid):
return CaptureStatus.DONE
elif redis.sismember('ongoing', capture_uuid):
return CaptureStatus.ONGOING
return CaptureStatus.UNKNOWN
def _get_priority(self, source: str, user: str, authenticated: bool) -> int:
src_prio: int = self._priority['sources'][source] if source in self._priority['sources'] else -1
if not authenticated:
usr_prio = self._priority['users']['_default_anon']
# reduce priority for anonymous users making lots of captures
queue_size = self.redis.zscore('queues', f'{source}|{authenticated}|{user}')
if queue_size is None:
queue_size = 0
usr_prio -= int(queue_size / 10)
else:
usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth']
return src_prio + usr_prio
def enqueue_capture(self, query: MutableMapping[str, Any], source: str, user: str, authenticated: bool) -> str:
'''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
def _get_priority(source: str, user: str, authenticated: bool) -> int:
src_prio: int = self._priority['sources'][source] if source in self._priority['sources'] else -1
if not authenticated:
usr_prio = self._priority['users']['_default_anon']
# reduce priority for anonymous users making lots of captures
queue_size = self.redis.zscore('queues', f'{source}|{authenticated}|{user}')
if queue_size is None:
queue_size = 0
usr_prio -= int(queue_size / 10)
else:
usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth']
return src_prio + usr_prio
priority = _get_priority(source, user, authenticated)
perma_uuid = str(uuid4())
p = self.redis.pipeline()
for key, value in query.items():
@ -627,53 +608,16 @@ class Lookyloo():
query[key] = 1 if value else ''
if isinstance(value, list):
query[key] = json.dumps(value)
if priority < -10:
# Someone is probably abusing the system with useless URLs, remove them from the index
query['listing'] = 0
p.hmset(perma_uuid, query)
priority = self._get_priority(source, user, authenticated)
p.zadd('to_capture', {perma_uuid: priority})
p.zincrby('queues', 1, f'{source}|{authenticated}|{user}')
p.set(f'{perma_uuid}_mgmt', f'{source}|{authenticated}|{user}')
p.execute()
return perma_uuid
def process_capture_queue(self) -> Union[bool, None]:
'''Process a query from the capture queue'''
redis = self.redis # use a single connection
if not redis.exists('to_capture'):
return None
status, message = self.splash_status()
if not status:
self.logger.critical(f'Splash is not running, unable to process the capture queue: {message}')
return None
value = redis.zpopmax('to_capture')
if not value or not value[0]:
return None
uuid, score = value[0]
queue: str = redis.get(f'{uuid}_mgmt')
redis.sadd('ongoing', uuid)
lazy_cleanup = redis.pipeline()
lazy_cleanup.delete(f'{uuid}_mgmt')
lazy_cleanup.zincrby('queues', -1, queue)
to_capture: Dict[str, Union[str, int, float]] = redis.hgetall(uuid)
to_capture['perma_uuid'] = uuid
if 'cookies' in to_capture:
to_capture['cookies_pseudofile'] = to_capture.pop('cookies')
status = self._capture(**to_capture) # type: ignore
lazy_cleanup.srem('ongoing', uuid)
lazy_cleanup.delete(uuid)
# make sure to expire the key if nothing was process for a while (= queues empty)
lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
if status:
self.logger.info(f'Processed {to_capture["url"]}')
return True
self.logger.warning(f'Unable to capture {to_capture["url"]}')
return False
def send_mail(self, capture_uuid: str, /, email: str='', comment: str='') -> None:
'''Send an email notification regarding a specific capture'''
if not get_config('generic', 'enable_mail_notification'):
@ -716,30 +660,6 @@ class Lookyloo():
self.logger.exception(e)
self.logger.warning(msg.as_string())
def _ensure_meta(self, capture_dir: Path, tree: CrawledTree) -> None:
'''Make sure the meta file is present, it contains information about the User Agent used for the capture.'''
metafile = capture_dir / 'meta'
if metafile.exists():
return
ua = UserAgent(tree.root_hartree.user_agent)
to_dump = {}
if ua.platform:
to_dump['os'] = ua.platform
if ua.browser:
if ua.version:
to_dump['browser'] = f'{ua.browser} {ua.version}'
else:
to_dump['browser'] = ua.browser
if ua.language:
to_dump['language'] = ua.language
if not to_dump:
# UA not recognized
self.logger.info(f'Unable to recognize the User agent: {ua}')
to_dump['user_agent'] = ua.string
with metafile.open('w') as f:
json.dump(to_dump, f)
def _get_raw(self, capture_uuid: str, /, extension: str='*', all_files: bool=True) -> BytesIO:
'''Get file(s) from the capture directory'''
try:
@ -806,125 +726,6 @@ class Lookyloo():
return sorted(set(ct.root_hartree.rendered_node.urls_in_rendered_page)
- set(ct.root_hartree.all_url_requests.keys()))
def splash_status(self) -> Tuple[bool, str]:
try:
splash_status = requests.get(urljoin(self.splash_url, '_ping'))
splash_status.raise_for_status()
json_status = splash_status.json()
if json_status['status'] == 'ok':
return True, 'Splash is up'
else:
return False, str(json_status)
except HTTPError as http_err:
return False, f'HTTP error occurred: {http_err}'
except Exception as err:
return False, f'Other error occurred: {err}'
def _capture(self, url: str, *, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
referer: str='', proxy: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
browser: Optional[str]=None, parent: Optional[str]=None) -> Union[bool, str]:
'''Launch a capture'''
url = url.strip()
url = refang(url)
if not url.startswith('http'):
url = f'http://{url}'
if self.only_global_lookups:
splitted_url = urlsplit(url)
if splitted_url.netloc:
if splitted_url.hostname:
if splitted_url.hostname.split('.')[-1] != 'onion':
try:
ip = socket.gethostbyname(splitted_url.hostname)
except socket.gaierror:
self.logger.info('Name or service not known')
return False
if not ipaddress.ip_address(ip).is_global:
return False
else:
return False
cookies = load_cookies(cookies_pseudofile)
if not user_agent:
# Catch case where the UA is broken on the UI, and the async submission.
ua: str = get_config('generic', 'default_user_agent')
else:
ua = user_agent
if int(depth) > int(get_config('generic', 'max_depth')):
self.logger.warning(f'Not allowed to capture on a depth higher than {get_config("generic", "max_depth")}: {depth}')
depth = int(get_config('generic', 'max_depth'))
if not perma_uuid:
perma_uuid = str(uuid4())
self.logger.info(f'Capturing {url}')
try:
items = crawl(self.splash_url, url, cookies=cookies, depth=depth, user_agent=ua,
referer=referer, proxy=proxy, log_enabled=True, log_level=get_config('generic', 'splash_loglevel'))
except Exception as e:
self.logger.critical(f'Something went terribly wrong when capturing {url}.')
raise e
if not items:
# broken
self.logger.critical(f'Something went terribly wrong when capturing {url}.')
return False
width = len(str(len(items)))
dirpath = self.capture_dir / datetime.now().isoformat()
safe_create_dir(dirpath)
if os or browser:
meta = {}
if os:
meta['os'] = os
if browser:
meta['browser'] = browser
with (dirpath / 'meta').open('w') as _meta:
json.dump(meta, _meta)
# Write UUID
with (dirpath / 'uuid').open('w') as _uuid:
_uuid.write(perma_uuid)
# Write no_index marker (optional)
if not listing:
(dirpath / 'no_index').touch()
# Write parent UUID (optional)
if parent:
with (dirpath / 'parent').open('w') as _parent:
_parent.write(parent)
for i, item in enumerate(items):
if 'error' in item:
with (dirpath / 'error.txt').open('w') as _error:
json.dump(item['error'], _error)
# The capture went fine
harfile = item['har']
png = base64.b64decode(item['png'])
html = item['html']
last_redirect = item['last_redirected_url']
with (dirpath / '{0:0{width}}.har'.format(i, width=width)).open('w') as _har:
json.dump(harfile, _har)
with (dirpath / '{0:0{width}}.png'.format(i, width=width)).open('wb') as _img:
_img.write(png)
with (dirpath / '{0:0{width}}.html'.format(i, width=width)).open('w') as _html:
_html.write(html)
with (dirpath / '{0:0{width}}.last_redirect.txt'.format(i, width=width)).open('w') as _redir:
_redir.write(last_redirect)
if 'childFrames' in item:
child_frames = item['childFrames']
with (dirpath / '{0:0{width}}.frames.json'.format(i, width=width)).open('w') as _iframes:
json.dump(child_frames, _iframes)
if 'cookies' in item:
cookies = item['cookies']
with (dirpath / '{0:0{width}}.cookies.json'.format(i, width=width)).open('w') as _cookies:
json.dump(cookies, _cookies)
self.redis.hset('lookup_dirs', perma_uuid, str(dirpath))
return perma_uuid
def get_body_hash_investigator(self, body_hash: str, /) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
'''Returns all the captures related to a hash (sha512), used in the web interface.'''
total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
@ -1033,26 +834,6 @@ class Lookyloo():
captures_list['different_url'].append((h_capture_uuid, url_uuid, cache.title, cache.timestamp.isoformat(), url_hostname))
return total_captures, captures_list
def _normalize_known_content(self, h: str, /, known_content: Dict[str, Any], url: URLNode) -> Tuple[Optional[Union[str, List[Any]]], Optional[Tuple[bool, Any]]]:
''' There are a few different sources to figure out known vs. legitimate content,
this method normalize it for the web interface.'''
known: Optional[Union[str, List[Any]]] = None
legitimate: Optional[Tuple[bool, Any]] = None
if h not in known_content:
return known, legitimate
if known_content[h]['type'] in ['generic', 'sanejs']:
known = known_content[h]['details']
elif known_content[h]['type'] == 'legitimate_on_domain':
legit = False
if url.hostname in known_content[h]['details']:
legit = True
legitimate = (legit, known_content[h]['details'])
elif known_content[h]['type'] == 'malicious':
legitimate = (False, known_content[h]['details'])
return known, legitimate
def get_ressource(self, tree_uuid: str, /, urlnode_uuid: str, h: Optional[str]) -> Optional[Tuple[str, BytesIO, str]]:
'''Get a specific resource from a URL node. If a hash s also given, we want an embeded resource'''
try:
@ -1178,6 +959,24 @@ class Lookyloo():
return [event]
def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]:
if not self.misp.available:
return None
try:
ct = self.get_crawled_tree(capture_uuid)
except LookylooException:
self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
return None
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
to_return: Dict[str, Set[str]] = defaultdict(set)
for node in nodes_to_lookup:
hits = self.misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
for event_id, values in hits.items():
if not isinstance(values, set):
continue
to_return[event_id].update(values)
return to_return
def get_hashes(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
"""Return hashes of resources.
Only tree_uuid: All the hashes
@ -1227,6 +1026,27 @@ class Lookyloo():
def get_hostnode_investigator(self, capture_uuid: str, /, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
'''Gather all the informations needed to display the Hostnode investigator popup.'''
def _normalize_known_content(h: str, /, known_content: Dict[str, Any], url: URLNode) -> Tuple[Optional[Union[str, List[Any]]], Optional[Tuple[bool, Any]]]:
''' There are a few different sources to figure out known vs. legitimate content,
this method normalize it for the web interface.'''
known: Optional[Union[str, List[Any]]] = None
legitimate: Optional[Tuple[bool, Any]] = None
if h not in known_content:
return known, legitimate
if known_content[h]['type'] in ['generic', 'sanejs']:
known = known_content[h]['details']
elif known_content[h]['type'] == 'legitimate_on_domain':
legit = False
if url.hostname in known_content[h]['details']:
legit = True
legitimate = (legit, known_content[h]['details'])
elif known_content[h]['type'] == 'malicious':
legitimate = (False, known_content[h]['details'])
return known, legitimate
ct = self.get_crawled_tree(capture_uuid)
hostnode = ct.root_hartree.get_host_node_by_uuid(node_uuid)
if not hostnode:
@ -1270,13 +1090,13 @@ class Lookyloo():
if freq_embedded['hash_freq'] > 1:
to_append['embedded_ressources'][h]['other_captures'] = self.hash_lookup(h, url.name, capture_uuid)
for h in to_append['embedded_ressources'].keys():
known, legitimate = self._normalize_known_content(h, known_content, url)
known, legitimate = _normalize_known_content(h, known_content, url)
if known:
to_append['embedded_ressources'][h]['known_content'] = known
elif legitimate:
to_append['embedded_ressources'][h]['legitimacy'] = legitimate
known, legitimate = self._normalize_known_content(url.body_hash, known_content, url)
known, legitimate = _normalize_known_content(url.body_hash, known_content, url)
if known:
to_append['known_content'] = known
elif legitimate:

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from io import BytesIO, StringIO
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta, timezone, date
import json
import http
import calendar
@ -21,13 +21,14 @@ from werkzeug.security import check_password_hash
from pymisp import MISPEvent, MISPServerError
from lookyloo.helpers import (get_user_agents, get_config,
get_taxonomies, load_cookies, CaptureStatus)
from lookyloo.helpers import (get_user_agents, get_config, get_taxonomies, load_cookies,
CaptureStatus, splash_status, get_capture_status)
from lookyloo.lookyloo import Lookyloo, Indexing
from lookyloo.exceptions import NoValidHarFile, MissingUUID
from .proxied import ReverseProxied
from .helpers import src_request_ip, User, load_user_from_request, build_users_table, get_secret_key, sri_load
from .helpers import (src_request_ip, User, load_user_from_request, build_users_table,
get_secret_key, sri_load)
app: Flask = Flask(__name__)
app.wsgi_app = ReverseProxied(app.wsgi_app) # type: ignore
@ -157,18 +158,20 @@ app.jinja_env.globals.update(get_sri=get_sri)
@app.after_request
def after_request(response):
# We keep a list user agents in order to build a list to use in the capture
# interface: this is the easiest way to have something up to date.
# The reason we also get the IP address of the client is because we
# count the frequency of each user agents and use it to sort them on the
# capture page, and we want to avoid counting the same user (same IP)
# multiple times in a day.
# The cache of IPs is deleted after the UA file is generated (see lookyloo.build_ua_file),
# once a day.
ua = request.headers.get('User-Agent')
real_ip = src_request_ip(request)
if ua:
lookyloo.cache_user_agents(ua, real_ip)
if use_own_ua:
# We keep a list user agents in order to build a list to use in the capture
# interface: this is the easiest way to have something up to date.
# The reason we also get the IP address of the client is because we
# count the frequency of each user agents and use it to sort them on the
# capture page, and we want to avoid counting the same user (same IP)
# multiple times in a day.
# The cache of IPs is deleted after the UA file is generated once a day.
# See bin/background_processing.py
ua = request.headers.get('User-Agent')
real_ip = src_request_ip(request)
if ua:
today = date.today().isoformat()
lookyloo.redis.zincrby(f'user_agents|{today}', 1, f'{real_ip}|{ua}')
# Opt out of FLoC
response.headers.set('Permissions-Policy', 'interest-cohort=()')
return response
@ -554,8 +557,8 @@ def tree(tree_uuid: str, node_uuid: Optional[str]=None):
try:
cache = lookyloo.capture_cache(tree_uuid)
except MissingUUID:
status = lookyloo.get_capture_status(tree_uuid)
splash_up, splash_message = lookyloo.splash_status()
status = get_capture_status(tree_uuid)
splash_up, splash_message = splash_status()
if not splash_up:
flash(f'The capture module is not reachable ({splash_message}).', 'error')
flash('The request will be enqueued, but capturing may take a while and require the administrator to wake up.', 'error')
@ -809,7 +812,7 @@ def capture_web():
if 'bot' not in ua['useragent'].lower():
default_ua = ua
break
splash_up, message = lookyloo.splash_status()
splash_up, message = splash_status()
if not splash_up:
flash(f'The capture module is not reachable ({message}).', 'error')
flash('The request will be enqueued, but capturing may take a while and require the administrator to wake up.', 'error')

View File

@ -12,7 +12,8 @@ from werkzeug.security import check_password_hash
from lookyloo.lookyloo import Lookyloo
from .helpers import src_request_ip, load_user_from_request, build_users_table
from .helpers import (src_request_ip, load_user_from_request, build_users_table)
from lookyloo.helpers import splash_status, get_capture_status
api = Namespace('GenericAPI', description='Generic Lookyloo API', path='/')
@ -61,7 +62,7 @@ class AuthToken(Resource):
@api.doc(description='Get status of splash.')
class SplashStatus(Resource):
def get(self):
status, info = lookyloo.splash_status()
status, info = splash_status()
return {'is_up': status, 'info': info}
@ -70,7 +71,7 @@ class SplashStatus(Resource):
params={'capture_uuid': 'The UUID of the capture'})
class CaptureStatusQuery(Resource):
def get(self, capture_uuid: str):
return {'status_code': lookyloo.get_capture_status(capture_uuid)}
return {'status_code': get_capture_status(capture_uuid)}
@api.route('/json/<string:capture_uuid>/hostnames')