From 9653fc2cc768829a2e1db7e322b90a00378344e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 1 Dec 2022 15:52:21 +0100 Subject: [PATCH] new: Avoid dangling keys in running, sync AbstractManager with pandora --- lookyloo/default/abstractmanager.py | 48 ++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/lookyloo/default/abstractmanager.py b/lookyloo/default/abstractmanager.py index 62767292..75fe3279 100644 --- a/lookyloo/default/abstractmanager.py +++ b/lookyloo/default/abstractmanager.py @@ -2,6 +2,7 @@ import asyncio import logging +import os import signal import time from abc import ABC @@ -10,7 +11,7 @@ from subprocess import Popen from typing import List, Optional, Tuple from redis import Redis -from redis.exceptions import ConnectionError +from redis.exceptions import ConnectionError as RedisConnectionError from .helpers import get_socket_path @@ -33,21 +34,42 @@ class AbstractManager(ABC): def is_running() -> List[Tuple[str, float]]: try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) + for script_name, score in r.zrangebyscore('running', '-inf', '+inf', withscores=True): + for pid in r.smembers(f'service|{script_name}'): + try: + os.kill(int(pid), 0) + except OSError: + print(f'Got a dead script: {script_name} - {pid}') + r.srem(f'service|{script_name}', pid) + other_same_services = r.scard(f'service|{script_name}') + if other_same_services: + r.zadd('running', {script_name: other_same_services}) + else: + r.zrem('running', script_name) return r.zrangebyscore('running', '-inf', '+inf', withscores=True) - except ConnectionError: + except RedisConnectionError: print('Unable to connect to redis, the system is down.') return [] + @staticmethod + def clear_running(): + try: + r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) + r.delete('running') + except RedisConnectionError: + print('Unable to connect to redis, the system is down.') + @staticmethod def force_shutdown(): try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) r.set('shutdown', 1) - except ConnectionError: + except RedisConnectionError: print('Unable to connect to redis, the system is down.') def set_running(self) -> None: self.__redis.zincrby('running', 1, self.script_name) + self.__redis.sadd(f'service|{self.script_name}', os.getpid()) def unset_running(self) -> None: current_running = self.__redis.zincrby('running', -1, self.script_name) @@ -55,8 +77,7 @@ class AbstractManager(ABC): self.__redis.zrem('running', self.script_name) def long_sleep(self, sleep_in_sec: int, shutdown_check: int=10) -> bool: - if shutdown_check > sleep_in_sec: - shutdown_check = sleep_in_sec + shutdown_check = min(sleep_in_sec, shutdown_check) sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec) while sleep_until > datetime.now(): time.sleep(shutdown_check) @@ -65,8 +86,7 @@ class AbstractManager(ABC): return True async def long_sleep_async(self, sleep_in_sec: int, shutdown_check: int=10) -> bool: - if shutdown_check > sleep_in_sec: - shutdown_check = sleep_in_sec + shutdown_check = min(sleep_in_sec, shutdown_check) sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec) while sleep_until > datetime.now(): await asyncio.sleep(shutdown_check) @@ -76,10 +96,10 @@ class AbstractManager(ABC): def shutdown_requested(self) -> bool: try: - return True if self.__redis.exists('shutdown') else False + return bool(self.__redis.exists('shutdown')) except ConnectionRefusedError: return True - except ConnectionError: + except RedisConnectionError: return True def _to_run_forever(self) -> None: @@ -116,7 +136,7 @@ class AbstractManager(ABC): else: self.set_running() self._to_run_forever() - except Exception: + except Exception: # nosec B110 self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.') finally: if not self.process: @@ -132,7 +152,7 @@ class AbstractManager(ABC): self._kill_process() try: self.unset_running() - except Exception: + except Exception: # nosec B110 # the services can already be down at that point. pass self.logger.info(f'Shutting down {self.__class__.__name__}') @@ -166,7 +186,7 @@ class AbstractManager(ABC): else: self.set_running() await self._to_run_forever_async() - except Exception: + except Exception: # nosec B110 self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.') finally: if not self.process: @@ -177,7 +197,7 @@ class AbstractManager(ABC): break except KeyboardInterrupt: self.logger.warning(f'{self.script_name} killed by user.') - except Exception as e: + except Exception as e: # nosec B110 self.logger.exception(e) finally: await self._wait_to_finish() @@ -185,7 +205,7 @@ class AbstractManager(ABC): self._kill_process() try: self.unset_running() - except Exception: + except Exception: # nosec B110 # the services can already be down at that point. pass self.logger.info(f'Shutting down {self.__class__.__name__}')