new: controller to list/stop/restart scripts individually

pull/937/head
Raphaël Vinot 2024-08-07 13:47:55 +02:00
parent fa311ef77c
commit 3a09e81b9a
5 changed files with 81 additions and 26 deletions

View File

@ -7,6 +7,7 @@ import logging
import logging.config
import signal
from asyncio import Task
from pathlib import Path
from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
@ -43,6 +44,10 @@ class AsyncCapture(AbstractManager):
async def _trigger_captures(self) -> None:
# Only called if LacusCore is used
def clear_list_callback(task: Task) -> None: # type: ignore[type-arg]
self.captures.discard(task)
self.unset_running()
max_new_captures = get_config('generic', 'async_capture_processes') - len(self.captures)
self.logger.debug(f'{len(self.captures)} ongoing captures.')
if max_new_captures <= 0:
@ -50,7 +55,8 @@ class AsyncCapture(AbstractManager):
return None
for capture_task in self.lookyloo.lacus.consume_queue(max_new_captures): # type: ignore[union-attr]
self.captures.add(capture_task)
capture_task.add_done_callback(self.captures.discard)
self.set_running()
capture_task.add_done_callback(clear_list_callback)
def uuids_ready(self) -> list[str]:
'''Get the list of captures ready to be processed'''
@ -115,7 +121,6 @@ class AsyncCapture(AbstractManager):
# make sure to expire the key if nothing was processed for a while (= queues empty)
lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
self.unset_running()
self.logger.info(f'Done with {uuid}')
async def _to_run_forever_async(self) -> None:
@ -125,10 +130,6 @@ class AsyncCapture(AbstractManager):
try:
if isinstance(self.lookyloo.lacus, LacusCore):
await self._trigger_captures()
# NOTE: +1 because running this method also counts for one and will
# be decremented when it finishes
self.set_running(len(self.captures) + 1)
self.process_capture_queue()
except LacusUnreachable:
self.logger.error('Lacus is unreachable, retrying later.')
@ -139,10 +140,7 @@ class AsyncCapture(AbstractManager):
while self.captures:
self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
await asyncio.sleep(5)
# NOTE: +1 so we don't quit before the final process capture queue
self.set_running(len(self.captures) + 1)
self.process_capture_queue()
self.unset_running()
self.logger.info('No more captures')
except LacusUnreachable:
self.logger.error('Lacus is unreachable, nothing to wait for')

63
bin/scripts_controller.py Executable file
View File

@ -0,0 +1,63 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import time
from subprocess import Popen
from psutil import Process
from redis import Redis
from lookyloo.default import get_homedir, get_socket_path, AbstractManager
def _get_cmdline(pid: str) -> list[str]:
    """Return the command line (argv list) of the process with the given PID."""
    return Process(int(pid)).cmdline()
def main() -> None:
    """CLI entry point: list the running scripts, or stop/restart one by name.

    * ``list``    — print the services currently registered as running in redis.
    * ``stop``    — ask the named script to shut down (via the ``shutdown_manual``
                    redis set polled by AbstractManager.shutdown_requested).
    * ``restart`` — same as stop, then re-launch the script with the exact
                    command line of the previously running process.
    """
    parser = argparse.ArgumentParser(description='Manage the scripts.')
    parser.add_argument('action', choices=['list', 'stop', 'restart'], help='The action to perform.', default='list')
    parser.add_argument('script', help='The script to manage.', nargs='?')
    args = parser.parse_args()

    # Just fail if the env isn't set.
    get_homedir()

    if args.action == 'list':
        try:
            print(AbstractManager.is_running())
        except FileNotFoundError:
            # Raised when the redis unix socket does not exist (redis down).
            print('Redis is down.')
        return

    to_restart: list[str] | None = None
    if args.action == 'restart':
        # We need to capture the cmdline before stopping, so we can re-launch
        # the exact same process afterwards.
        try:
            running_services = AbstractManager.is_running()
        except FileNotFoundError:
            # BUG FIX: was `except KeyError`, but is_running() raises
            # FileNotFoundError when the redis socket is missing (see the
            # 'list' branch above); it never raises KeyError.
            print('Redis is down, nothing to restart.')
            return
        for name, _number, pids in running_services:
            # Guard on `pids`: popping from an empty set would raise KeyError.
            if name == args.script and pids:
                to_restart = _get_cmdline(next(iter(pids)))
                break
        else:
            # BUG FIX: previously, an unknown/stopped script left `to_restart`
            # unbound and Popen(to_restart) below raised NameError.
            print(f'{args.script} is not running.')
            return

    print(f'Request {args.script} to {args.action}...')
    r = Redis(unix_socket_path=get_socket_path('cache'), db=1)
    # Membership in 'shutdown_manual' is polled by the script's
    # shutdown_requested() and triggers a clean stop.
    r.sadd('shutdown_manual', args.script)
    while r.zscore('running', args.script) is not None:
        print(f'Wait for {args.script} to stop...')
        time.sleep(1)
    print('done.')
    # Clear the flag so the script is allowed to run again.
    r.srem('shutdown_manual', args.script)

    if args.action == 'restart':
        print(f'Start {args.script}...')
        Popen(to_restart)
        print('done.')


if __name__ == '__main__':
    main()

View File

@ -33,9 +33,10 @@ class AbstractManager(ABC):
self.force_stop = False
@staticmethod
def is_running() -> list[tuple[str, float]]:
def is_running() -> list[tuple[str, float, set[str]]]:
try:
r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
running_scripts: dict[str, set[str]] = {}
for script_name, score in r.zrangebyscore('running', '-inf', '+inf', withscores=True):
for pid in r.smembers(f'service|{script_name}'):
try:
@ -48,7 +49,8 @@ class AbstractManager(ABC):
r.zadd('running', {script_name: other_same_services})
else:
r.zrem('running', script_name)
return r.zrangebyscore('running', '-inf', '+inf', withscores=True)
running_scripts[script_name] = r.smembers(f'service|{script_name}')
return [(name, rank, running_scripts[name] if name in running_scripts else set()) for name, rank in r.zrangebyscore('running', '-inf', '+inf', withscores=True)]
except RedisConnectionError:
print('Unable to connect to redis, the system is down.')
return []
@ -104,7 +106,8 @@ class AbstractManager(ABC):
def shutdown_requested(self) -> bool:
try:
return bool(self.__redis.exists('shutdown'))
return (bool(self.__redis.exists('shutdown'))
or bool(self.__redis.sismember('shutdown_manual', self.script_name)))
except ConnectionRefusedError:
return True
except RedisConnectionError:
@ -133,6 +136,7 @@ class AbstractManager(ABC):
def run(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
self.set_running()
while not self.force_stop:
if self.shutdown_requested():
break
@ -142,15 +146,9 @@ class AbstractManager(ABC):
self.logger.critical(f'Unable to start {self.script_name}.')
break
else:
self.set_running()
self._to_run_forever()
except Exception: # nosec B110
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally:
if not self.process:
# self.process means we run an external script, all the time,
# do not unset between sleep.
self.unset_running()
if not self.long_sleep(sleep_in_sec):
break
except KeyboardInterrupt:
@ -187,6 +185,7 @@ class AbstractManager(ABC):
async def run_async(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}')
try:
self.set_running()
while not self.force_stop:
if self.shutdown_requested():
break
@ -196,15 +195,9 @@ class AbstractManager(ABC):
self.logger.critical(f'Unable to start {self.script_name}.')
break
else:
self.set_running()
await self._to_run_forever_async()
except Exception: # nosec B110
self.logger.exception(f'Something went terribly wrong in {self.__class__.__name__}.')
finally:
if not self.process:
# self.process means we run an external script, all the time,
# do not unset between sleep.
self.unset_running()
if not await self.long_sleep_async(sleep_in_sec):
break
except KeyboardInterrupt:

View File

@ -34,6 +34,7 @@ background_full_indexer = "bin.background_indexer:main_full_indexer"
archiver = "bin.archiver:main"
processing = "bin.background_processing:main"
start_website = "bin.start_website:main"
scripts_controller = "bin.scripts_controller:main"
[tool.poetry.dependencies]

View File

@ -123,8 +123,8 @@ if __name__ == '__main__':
console.print('Services currently running:')
running = AbstractManager.is_running()
for service, number in running:
s = Padding(f'{service} ({int(number)} service(s))', (0, 2))
for service, number, pids in running:
s = Padding(f'{service} ({int(number)} service(s)) - PIDs: {", ".join(pids)}', (0, 2))
console.print(s)
console.print('Current cache status:')