diff --git a/bin/async_capture.py b/bin/async_capture.py index bcaff971..070c5471 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -7,6 +7,7 @@ import logging import logging.config import signal +from asyncio import Task from pathlib import Path from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore @@ -43,6 +44,10 @@ class AsyncCapture(AbstractManager): async def _trigger_captures(self) -> None: # Only called if LacusCore is used + def clear_list_callback(task: Task) -> None: # type: ignore[type-arg] + self.captures.discard(task) + self.unset_running() + max_new_captures = get_config('generic', 'async_capture_processes') - len(self.captures) self.logger.debug(f'{len(self.captures)} ongoing captures.') if max_new_captures <= 0: @@ -50,7 +55,8 @@ class AsyncCapture(AbstractManager): return None for capture_task in self.lookyloo.lacus.consume_queue(max_new_captures): # type: ignore[union-attr] self.captures.add(capture_task) - capture_task.add_done_callback(self.captures.discard) + self.set_running() + capture_task.add_done_callback(clear_list_callback) def uuids_ready(self) -> list[str]: '''Get the list of captures ready to be processed''' @@ -115,7 +121,6 @@ class AsyncCapture(AbstractManager): # make sure to expire the key if nothing was processed for a while (= queues empty) lazy_cleanup.expire('queues', 600) lazy_cleanup.execute() - self.unset_running() self.logger.info(f'Done with {uuid}') async def _to_run_forever_async(self) -> None: @@ -125,10 +130,6 @@ class AsyncCapture(AbstractManager): try: if isinstance(self.lookyloo.lacus, LacusCore): await self._trigger_captures() - # NOTE: +1 because running this method also counts for one and will - # be decremented when it finishes - self.set_running(len(self.captures) + 1) - self.process_capture_queue() except LacusUnreachable: self.logger.error('Lacus is unreachable, retrying later.') @@ -139,10 +140,7 @@ class AsyncCapture(AbstractManager): while self.captures: self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...') await asyncio.sleep(5) - # NOTE: +1 so we don't quit before the final process capture queue - self.set_running(len(self.captures) + 1) self.process_capture_queue() - self.unset_running() self.logger.info('No more captures') except LacusUnreachable: self.logger.error('Lacus is unreachable, nothing to wait for') diff --git a/bin/scripts_controller.py b/bin/scripts_controller.py new file mode 100755 index 00000000..8b194608 --- /dev/null +++ b/bin/scripts_controller.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse + +import time + +from subprocess import Popen + +from psutil import Process +from redis import Redis + +from lookyloo.default import get_homedir, get_socket_path, AbstractManager + + +def _get_cmdline(pid: str) -> list[str]: + process = Process(int(pid)) + return process.cmdline() + + +def main() -> None: + parser = argparse.ArgumentParser(description='Manage the scripts.') + parser.add_argument('action', choices=['list', 'stop', 'restart'], help='The action to perform.', default='list') + parser.add_argument('script', help='The script to manage.', nargs='?') + args = parser.parse_args() + # Just fail if the env isn't set. + get_homedir() + if args.action == 'list': + try: + print(AbstractManager.is_running()) + except FileNotFoundError: + print('Redis is down.') + else: + if args.action == 'restart': + # we need to keep the cmdline for the restart + try: + running_services = AbstractManager.is_running() + except KeyError: + print(f'{args.script} is not running.') + return + for name, numbers, pids in running_services: + if name == args.script: + to_restart = _get_cmdline(pids.pop()) + break + + print(f'Request {args.script} to {args.action}...') + r = Redis(unix_socket_path=get_socket_path('cache'), db=1) + r.sadd('shutdown_manual', args.script) + while r.zscore('running', args.script) is not None: + print(f'Wait for {args.script} to stop...') + time.sleep(1) + print('done.') + r.srem('shutdown_manual', args.script) + + if args.action == 'restart': + print(f'Start {args.script}...') + Popen(to_restart) + print('done.') + + +if __name__ == '__main__': + main() diff --git a/lookyloo/default/abstractmanager.py b/lookyloo/default/abstractmanager.py index 70d85ad9..9dcc6851 100644 --- a/lookyloo/default/abstractmanager.py +++ b/lookyloo/default/abstractmanager.py @@ -33,9 +33,10 @@ class AbstractManager(ABC): self.force_stop = False @staticmethod - def is_running() -> list[tuple[str, float]]: + def is_running() -> list[tuple[str, float, set[str]]]: try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) + running_scripts: dict[str, set[str]] = {} for script_name, score in r.zrangebyscore('running', '-inf', '+inf', withscores=True): for pid in r.smembers(f'service|{script_name}'): try: @@ -48,7 +49,8 @@ class AbstractManager(ABC): r.zadd('running', {script_name: other_same_services}) else: r.zrem('running', script_name) - return r.zrangebyscore('running', '-inf', '+inf', withscores=True) + running_scripts[script_name] = r.smembers(f'service|{script_name}') + return [(name, rank, running_scripts[name] if name in running_scripts else set()) for name, rank in r.zrangebyscore('running', '-inf', '+inf', withscores=True)] except RedisConnectionError: print('Unable to connect to redis, the system is down.') return [] @@ -104,7 +106,8 @@ class AbstractManager(ABC): def shutdown_requested(self) -> bool: try: - return bool(self.__redis.exists('shutdown')) + return (bool(self.__redis.exists('shutdown')) + or bool(self.__redis.sismember('shutdown_manual', self.script_name))) except ConnectionRefusedError: return True except RedisConnectionError: @@ -133,6 +136,7 @@ class AbstractManager(ABC): def run(self, sleep_in_sec: int) -> None: self.logger.info(f'Launching {self.__class__.__name__}') try: + self.set_running() while not self.force_stop: if self.shutdown_requested(): break @@ -142,15 +146,9 @@ class AbstractManager(ABC): self.logger.critical(f'Unable to start {self.script_name}.') break else: - self.set_running() self._to_run_forever() except Exception: # nosec B110 self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.') - finally: - if not self.process: - # self.process means we run an external script, all the time, - # do not unset between sleep. - self.unset_running() if not self.long_sleep(sleep_in_sec): break except KeyboardInterrupt: @@ -187,6 +185,7 @@ class AbstractManager(ABC): async def run_async(self, sleep_in_sec: int) -> None: self.logger.info(f'Launching {self.__class__.__name__}') try: + self.set_running() while not self.force_stop: if self.shutdown_requested(): break @@ -196,15 +195,9 @@ class AbstractManager(ABC): self.logger.critical(f'Unable to start {self.script_name}.') break else: - self.set_running() await self._to_run_forever_async() except Exception: # nosec B110 self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.') - finally: - if not self.process: - # self.process means we run an external script, all the time, - # do not unset between sleep. - self.unset_running() if not await self.long_sleep_async(sleep_in_sec): break except KeyboardInterrupt: diff --git a/pyproject.toml b/pyproject.toml index a514c5c3..0ffc94d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ background_full_indexer = "bin.background_indexer:main_full_indexer" archiver = "bin.archiver:main" processing = "bin.background_processing:main" start_website = "bin.start_website:main" +scripts_controller = "bin.scripts_controller:main" [tool.poetry.dependencies] diff --git a/tools/monitoring.py b/tools/monitoring.py index c057d4ee..1be9e96b 100755 --- a/tools/monitoring.py +++ b/tools/monitoring.py @@ -123,8 +123,8 @@ if __name__ == '__main__': console.print('Services currently running:') running = AbstractManager.is_running() - for service, number in running: - s = Padding(f'{service} ({int(number)} service(s))', (0, 2)) + for service, number, pids in running: + s = Padding(f'{service} ({int(number)} service(s)) - PIDs: {", ".join(pids)}', (0, 2)) console.print(s) console.print('Current cache status:')