lookyloo/lookyloo/default/abstractmanager.py

175 lines
6.5 KiB
Python

#!/usr/bin/env python3
import asyncio
import logging
import signal
import time
from abc import ABC
from datetime import datetime, timedelta
from subprocess import Popen
from typing import List, Optional, Tuple
from redis import Redis
from redis.exceptions import ConnectionError
from .helpers import get_socket_path
class AbstractManager(ABC):
script_name: str
def __init__(self, loglevel: int=logging.DEBUG):
self.loglevel = loglevel
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(loglevel)
self.logger.info(f'Initializing {self.__class__.__name__}')
self.process: Optional[Popen] = None
self.__redis = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
@staticmethod
def is_running() -> List[Tuple[str, float]]:
try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
return r.zrangebyscore('running', '-inf', '+inf', withscores=True)
except ConnectionError:
print('Unable to connect to redis, the system is down.')
return []
@staticmethod
def force_shutdown():
try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
r.set('shutdown', 1)
except ConnectionError:
print('Unable to connect to redis, the system is down.')
def set_running(self) -> None:
self.__redis.zincrby('running', 1, self.script_name)
def unset_running(self) -> None:
current_running = self.__redis.zincrby('running', -1, self.script_name)
if int(current_running) <= 0:
self.__redis.zrem('running', self.script_name)
def long_sleep(self, sleep_in_sec: int, shutdown_check: int=10) -> bool:
if shutdown_check > sleep_in_sec:
shutdown_check = sleep_in_sec
sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec)
while sleep_until > datetime.now():
time.sleep(shutdown_check)
if self.shutdown_requested():
return False
return True
async def long_sleep_async(self, sleep_in_sec: int, shutdown_check: int=10) -> bool:
if shutdown_check > sleep_in_sec:
shutdown_check = sleep_in_sec
sleep_until = datetime.now() + timedelta(seconds=sleep_in_sec)
while sleep_until > datetime.now():
await asyncio.sleep(shutdown_check)
if self.shutdown_requested():
return False
return True
def shutdown_requested(self) -> bool:
try:
return True if self.__redis.exists('shutdown') else False
except ConnectionRefusedError:
return True
except ConnectionError:
return True
def _to_run_forever(self) -> None:
pass
def _kill_process(self):
if self.process is None:
return
kill_order = [signal.SIGWINCH, signal.SIGTERM, signal.SIGINT, signal.SIGKILL]
for sig in kill_order:
if self.process.poll() is None:
self.logger.info(f'Sending {sig} to {self.process.pid}.')
self.process.send_signal(sig)
time.sleep(1)
else:
break
else:
self.logger.warning(f'Unable to kill {self.process.pid}, keep sending SIGKILL')
while self.process.poll() is None:
self.process.send_signal(signal.SIGKILL)
time.sleep(1)
def run(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
while True:
if self.shutdown_requested():
break
try:
if self.process:
if self.process.poll() is not None:
self.logger.critical(f'Unable to start {self.script_name}.')
break
else:
self.set_running()
self._to_run_forever()
except Exception:
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally:
if not self.process:
# self.process means we run an external script, all the time,
# do not unset between sleep.
self.unset_running()
if not self.long_sleep(sleep_in_sec):
break
except KeyboardInterrupt:
self.logger.warning(f'{self.script_name} killed by user.')
finally:
if self.process:
self._kill_process()
try:
self.unset_running()
except Exception:
# the services can already be down at that point.
pass
self.logger.info(f'Shutting down {self.__class__.__name__}')
async def _to_run_forever_async(self) -> None:
pass
async def run_async(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
while True:
if self.shutdown_requested():
break
try:
if self.process:
if self.process.poll() is not None:
self.logger.critical(f'Unable to start {self.script_name}.')
break
else:
self.set_running()
await self._to_run_forever_async()
except Exception:
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally:
if not self.process:
# self.process means we run an external script, all the time,
# do not unset between sleep.
self.unset_running()
if not await self.long_sleep_async(sleep_in_sec):
break
except KeyboardInterrupt:
self.logger.warning(f'{self.script_name} killed by user.')
finally:
if self.process:
self._kill_process()
try:
self.unset_running()
except Exception:
# the services can already be down at that point.
pass
self.logger.info(f'Shutting down {self.__class__.__name__}')