chg: Cleanup archiver, initialize index captures in start

pull/251/head
Raphaël Vinot 2021-08-24 17:10:14 +02:00
parent ece30a33eb
commit 8433cbcc1b
4 changed files with 61 additions and 55 deletions

bin/archiver.py

@@ -8,9 +8,10 @@ import logging
 from typing import Dict, List, Tuple
 from pathlib import Path
+from redis import Redis
 from lookyloo.abstractmanager import AbstractManager
-from lookyloo.lookyloo import Lookyloo
-from lookyloo.helpers import get_config
+from lookyloo.helpers import get_config, get_homedir, get_socket_path, get_captures_dir

 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO, datefmt='%I:%M:%S')
@@ -21,17 +22,18 @@ class Archiver(AbstractManager):
     def __init__(self, loglevel: int=logging.INFO):
         super().__init__(loglevel)
         self.script_name = 'archiver'
-        self._load_indexes()
+        self.redis = Redis(unix_socket_path=get_socket_path('cache'))
+        # make sure archived captures dir exists
+        self.archived_captures_dir = get_homedir() / 'archived_captures'
+        self.archived_captures_dir.mkdir(parents=True, exist_ok=True)
+        self._load_archives()

     def _to_run_forever(self):
         self._archive()

     def _archive(self):
-        # Initialize the lookyloo class here, no need to keep it in memory all the time.
-        lookyloo = Lookyloo()
-        # make sure archived captures dir exists
-        archived_captures_dir = lookyloo.capture_dir.parent / 'archived_captures'
-        archived_captures_dir.mkdir(parents=True, exist_ok=True)
         archive_interval = timedelta(days=get_config('generic', 'archive'))
         cut_time = (datetime.now() - archive_interval).date()
         cut_time = cut_time.replace(day=1)
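Note on the cut-off: the boundary is snapped to the first day of a month, so captures are archived in whole months. A quick worked example, assuming get_config('generic', 'archive') returns 180 and the current date is 2021-08-24:

    from datetime import datetime, timedelta

    archive_interval = timedelta(days=180)                         # assumed 'archive' setting
    cut_time = (datetime(2021, 8, 24) - archive_interval).date()   # 2021-02-25
    cut_time = cut_time.replace(day=1)                             # 2021-02-01
    # every capture taken before February 2021 becomes an archive candidate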
@@ -39,7 +41,7 @@ class Archiver(AbstractManager):
         # Format:
         # { 2020: { 12: [(directory, uuid)] } }
         to_archive: Dict[int, Dict[int, List[Tuple[Path, str]]]] = defaultdict(lambda: defaultdict(list))
-        for capture_path in lookyloo.capture_dir.glob('*'):
+        for capture_path in get_captures_dir().glob('*'):
             if not capture_path.is_dir():
                 continue
             timestamp = datetime.strptime(capture_path.name, '%Y-%m-%dT%H:%M:%S.%f')
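Since each capture directory is named after its capture time, strptime() recovers the timestamp straight from the directory name. A minimal check with a hypothetical directory name:

    from datetime import datetime

    # hypothetical directory name following the '%Y-%m-%dT%H:%M:%S.%f' pattern
    timestamp = datetime.strptime('2021-08-24T17:10:14.123456', '%Y-%m-%dT%H:%M:%S.%f')
    print(timestamp.year, timestamp.month)  # 2021 8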
@@ -58,7 +60,7 @@ class Archiver(AbstractManager):
         archived_uuids = {}
         for year, month_captures in to_archive.items():
             for month, captures in month_captures.items():
-                dest_dir = archived_captures_dir / str(year) / f'{month:02}'
+                dest_dir = self.archived_captures_dir / str(year) / f'{month:02}'
                 dest_dir.mkdir(parents=True, exist_ok=True)
                 if (dest_dir / 'index').exists():
                     with (dest_dir / 'index').open('r') as _f:
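The per-month index is a plain CSV, one uuid,dirname row per capture, as the writerow() call below shows. A small sketch of loading one back (the path is a placeholder for illustration):

    import csv
    from pathlib import Path

    index_file = Path('archived_captures') / '2021' / '02' / 'index'  # assumed layout: <year>/<month>/index
    with index_file.open('r') as _f:
        archived = {uuid: dirname for uuid, dirname in csv.reader(_f)}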
@@ -75,36 +77,22 @@ class Archiver(AbstractManager):
                     index_writer.writerow([uuid, dirname])

         if archived_uuids:
-            lookyloo.redis.hdel('lookup_dirs', *archived_uuids.keys())
-            lookyloo.redis.hset('lookup_dirs_archived', mapping=archived_uuids)
-            lookyloo.clear_captures_index_cache(archived_uuids.keys())
+            p = self.redis.pipeline()
+            p.hdel('lookup_dirs', *archived_uuids.keys())
+            p.hset('lookup_dirs_archived', mapping=archived_uuids)
+            p.execute()

         self.logger.info('Archiving done.')

-    def _load_indexes(self):
-        # Initialize the lookyloo class here, no need to keep it in memory all the time.
-        lookyloo = Lookyloo()
-        # NOTE: Initialize recent
-        recent_uuids = {}
-        for uuid_path in sorted(lookyloo.capture_dir.glob('*/uuid'), reverse=True):
-            with uuid_path.open() as f:
-                uuid = f.read()
-            recent_uuids[uuid] = str(uuid_path.parent)
-        lookyloo.redis.delete('lookup_dirs')
-        lookyloo.redis.hset('lookup_dirs', mapping=recent_uuids)
-        # NOTE: Initialize archives
-        # make sure archived captures dir exists
-        archived_captures_dir = lookyloo.capture_dir.parent / 'archived_captures'
-        archived_captures_dir.mkdir(parents=True, exist_ok=True)
-        lookyloo.redis.delete('lookup_dirs_archived')
-        for year in archived_captures_dir.iterdir():
+    def _load_archives(self):
+        # Initialize archives
+        self.redis.delete('lookup_dirs_archived')
+        for year in self.archived_captures_dir.iterdir():
             for month in year.iterdir():
                 if not (month / 'index').exists():
                     continue
                 with (month / 'index').open('r') as _f:
                     archived_uuids = {uuid: str(month / dirname) for uuid, dirname in csv.reader(_f)}
-                lookyloo.redis.hset('lookup_dirs_archived', mapping=archived_uuids)
+                self.redis.hset('lookup_dirs_archived', mapping=archived_uuids)

 def main():
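Both index updates now go through one Redis pipeline, so the live index shrinks and the archive index grows in a single round-trip instead of two separate commands. A standalone sketch of the pattern (socket path and sample values are placeholders):

    from redis import Redis

    r = Redis(unix_socket_path='/tmp/cache.sock')  # placeholder socket path
    p = r.pipeline()
    p.hdel('lookup_dirs', 'uuid-1', 'uuid-2')      # drop archived captures from the live index
    p.hset('lookup_dirs_archived', mapping={'uuid-1': 'archived_captures/2021/02/dir-1',
                                            'uuid-2': 'archived_captures/2021/02/dir-2'})
    p.execute()                                    # both commands are sent and applied together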

bin/start.py

@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-

 from subprocess import run, Popen
-from lookyloo.helpers import get_homedir, get_config
+from lookyloo.helpers import get_homedir, get_config, reload_uuids_index

 def main():
@@ -12,8 +12,11 @@ def main():
     p = run(['run_backend', '--start'])
     p.check_returncode()
     print('done.')
+    print('Reload UUIDs index...')
+    reload_uuids_index()
+    print('done.')
     print('Start asynchronous ingestor...')
-    for i in range(get_config('generic', 'async_capture_processes')):
+    for _ in range(get_config('generic', 'async_capture_processes')):
         Popen(['async_capture'])
     print('done.')
     print('Start background indexer...')
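Startup mixes blocking and non-blocking subprocess calls on purpose: run() waits for steps that must finish (with check_returncode() raising on failure), while Popen() launches long-lived workers without waiting. A reduced sketch of the pattern, with placeholder commands and worker count:

    from subprocess import Popen, run

    p = run(['run_backend', '--start'])  # blocks until the command exits
    p.check_returncode()                 # raises CalledProcessError on a non-zero exit
    for _ in range(2):                   # placeholder worker count
        Popen(['async_capture'])         # fire-and-forget long-running worker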

lookyloo/helpers.py

@@ -102,6 +102,13 @@ Run the following command (assuming you run the code from the clonned repository
     return Path(os.environ['LOOKYLOO_HOME'])


+@lru_cache(64)
+def get_captures_dir() -> Path:
+    capture_dir = get_homedir() / 'scraped'
+    safe_create_dir(capture_dir)
+    return capture_dir
+
+
 @lru_cache(64)
 def get_email_template() -> str:
     with (get_homedir() / 'config' / 'email.tmpl').open() as f:
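Because get_captures_dir() takes no arguments, @lru_cache means the path lookup (and the safe_create_dir() side effect) runs once per process; later calls return the cached Path. A stand-in illustration of that behaviour:

    from functools import lru_cache
    from pathlib import Path

    @lru_cache(64)
    def cached_dir() -> Path:    # stand-in for get_captures_dir()
        print('computed')
        return Path('scraped')

    cached_dir()  # prints 'computed'
    cached_dir()  # cache hit: nothing printed, same Path returned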
@@ -355,3 +362,16 @@ def try_make_file(filename: Path):
 def get_useragent_for_requests():
     version = pkg_resources.get_distribution('lookyloo').version
     return f'Lookyloo / {version}'
+
+
+def reload_uuids_index() -> None:
+    recent_uuids = {}
+    for uuid_path in sorted(get_captures_dir().glob('*/uuid'), reverse=True):
+        with uuid_path.open() as f:
+            uuid = f.read()
+        recent_uuids[uuid] = str(uuid_path.parent)
+    r = Redis(unix_socket_path=get_socket_path('cache'))
+    p = r.pipeline()
+    p.delete('lookup_dirs')
+    p.hset('lookup_dirs', mapping=recent_uuids)
+    p.execute()
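For context, 'lookup_dirs' is a Redis hash from capture UUID to capture directory, so a consumer resolves a capture with a single HGET. A plausible lookup, assuming callers check the live hash before the archived one (socket path and UUID are placeholders):

    from redis import Redis

    r = Redis(unix_socket_path='/tmp/cache.sock', decode_responses=True)       # placeholder path
    capture_dir = r.hget('lookup_dirs', 'some-capture-uuid')                   # recent captures
    if capture_dir is None:
        capture_dir = r.hget('lookup_dirs_archived', 'some-capture-uuid')      # archived captures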

lookyloo/lookyloo.py

@@ -40,7 +40,7 @@ from .exceptions import NoValidHarFile, MissingUUID, LookylooException, MissingC
 from .helpers import (get_homedir, get_socket_path, load_cookies, get_config,
                       safe_create_dir, get_email_template, load_pickle_tree,
                       remove_pickle_tree, get_resources_hashes, get_taxonomies, uniq_domains,
-                      CaptureStatus, try_make_file)
+                      CaptureStatus, try_make_file, get_captures_dir)
 from .modules import VirusTotal, SaneJavaScript, PhishingInitiative, MISP, UniversalWhois, UrlScan
 from .capturecache import CaptureCache
 from .context import Context
@@ -59,7 +59,7 @@ class Lookyloo():
         self.redis_pool: ConnectionPool = ConnectionPool(connection_class=UnixDomainSocketConnection,
                                                          path=get_socket_path('cache'), decode_responses=True)
-        self.capture_dir: Path = get_homedir() / 'scraped'
+        self.capture_dir: Path = get_captures_dir()
         if os.environ.get('SPLASH_URL_DOCKER'):
             # In order to have a working default for the docker image, it is easier to use an environment variable
             self.splash_url: str = os.environ['SPLASH_URL_DOCKER']
@@ -69,8 +69,6 @@ class Lookyloo():
         self._priority = get_config('generic', 'priority')

-        safe_create_dir(self.capture_dir)
-
         # Initialize 3rd party components
         self.pi = PhishingInitiative(get_config('modules', 'PhishingInitiative'))
         if not self.pi.available:
@@ -103,19 +101,6 @@ class Lookyloo():
     def redis(self):
         return Redis(connection_pool=self.redis_pool)

-    def _get_priority(self, source: str, user: str, authenticated: bool) -> int:
-        src_prio: int = self._priority['sources'][source] if source in self._priority['sources'] else -1
-        if not authenticated:
-            usr_prio = self._priority['users']['_default_anon']
-            # reduce priority for anonymous users making lots of captures
-            queue_size = self.redis.zscore('queues', f'{source}|{authenticated}|{user}')
-            if queue_size is None:
-                queue_size = 0
-            usr_prio -= int(queue_size / 10)
-        else:
-            usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth']
-        return src_prio + usr_prio

     def cache_user_agents(self, user_agent: str, remote_ip: str) -> None:
         '''Cache the useragents of the visitors'''
         today = date.today().isoformat()
@@ -592,9 +577,6 @@ class Lookyloo():
         all_cache.sort(key=operator.attrgetter('timestamp'), reverse=True)
         return all_cache

-    def clear_captures_index_cache(self, uuids: Iterable[str]) -> None:
-        [self._captures_index.pop(uuid) for uuid in uuids if uuid in self._captures_index]

     def capture_cache(self, capture_uuid: str, /) -> Optional[CaptureCache]:
         """Get the cache from redis."""
         if capture_uuid in self._captures_index and not self._captures_index[capture_uuid].incomplete_redirects:
@@ -654,6 +636,19 @@ class Lookyloo():
             return CaptureStatus.ONGOING
         return CaptureStatus.UNKNOWN

+    def _get_priority(self, source: str, user: str, authenticated: bool) -> int:
+        src_prio: int = self._priority['sources'][source] if source in self._priority['sources'] else -1
+        if not authenticated:
+            usr_prio = self._priority['users']['_default_anon']
+            # reduce priority for anonymous users making lots of captures
+            queue_size = self.redis.zscore('queues', f'{source}|{authenticated}|{user}')
+            if queue_size is None:
+                queue_size = 0
+            usr_prio -= int(queue_size / 10)
+        else:
+            usr_prio = self._priority['users'][user] if self._priority['users'].get(user) else self._priority['users']['_default_auth']
+        return src_prio + usr_prio
+
     def enqueue_capture(self, query: MutableMapping[str, Any], source: str, user: str, authenticated: bool) -> str:
         '''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
         perma_uuid = str(uuid4())
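A worked example of the priority formula, with assumed config values: given _priority = {'sources': {'api': 0}, 'users': {'_default_anon': -10}}, an anonymous user on the 'api' source with 25 captures already queued gets:

    src_prio = 0                    # 'api' is listed in sources
    usr_prio = -10 - int(25 / 10)   # anonymous default, minus 2 for the queue backlog
    priority = src_prio + usr_prio  # -12: heavy anonymous users sink in the queue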