From fb7e7212ef8f01cc5db9c0e7b0645fbcc1efdeea Mon Sep 17 00:00:00 2001 From: Luciano Righetti Date: Wed, 10 Nov 2021 10:36:11 +0100 Subject: [PATCH] chg: remove workers monitor script, rely on Supervisor API for all worker-related stuff --- app/Console/Command/MonitorWorkersShell.php | 73 ------------ app/Console/Command/StartWorkerShell.php | 2 - app/Lib/Tools/BackgroundJobs/Worker.php | 2 +- app/Lib/Tools/BackgroundJobsTool.php | 123 +++++--------------- 4 files changed, 32 insertions(+), 168 deletions(-) delete mode 100644 app/Console/Command/MonitorWorkersShell.php diff --git a/app/Console/Command/MonitorWorkersShell.php b/app/Console/Command/MonitorWorkersShell.php deleted file mode 100644 index 42af55201..000000000 --- a/app/Console/Command/MonitorWorkersShell.php +++ /dev/null @@ -1,73 +0,0 @@ -BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs')); - } - - public function getOptionParser(): ConsoleOptionParser - { - $parser = parent::getOptionParser(); - $parser->addOption( - 'sleep', - [ - 'help' => 'Sleep interval between jobs (seconds).', - 'default' => self::DEFAULT_SLEEP_INTERVAL, - 'required' => false - ] - ); - - return $parser; - } - - public function main(): void - { - $this->sleepInterval = (int)$this->params['sleep']; - CakeLog::info("[WORKERS MONITOR] - starting to monitor workers..."); - - while (true) { - $this->ConfigLoad->execute(); - $this->checkWorkersProcessStatus($this->BackgroundJobsTool->getWorkers()); - - sleep($this->sleepInterval); - } - } - - /** - * Check workers process status - * - * @param Worker[] $workers - * @return void - */ - private function checkWorkersProcessStatus(array $workers): void - { - foreach ($workers as $worker) { - if (!file_exists("/proc/{$worker->pid()}")) { - CakeLog::info("[WORKERS MONITOR] - worker with pid {$worker->pid()} is gone."); - $this->BackgroundJobsTool->unregisterWorker($worker->pid()); - } - - $this->BackgroundJobsTool->updateWorkerStatus( - $worker->pid(), - Worker::STATUS_RUNNING - ); - } - } -} diff --git a/app/Console/Command/StartWorkerShell.php b/app/Console/Command/StartWorkerShell.php index 0384044f7..608021b49 100644 --- a/app/Console/Command/StartWorkerShell.php +++ b/app/Console/Command/StartWorkerShell.php @@ -62,7 +62,6 @@ class StartWorkerShell extends AppShell $this->maxExecutionTime = (int)$this->params['maxExecutionTime']; CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - starting to process background jobs..."); - $this->BackgroundJobsTool->registerWorker($this->worker); while (true) { $this->ConfigLoad->execute(); @@ -93,7 +92,6 @@ class StartWorkerShell extends AppShell { if ((time() - $this->worker->createdAt()) > $this->maxExecutionTime) { CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - worker max execution time reached, exiting gracefully worker..."); - $this->BackgroundJobsTool->unregisterWorker($this->worker->pid()); exit; } } diff --git a/app/Lib/Tools/BackgroundJobs/Worker.php b/app/Lib/Tools/BackgroundJobs/Worker.php index 52567aa9f..66558bca8 100644 --- a/app/Lib/Tools/BackgroundJobs/Worker.php +++ b/app/Lib/Tools/BackgroundJobs/Worker.php @@ -52,7 +52,7 @@ class Worker implements JsonSerializable $this->user = $properties['user']; $this->createdAt = $properties['createdAt'] ?? time(); $this->updatedAt = $properties['updatedAt'] ?? null; - $this->status = $properties['status'] ?? self::STATUS_RUNNING; + $this->status = $properties['status'] ?? self::STATUS_UNKNOWN; } public function jsonSerialize(): array diff --git a/app/Lib/Tools/BackgroundJobsTool.php b/app/Lib/Tools/BackgroundJobsTool.php index 038faf336..3c8ebb169 100644 --- a/app/Lib/Tools/BackgroundJobsTool.php +++ b/app/Lib/Tools/BackgroundJobsTool.php @@ -102,7 +102,6 @@ class BackgroundJobsTool ]; public const JOB_STATUS_PREFIX = 'job_status'; - public const WORKER_STATUS_PREFIX = 'worker_status'; /** @var array */ private $settings; @@ -300,22 +299,6 @@ class BackgroundJobsTool return (bool) $this->RedisConnection->del($queue); } - /** - * Get worker by PID. - * - * @return Worker|null Worker instance. - */ - public function getWorker(int $workerPid): ?Worker - { - $rawWorker = $this->RedisConnection->get(self::WORKER_STATUS_PREFIX . ':' . $workerPid); - - if ($rawWorker) { - return new Worker($rawWorker); - } - - return null; - } - /** * Get all workers' instances. * @@ -323,27 +306,20 @@ class BackgroundJobsTool */ public function getWorkers(): array { - $pattern = self::WORKER_STATUS_PREFIX . ':*'; - - // get existing workers status keys - $iterator = null; - $workersKeys = []; - while ($keys = $this->RedisConnection->scan($iterator, $pattern)) { - foreach ($keys as $key) { - $workersKeys[] = $key; - } - } - - if (!$workersKeys) { - return []; - } - - // get workers status - $workersStatus = $this->RedisConnection->mget($workersKeys); - $workers = []; - foreach ($workersStatus as $worker) { - $workers[] = new Worker($worker); + $procs = $this->Supervisor->getAllProcesses(); + + foreach ($procs as $proc) { + if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP) { + $workers[] = new Worker([ + 'pid' => $proc->offsetGet('pid'), + 'queue' => explode("_", $proc->offsetGet('name'))[0], + 'user' => trim(shell_exec(sprintf("ps -o uname= -p %s", (int) $proc->offsetGet('pid')))), + 'createdAt' => $proc->offsetGet('start'), + 'updatedAt' => $proc->offsetGet('now'), + 'status' => $this->convertProcessStatus($proc->offsetGet('state')) + ]); + } } return $workers; @@ -406,61 +382,6 @@ class BackgroundJobsTool return $job->returnCode(); } - /** - * Register worker - * - * @param integer $workerPid - * @param string $queue - * @param integer $createdAt - * - * @return void - */ - public function registerWorker(Worker $worker): void - { - $this->RedisConnection->set( - self::WORKER_STATUS_PREFIX . ':' . $worker->pid(), - $worker - ); - } - - /** - * Update worker - * - * @param integer $workerPid - * @param integer $status - * - * @return void - */ - public function updateWorkerStatus(int $workerPid, int $status): void - { - $worker = $this->getWorker($workerPid); - - if (!$worker) { - CakeLog::warning("updateWorkerStatus: worker with PID: {$workerPid} not found."); - return; - } - - $worker->setUpdatedAt(time()); - $worker->setStatus($status); - - $this->RedisConnection->set( - self::WORKER_STATUS_PREFIX . ':' . $workerPid, - $worker - ); - } - - /** - * Unregister worker - * - * @param integer $workerPid - * - * @return void - */ - public function unregisterWorker(int $workerPid): void - { - $this->RedisConnection->del(self::WORKER_STATUS_PREFIX . ':' . $workerPid); - } - /** * Start worker by name * @@ -721,4 +642,22 @@ class BackgroundJobsTool throw new NotFoundException("Worker with pid=$pid not found."); } + + /** + * Convert process status to worker status + * + * @param integer $stateId + * @return integer + */ + private function convertProcessStatus(int $stateId): int + { + switch ($stateId) { + case \Supervisor\Process::RUNNING: + return Worker::STATUS_RUNNING; + case \Supervisor\Process::UNKNOWN: + return Worker::STATUS_UNKNOWN; + default: + return Worker::STATUS_FAILED; + } + } }