chg: remove workers monitor script, rely on Supervisor API for all worker-related stuff

pull/7939/head
Luciano Righetti 2021-11-10 10:36:11 +01:00
parent 69a69e6e10
commit fb7e7212ef
4 changed files with 32 additions and 168 deletions

View File

@ -1,73 +0,0 @@
<?php
declare(strict_types=1);
App::uses('BackgroundJobsTool', 'Tools');
class MonitorWorkersShell extends AppShell
{
/** @var BackgroundJobsTool */
private $BackgroundJobsTool;
/** @var int */
private $sleepInterval;
private const DEFAULT_SLEEP_INTERVAL = 5; // seconds
public $tasks = ['ConfigLoad'];
public function initialize(): void
{
parent::initialize();
$this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs'));
}
public function getOptionParser(): ConsoleOptionParser
{
$parser = parent::getOptionParser();
$parser->addOption(
'sleep',
[
'help' => 'Sleep interval between jobs (seconds).',
'default' => self::DEFAULT_SLEEP_INTERVAL,
'required' => false
]
);
return $parser;
}
public function main(): void
{
$this->sleepInterval = (int)$this->params['sleep'];
CakeLog::info("[WORKERS MONITOR] - starting to monitor workers...");
while (true) {
$this->ConfigLoad->execute();
$this->checkWorkersProcessStatus($this->BackgroundJobsTool->getWorkers());
sleep($this->sleepInterval);
}
}
/**
* Check workers process status
*
* @param Worker[] $workers
* @return void
*/
private function checkWorkersProcessStatus(array $workers): void
{
foreach ($workers as $worker) {
if (!file_exists("/proc/{$worker->pid()}")) {
CakeLog::info("[WORKERS MONITOR] - worker with pid {$worker->pid()} is gone.");
$this->BackgroundJobsTool->unregisterWorker($worker->pid());
}
$this->BackgroundJobsTool->updateWorkerStatus(
$worker->pid(),
Worker::STATUS_RUNNING
);
}
}
}

View File

@ -62,7 +62,6 @@ class StartWorkerShell extends AppShell
$this->maxExecutionTime = (int)$this->params['maxExecutionTime'];
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - starting to process background jobs...");
$this->BackgroundJobsTool->registerWorker($this->worker);
while (true) {
$this->ConfigLoad->execute();
@ -93,7 +92,6 @@ class StartWorkerShell extends AppShell
{
if ((time() - $this->worker->createdAt()) > $this->maxExecutionTime) {
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - worker max execution time reached, exiting gracefully worker...");
$this->BackgroundJobsTool->unregisterWorker($this->worker->pid());
exit;
}
}

View File

@ -52,7 +52,7 @@ class Worker implements JsonSerializable
$this->user = $properties['user'];
$this->createdAt = $properties['createdAt'] ?? time();
$this->updatedAt = $properties['updatedAt'] ?? null;
$this->status = $properties['status'] ?? self::STATUS_RUNNING;
$this->status = $properties['status'] ?? self::STATUS_UNKNOWN;
}
public function jsonSerialize(): array

View File

@ -102,7 +102,6 @@ class BackgroundJobsTool
];
public const JOB_STATUS_PREFIX = 'job_status';
public const WORKER_STATUS_PREFIX = 'worker_status';
/** @var array */
private $settings;
@ -300,22 +299,6 @@ class BackgroundJobsTool
return (bool) $this->RedisConnection->del($queue);
}
/**
* Get worker by PID.
*
* @return Worker|null Worker instance.
*/
public function getWorker(int $workerPid): ?Worker
{
$rawWorker = $this->RedisConnection->get(self::WORKER_STATUS_PREFIX . ':' . $workerPid);
if ($rawWorker) {
return new Worker($rawWorker);
}
return null;
}
/**
* Get all workers' instances.
*
@ -323,27 +306,20 @@ class BackgroundJobsTool
*/
public function getWorkers(): array
{
$pattern = self::WORKER_STATUS_PREFIX . ':*';
// get existing workers status keys
$iterator = null;
$workersKeys = [];
while ($keys = $this->RedisConnection->scan($iterator, $pattern)) {
foreach ($keys as $key) {
$workersKeys[] = $key;
}
}
if (!$workersKeys) {
return [];
}
// get workers status
$workersStatus = $this->RedisConnection->mget($workersKeys);
$workers = [];
foreach ($workersStatus as $worker) {
$workers[] = new Worker($worker);
$procs = $this->Supervisor->getAllProcesses();
foreach ($procs as $proc) {
if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP) {
$workers[] = new Worker([
'pid' => $proc->offsetGet('pid'),
'queue' => explode("_", $proc->offsetGet('name'))[0],
'user' => trim(shell_exec(sprintf("ps -o uname= -p %s", (int) $proc->offsetGet('pid')))),
'createdAt' => $proc->offsetGet('start'),
'updatedAt' => $proc->offsetGet('now'),
'status' => $this->convertProcessStatus($proc->offsetGet('state'))
]);
}
}
return $workers;
@ -406,61 +382,6 @@ class BackgroundJobsTool
return $job->returnCode();
}
/**
* Register worker
*
* @param integer $workerPid
* @param string $queue
* @param integer $createdAt
*
* @return void
*/
public function registerWorker(Worker $worker): void
{
$this->RedisConnection->set(
self::WORKER_STATUS_PREFIX . ':' . $worker->pid(),
$worker
);
}
/**
* Update worker
*
* @param integer $workerPid
* @param integer $status
*
* @return void
*/
public function updateWorkerStatus(int $workerPid, int $status): void
{
$worker = $this->getWorker($workerPid);
if (!$worker) {
CakeLog::warning("updateWorkerStatus: worker with PID: {$workerPid} not found.");
return;
}
$worker->setUpdatedAt(time());
$worker->setStatus($status);
$this->RedisConnection->set(
self::WORKER_STATUS_PREFIX . ':' . $workerPid,
$worker
);
}
/**
* Unregister worker
*
* @param integer $workerPid
*
* @return void
*/
public function unregisterWorker(int $workerPid): void
{
$this->RedisConnection->del(self::WORKER_STATUS_PREFIX . ':' . $workerPid);
}
/**
* Start worker by name
*
@ -721,4 +642,22 @@ class BackgroundJobsTool
throw new NotFoundException("Worker with pid=$pid not found.");
}
/**
* Convert process status to worker status
*
* @param integer $stateId
* @return integer
*/
private function convertProcessStatus(int $stateId): int
{
switch ($stateId) {
case \Supervisor\Process::RUNNING:
return Worker::STATUS_RUNNING;
case \Supervisor\Process::UNKNOWN:
return Worker::STATUS_UNKNOWN;
default:
return Worker::STATUS_FAILED;
}
}
}