diff --git a/app/Console/Command/StartWorkerShell.php b/app/Console/Command/StartWorkerShell.php index 327e881db..3b025f9e9 100644 --- a/app/Console/Command/StartWorkerShell.php +++ b/app/Console/Command/StartWorkerShell.php @@ -17,8 +17,6 @@ class StartWorkerShell extends AppShell private const DEFAULT_MAX_EXECUTION_TIME = 86400; // 1 day - public $tasks = ['ConfigLoad']; - public function initialize(): void { parent::initialize(); @@ -40,7 +38,7 @@ class StartWorkerShell extends AppShell ->addOption( 'maxExecutionTime', [ - 'help' => 'Worker maximum execution time (seconds) before it self-destruct.', + 'help' => 'Worker maximum execution time (seconds) before it self-destruct. Zero means unlimited.', 'default' => self::DEFAULT_MAX_EXECUTION_TIME, 'required' => false ] @@ -64,18 +62,17 @@ class StartWorkerShell extends AppShell CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - starting to process background jobs..."); while (true) { - $this->ConfigLoad->execute(); $this->checkMaxExecutionTime(); $job = $this->BackgroundJobsTool->dequeue($this->worker->queue()); if ($job) { - CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - launching job with id: {$job->id()} ..."); + CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - launching job with ID: {$job->id()}..."); try { $this->BackgroundJobsTool->run($job); } catch (Exception $exception) { - CakeLog::error("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - job id: {$job->id()} failed with exception: {$exception->getMessage()}"); + CakeLog::error("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - job ID: {$job->id()} failed with exception: {$exception->getMessage()}"); $job->setStatus(BackgroundJob::STATUS_FAILED); $this->BackgroundJobsTool->update($job); } @@ -90,6 +87,9 @@ class StartWorkerShell extends AppShell */ private function checkMaxExecutionTime(): void { + if ($this->maxExecutionTime === 0) { + return; + } if ((time() - $this->worker->createdAt()) > $this->maxExecutionTime) { CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - worker max execution time reached, exiting gracefully worker..."); exit; diff --git a/app/Lib/Tools/BackgroundJobs/BackgroundJob.php b/app/Lib/Tools/BackgroundJobs/BackgroundJob.php index 8a7864104..ee216a217 100644 --- a/app/Lib/Tools/BackgroundJobs/BackgroundJob.php +++ b/app/Lib/Tools/BackgroundJobs/BackgroundJob.php @@ -2,8 +2,6 @@ declare(strict_types=1); -App::uses('Model', 'Model'); - class BackgroundJob implements JsonSerializable { public const diff --git a/app/Lib/Tools/BackgroundJobs/Worker.php b/app/Lib/Tools/BackgroundJobs/Worker.php index 741212678..232957ec7 100644 --- a/app/Lib/Tools/BackgroundJobs/Worker.php +++ b/app/Lib/Tools/BackgroundJobs/Worker.php @@ -2,8 +2,6 @@ declare(strict_types=1); -App::uses('Model', 'Model'); - class Worker implements JsonSerializable { /** @var integer|null */ diff --git a/app/Lib/Tools/BackgroundJobsTool.php b/app/Lib/Tools/BackgroundJobsTool.php index 6af896c26..42977ad56 100644 --- a/app/Lib/Tools/BackgroundJobsTool.php +++ b/app/Lib/Tools/BackgroundJobsTool.php @@ -116,7 +116,6 @@ class BackgroundJobsTool if ($this->settings['enabled'] === true) { $this->RedisConnection = $this->createRedisConnection(); - $this->Supervisor = $this->createSupervisorConnection(); } } @@ -128,8 +127,8 @@ class BackgroundJobsTool * @param array $args Arguments passed to the job. * @param boolean|null $trackStatus Whether to track the status of the job. * @param int|null $jobId Id of the relational database record representing the job. - * @return string Background Job Id. * @param array $metadata Related to the job. + * @return string Background Job ID. * @throws InvalidArgumentException */ public function enqueue( @@ -223,11 +222,7 @@ class BackgroundJobsTool $rawJob = $this->RedisConnection->blpop($queue, $timeout); if (!empty($rawJob)) { - try { - return new BackgroundJob($rawJob[1]); - } catch (Exception $exception) { - CakeLog::error("Failed to parse job, invalid format: {$rawJob[1]}. exception: {$exception->getMessage()}"); - } + return new BackgroundJob($rawJob[1]); } return null; @@ -236,7 +231,7 @@ class BackgroundJobsTool /** * Get the job status. * - * @param string $jobId Backgroun Job Id. + * @param string $jobId Background Job Id. * * @return BackgroundJob|null job status. * @@ -286,7 +281,7 @@ class BackgroundJobsTool public function getWorkers(): array { $workers = []; - $procs = $this->Supervisor->getAllProcesses(); + $procs = $this->getSupervisor()->getAllProcesses(); foreach ($procs as $proc) { if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP) { @@ -374,7 +369,7 @@ class BackgroundJobsTool { $this->validateWorkerName($name); - return $this->Supervisor->startProcess( + return $this->getSupervisor()->startProcess( sprintf( '%s:%s', self::MISP_WORKERS_PROCESS_GROUP, @@ -395,13 +390,13 @@ class BackgroundJobsTool { $this->validateQueue($queue); - $procs = $this->Supervisor->getAllProcesses(); + $procs = $this->getSupervisor()->getAllProcesses(); foreach ($procs as $proc) { if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP) { $name = explode("_", $proc->offsetGet('name'))[0]; if ($name === $queue && $proc->offsetGet('state') != \Supervisor\Process::RUNNING) { - return $this->Supervisor->startProcess( + return $this->getSupervisor()->startProcess( sprintf( '%s:%s', self::MISP_WORKERS_PROCESS_GROUP, @@ -434,7 +429,7 @@ class BackgroundJobsTool $this->validateWorkerName($name); - return $this->Supervisor->stopProcess( + return $this->getSupervisor()->stopProcess( sprintf( '%s:%s', self::MISP_WORKERS_PROCESS_GROUP, @@ -452,8 +447,8 @@ class BackgroundJobsTool */ public function restartWorkers(bool $waitForRestart = false): void { - $this->Supervisor->stopProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart); - $this->Supervisor->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart); + $this->getSupervisor()->stopProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart); + $this->getSupervisor()->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart); } /** @@ -464,7 +459,7 @@ class BackgroundJobsTool */ public function restartDeadWorkers(bool $waitForRestart = false): void { - $this->Supervisor->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart); + $this->getSupervisor()->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart); } /** @@ -499,7 +494,7 @@ class BackgroundJobsTool } try { - $supervisorStatus = ($this->Supervisor->getState()['statecode'] === \Supervisor\Supervisor::RUNNING); + $supervisorStatus = $this->getSupervisor()->getState()['statecode'] === \Supervisor\Supervisor::RUNNING; } catch (Exception $exception) { CakeLog::error("SimpleBackgroundJobs Supervisor error: {$exception->getMessage()}"); $supervisorStatus = false; @@ -597,6 +592,17 @@ class BackgroundJobsTool return $redis; } + /** + * @return \Supervisor\Supervisor + */ + private function getSupervisor() + { + if (!$this->Supervisor) { + $this->Supervisor = $this->createSupervisorConnection(); + } + return $this->Supervisor; + } + /** * @return \Supervisor\Supervisor */ @@ -644,7 +650,7 @@ class BackgroundJobsTool */ private function getProcessByPid(int $pid): \Supervisor\Process { - $procs = $this->Supervisor->getAllProcesses(); + $procs = $this->getSupervisor()->getAllProcesses(); foreach ($procs as $proc) { if (