From 407e78ae7f1e6862387fb4933341c0924cce61b2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?=
Date: Wed, 25 Aug 2021 16:40:51 +0200
Subject: [PATCH] chg: More cleanup, support clean shutdown of multiple async captures

---
 bin/async_capture.py       | 49 ++++++++++++++++----------------
 bin/shutdown.py            |  4 ++--
 bin/start_website.py       |  5 +---
 known_content/generic.json |  3 ++-
 lookyloo/helpers.py        | 12 ++++++----
 5 files changed, 33 insertions(+), 40 deletions(-)

diff --git a/bin/async_capture.py b/bin/async_capture.py
index d655fd3..d1cf763 100755
--- a/bin/async_capture.py
+++ b/bin/async_capture.py
@@ -12,7 +12,6 @@ from datetime import datetime
 from pathlib import Path
 from typing import Union, Dict, Optional, Tuple, List
 from urllib.parse import urlsplit
-from uuid import uuid4
 
 from defang import refang  # type: ignore
 from redis import Redis
@@ -39,20 +38,13 @@ class AsyncCapture(AbstractManager):
         self.splash_url: str = get_splash_url()
         self.redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
 
-    def process_capture_queue(self) -> Union[bool, None]:
+    def process_capture_queue(self) -> None:
         '''Process a query from the capture queue'''
-        if not self.redis.exists('to_capture'):
-            return None
-
-        status, message = splash_status()
-        if not status:
-            self.logger.critical(f'Splash is not running, unable to process the capture queue: {message}')
-            return None
-
         value: Optional[List[Tuple[str, int]]] = self.redis.zpopmax('to_capture')  # type: ignore
         if not value or not value[0]:
-            return None
-        uuid, score = value[0]
+            # The queue was consumed by an other process.
+            return
+        uuid, _score = value[0]
         queue: Optional[str] = self.redis.get(f'{uuid}_mgmt')
         self.redis.sadd('ongoing', uuid)
 
@@ -67,22 +59,20 @@ class AsyncCapture(AbstractManager):
         if 'cookies' in to_capture:
             to_capture['cookies_pseudofile'] = to_capture.pop('cookies')
 
-        status = self._capture(**to_capture)  # type: ignore
+        if self._capture(**to_capture):  # type: ignore
+            self.logger.info(f'Processed {to_capture["url"]}')
+        else:
+            self.logger.warning(f'Unable to capture {to_capture["url"]}')
         lazy_cleanup.srem('ongoing', uuid)
         lazy_cleanup.delete(uuid)
-        # make sure to expire the key if nothing was process for a while (= queues empty)
+        # make sure to expire the key if nothing was processed for a while (= queues empty)
         lazy_cleanup.expire('queues', 600)
         lazy_cleanup.execute()
-        if status:
-            self.logger.info(f'Processed {to_capture["url"]}')
-            return True
-        self.logger.warning(f'Unable to capture {to_capture["url"]}')
-        return False
 
-    def _capture(self, url: str, *, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
+    def _capture(self, url: str, *, perma_uuid: str, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
                  depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
-                 referer: str='', proxy: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
-                 browser: Optional[str]=None, parent: Optional[str]=None) -> Union[bool, str]:
+                 referer: str='', proxy: str='', os: Optional[str]=None,
+                 browser: Optional[str]=None, parent: Optional[str]=None) -> bool:
         '''Launch a capture'''
         url = url.strip()
         url = refang(url)
@@ -113,8 +103,6 @@ class AsyncCapture(AbstractManager):
         if int(depth) > int(get_config('generic', 'max_depth')):
            self.logger.warning(f'Not allowed to capture on a depth higher than {get_config("generic", "max_depth")}: {depth}')
            depth = int(get_config('generic', 'max_depth'))
-        if not perma_uuid:
-            perma_uuid = str(uuid4())
         self.logger.info(f'Capturing {url}')
         try:
             items = crawl(self.splash_url, url, cookies=cookies, depth=depth, user_agent=ua,
@@ -182,12 +170,17 @@ class AsyncCapture(AbstractManager):
                 with (dirpath / '{0:0{width}}.cookies.json'.format(i, width=width)).open('w') as _cookies:
                     json.dump(cookies, _cookies)
         self.redis.hset('lookup_dirs', perma_uuid, str(dirpath))
-        return perma_uuid
+        return True
 
     def _to_run_forever(self):
-        while True:
-            url = self.process_capture_queue()
-            if url is None or shutdown_requested():
+        while self.redis.exists('to_capture'):
+            status, message = splash_status()
+            if not status:
+                self.logger.critical(f'Splash is not running, unable to process the capture queue: {message}')
+                break
+
+            self.process_capture_queue()
+            if shutdown_requested():
                 break
 
 
diff --git a/bin/shutdown.py b/bin/shutdown.py
index aee230f..c211b0c 100755
--- a/bin/shutdown.py
+++ b/bin/shutdown.py
@@ -3,11 +3,11 @@
 
 from lookyloo.helpers import is_running, get_socket_path
 import time
-from redis import StrictRedis
+from redis import Redis
 
 
 def main():
-    r = StrictRedis(unix_socket_path=get_socket_path('cache'), db=1)
+    r = Redis(unix_socket_path=get_socket_path('cache'), db=1)
     r.set('shutdown', 1)
     time.sleep(5)
     while True:
diff --git a/bin/start_website.py b/bin/start_website.py
index 6596408..d255799 100755
--- a/bin/start_website.py
+++ b/bin/start_website.py
@@ -4,13 +4,10 @@
 import time
 import signal
 from subprocess import Popen
-from lookyloo.helpers import get_homedir, shutdown_requested, set_running, unset_running, get_socket_path, get_config
-from redis import StrictRedis
+from lookyloo.helpers import get_homedir, shutdown_requested, set_running, unset_running, get_config
 
 
 def main():
-    r = StrictRedis(unix_socket_path=get_socket_path('cache'))
-    r.delete('cache_loaded')
     website_dir = get_homedir() / 'website'
     ip = get_config('generic', 'website_listen_ip')
     port = get_config('generic', 'website_listen_port')
diff --git a/known_content/generic.json b/known_content/generic.json
index dbcb0de..2605799 100644
--- a/known_content/generic.json
+++ b/known_content/generic.json
@@ -53,7 +53,8 @@
       "f1c33e72643ce366fd578e3b5d393799e8c9ea27b180987826af43b4fc00b65a4eaae5e6426a23448956fee99e3108c6a86f32fb4896c156e24af0571a11c498",
       "dc7c40381b3d22919e32c1b700ccb77b1b0aea2690642d01c1ac802561e135c01d5a4d2a0ea18efc0ec3362e8c549814a10a23563f1f56bd62aee0ced7e2bd99",
       "c2c239cb5cdd0b670780ad6414ef6be9ccd4c21ce46bb93d1fa3120ac812f1679445162978c3df05cb2e1582a1844cc4c41cf74960b8fdae3123999c5d2176cc",
-      "6ad523f5b65487369d305613366b9f68dcdeee225291766e3b25faf45439ca069f614030c08ca54c714fdbf7a944fac489b1515a8bf9e0d3191e1bcbbfe6a9df"
+      "6ad523f5b65487369d305613366b9f68dcdeee225291766e3b25faf45439ca069f614030c08ca54c714fdbf7a944fac489b1515a8bf9e0d3191e1bcbbfe6a9df",
+      "5065931218ce18ded3a022bd14e8208247f6d0900fff3b41901f9dba45dc417d84e386549e64446f390073431ed23a83d9f4c018da389d2e43f59c26febfc0de"
     ]
   },
   "empty_svg" : {
diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py
index 8b420e3..8382d53 100644
--- a/lookyloo/helpers.py
+++ b/lookyloo/helpers.py
@@ -170,18 +170,20 @@ def safe_create_dir(to_create: Path) -> None:
 
 
 def set_running(name: str) -> None:
-    r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
-    r.hset('running', name, 1)
+    r = Redis(unix_socket_path=get_socket_path('cache'), db=1)
+    r.zincrby('running', 1, name)
 
 
 def unset_running(name: str) -> None:
     r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
-    r.hdel('running', name)
+    current_running = r.zincrby('running', -1, name)
+    if int(current_running) <= 0:
+        r.zrem('running', name)
 
 
-def is_running() -> Dict[Any, Any]:
+def is_running() -> List[Tuple[str, float]]:
     r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
-    return r.hgetall('running')
+    return r.zrangebyscore('running', '-inf', '+inf', withscores=True)
 
 
 def get_socket_path(name: str) -> str:
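
Note on the helpers change above: the 'running' key moves from a Redis hash to a sorted set, so several instances of the same script (for example async_capture) can register under one name and the entry only disappears once the last instance stops. A minimal sketch of that behaviour, assuming redis-py and a locally reachable Redis; the socket path below is a placeholder, not the project's real get_socket_path('cache'):

    from redis import Redis

    # Placeholder connection; Lookyloo resolves the real unix socket via get_socket_path('cache').
    r = Redis(unix_socket_path='cache.sock', db=1, decode_responses=True)

    r.zincrby('running', 1, 'async_capture')   # first capture process calls set_running()
    r.zincrby('running', 1, 'async_capture')   # a second one starts
    print(r.zrangebyscore('running', '-inf', '+inf', withscores=True))
    # [('async_capture', 2.0)] -> is_running() now also reports how many instances are up

    current = r.zincrby('running', -1, 'async_capture')   # one process calls unset_running()
    if int(current) <= 0:
        r.zrem('running', 'async_capture')   # only the last instance removes the entry
    print(r.zrangebyscore('running', '-inf', '+inf', withscores=True))
    # [('async_capture', 1.0)]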