new: Avoid dangling keys in running, sync AbstractManager with pandora

pull/557/head
Raphaël Vinot 2022-12-01 15:52:21 +01:00
parent af9515c123
commit 9653fc2cc7
1 changed files with 34 additions and 14 deletions

View File

@ -2,6 +2,7 @@
import asyncio import asyncio
import logging import logging
import os
import signal import signal
import time import time
from abc import ABC from abc import ABC
@ -10,7 +11,7 @@ from subprocess import Popen
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from redis import Redis from redis import Redis
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError as RedisConnectionError
from .helpers import get_socket_path from .helpers import get_socket_path
@ -33,21 +34,42 @@ class AbstractManager(ABC):
def is_running() -> List[Tuple[str, float]]: def is_running() -> List[Tuple[str, float]]:
try: try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
for script_name, score in r.zrangebyscore('running', '-inf', '+inf', withscores=True):
for pid in r.smembers(f'service|{script_name}'):
try:
os.kill(int(pid), 0)
except OSError:
print(f'Got a dead script: {script_name} - {pid}')
r.srem(f'service|{script_name}', pid)
other_same_services = r.scard(f'service|{script_name}')
if other_same_services:
r.zadd('running', {script_name: other_same_services})
else:
r.zrem('running', script_name)
return r.zrangebyscore('running', '-inf', '+inf', withscores=True) return r.zrangebyscore('running', '-inf', '+inf', withscores=True)
except ConnectionError: except RedisConnectionError:
print('Unable to connect to redis, the system is down.') print('Unable to connect to redis, the system is down.')
return [] return []
@staticmethod
def clear_running():
try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
r.delete('running')
except RedisConnectionError:
print('Unable to connect to redis, the system is down.')
@staticmethod @staticmethod
def force_shutdown(): def force_shutdown():
try: try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
r.set('shutdown', 1) r.set('shutdown', 1)
except ConnectionError: except RedisConnectionError:
print('Unable to connect to redis, the system is down.') print('Unable to connect to redis, the system is down.')
def set_running(self) -> None: def set_running(self) -> None:
self.__redis.zincrby('running', 1, self.script_name) self.__redis.zincrby('running', 1, self.script_name)
self.__redis.sadd(f'service|{self.script_name}', os.getpid())
def unset_running(self) -> None: def unset_running(self) -> None:
current_running = self.__redis.zincrby('running', -1, self.script_name) current_running = self.__redis.zincrby('running', -1, self.script_name)
@ -55,8 +77,7 @@ class AbstractManager(ABC):
self.__redis.zrem('running', self.script_name) self.__redis.zrem('running', self.script_name)
def long_sleep(self, sleep_in_sec: int, shutdown_check: int=10) -> bool: def long_sleep(self, sleep_in_sec: int, shutdown_check: int=10) -> bool:
if shutdown_check > sleep_in_sec: shutdown_check = min(sleep_in_sec, shutdown_check)
shutdown_check = sleep_in_sec
sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec) sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec)
while sleep_until > datetime.now(): while sleep_until > datetime.now():
time.sleep(shutdown_check) time.sleep(shutdown_check)
@ -65,8 +86,7 @@ class AbstractManager(ABC):
return True return True
async def long_sleep_async(self, sleep_in_sec: int, shutdown_check: int=10) -> bool: async def long_sleep_async(self, sleep_in_sec: int, shutdown_check: int=10) -> bool:
if shutdown_check > sleep_in_sec: shutdown_check = min(sleep_in_sec, shutdown_check)
shutdown_check = sleep_in_sec
sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec) sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec)
while sleep_until > datetime.now(): while sleep_until > datetime.now():
await asyncio.sleep(shutdown_check) await asyncio.sleep(shutdown_check)
@ -76,10 +96,10 @@ class AbstractManager(ABC):
def shutdown_requested(self) -> bool: def shutdown_requested(self) -> bool:
try: try:
return True if self.__redis.exists('shutdown') else False return bool(self.__redis.exists('shutdown'))
except ConnectionRefusedError: except ConnectionRefusedError:
return True return True
except ConnectionError: except RedisConnectionError:
return True return True
def _to_run_forever(self) -> None: def _to_run_forever(self) -> None:
@ -116,7 +136,7 @@ class AbstractManager(ABC):
else: else:
self.set_running() self.set_running()
self._to_run_forever() self._to_run_forever()
except Exception: except Exception: # nosec B110
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.') self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally: finally:
if not self.process: if not self.process:
@ -132,7 +152,7 @@ class AbstractManager(ABC):
self._kill_process() self._kill_process()
try: try:
self.unset_running() self.unset_running()
except Exception: except Exception: # nosec B110
# the services can already be down at that point. # the services can already be down at that point.
pass pass
self.logger.info(f'Shutting down {self.__class__.__name__}') self.logger.info(f'Shutting down {self.__class__.__name__}')
@ -166,7 +186,7 @@ class AbstractManager(ABC):
else: else:
self.set_running() self.set_running()
await self._to_run_forever_async() await self._to_run_forever_async()
except Exception: except Exception: # nosec B110
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.') self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally: finally:
if not self.process: if not self.process:
@ -177,7 +197,7 @@ class AbstractManager(ABC):
break break
except KeyboardInterrupt: except KeyboardInterrupt:
self.logger.warning(f'{self.script_name} killed by user.') self.logger.warning(f'{self.script_name} killed by user.')
except Exception as e: except Exception as e: # nosec B110
self.logger.exception(e) self.logger.exception(e)
finally: finally:
await self._wait_to_finish() await self._wait_to_finish()
@ -185,7 +205,7 @@ class AbstractManager(ABC):
self._kill_process() self._kill_process()
try: try:
self.unset_running() self.unset_running()
except Exception: except Exception: # nosec B110
# the services can already be down at that point. # the services can already be down at that point.
pass pass
self.logger.info(f'Shutting down {self.__class__.__name__}') self.logger.info(f'Shutting down {self.__class__.__name__}')