Merge pull request #7965 from JakubOnderka/bg-worker-simplify

chg: [internal] Bg worker cleanup
pull/7971/head
Jakub Onderka 2021-11-18 09:46:45 +01:00 committed by GitHub
commit 0f7d70a825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 28 deletions

View File

@ -17,8 +17,6 @@ class StartWorkerShell extends AppShell
private const DEFAULT_MAX_EXECUTION_TIME = 86400; // 1 day
public $tasks = ['ConfigLoad'];
public function initialize(): void
{
parent::initialize();
@ -40,7 +38,7 @@ class StartWorkerShell extends AppShell
->addOption(
'maxExecutionTime',
[
'help' => 'Worker maximum execution time (seconds) before it self-destruct.',
'help' => 'Worker maximum execution time (seconds) before it self-destruct. Zero means unlimited.',
'default' => self::DEFAULT_MAX_EXECUTION_TIME,
'required' => false
]
@ -64,18 +62,17 @@ class StartWorkerShell extends AppShell
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - starting to process background jobs...");
while (true) {
$this->ConfigLoad->execute();
$this->checkMaxExecutionTime();
$job = $this->BackgroundJobsTool->dequeue($this->worker->queue());
if ($job) {
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - launching job with id: {$job->id()} ...");
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - launching job with ID: {$job->id()}...");
try {
$this->BackgroundJobsTool->run($job);
} catch (Exception $exception) {
CakeLog::error("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - job id: {$job->id()} failed with exception: {$exception->getMessage()}");
CakeLog::error("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - job ID: {$job->id()} failed with exception: {$exception->getMessage()}");
$job->setStatus(BackgroundJob::STATUS_FAILED);
$this->BackgroundJobsTool->update($job);
}
@ -90,6 +87,9 @@ class StartWorkerShell extends AppShell
*/
private function checkMaxExecutionTime(): void
{
if ($this->maxExecutionTime === 0) {
return;
}
if ((time() - $this->worker->createdAt()) > $this->maxExecutionTime) {
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - worker max execution time reached, exiting gracefully worker...");
exit;

View File

@ -2,8 +2,6 @@
declare(strict_types=1);
App::uses('Model', 'Model');
class BackgroundJob implements JsonSerializable
{
public const

View File

@ -2,8 +2,6 @@
declare(strict_types=1);
App::uses('Model', 'Model');
class Worker implements JsonSerializable
{
/** @var integer|null */

View File

@ -116,7 +116,6 @@ class BackgroundJobsTool
if ($this->settings['enabled'] === true) {
$this->RedisConnection = $this->createRedisConnection();
$this->Supervisor = $this->createSupervisorConnection();
}
}
@ -128,8 +127,8 @@ class BackgroundJobsTool
* @param array $args Arguments passed to the job.
* @param boolean|null $trackStatus Whether to track the status of the job.
* @param int|null $jobId Id of the relational database record representing the job.
* @return string Background Job Id.
* @param array $metadata Related to the job.
* @return string Background Job ID.
* @throws InvalidArgumentException
*/
public function enqueue(
@ -223,11 +222,7 @@ class BackgroundJobsTool
$rawJob = $this->RedisConnection->blpop($queue, $timeout);
if (!empty($rawJob)) {
try {
return new BackgroundJob($rawJob[1]);
} catch (Exception $exception) {
CakeLog::error("Failed to parse job, invalid format: {$rawJob[1]}. exception: {$exception->getMessage()}");
}
return new BackgroundJob($rawJob[1]);
}
return null;
@ -236,7 +231,7 @@ class BackgroundJobsTool
/**
* Get the job status.
*
* @param string $jobId Backgroun Job Id.
* @param string $jobId Background Job Id.
*
* @return BackgroundJob|null job status.
*
@ -286,7 +281,7 @@ class BackgroundJobsTool
public function getWorkers(): array
{
$workers = [];
$procs = $this->Supervisor->getAllProcesses();
$procs = $this->getSupervisor()->getAllProcesses();
foreach ($procs as $proc) {
if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP) {
@ -374,7 +369,7 @@ class BackgroundJobsTool
{
$this->validateWorkerName($name);
return $this->Supervisor->startProcess(
return $this->getSupervisor()->startProcess(
sprintf(
'%s:%s',
self::MISP_WORKERS_PROCESS_GROUP,
@ -395,13 +390,13 @@ class BackgroundJobsTool
{
$this->validateQueue($queue);
$procs = $this->Supervisor->getAllProcesses();
$procs = $this->getSupervisor()->getAllProcesses();
foreach ($procs as $proc) {
if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP) {
$name = explode("_", $proc->offsetGet('name'))[0];
if ($name === $queue && $proc->offsetGet('state') != \Supervisor\Process::RUNNING) {
return $this->Supervisor->startProcess(
return $this->getSupervisor()->startProcess(
sprintf(
'%s:%s',
self::MISP_WORKERS_PROCESS_GROUP,
@ -434,7 +429,7 @@ class BackgroundJobsTool
$this->validateWorkerName($name);
return $this->Supervisor->stopProcess(
return $this->getSupervisor()->stopProcess(
sprintf(
'%s:%s',
self::MISP_WORKERS_PROCESS_GROUP,
@ -452,8 +447,8 @@ class BackgroundJobsTool
*/
public function restartWorkers(bool $waitForRestart = false): void
{
$this->Supervisor->stopProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
$this->Supervisor->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
$this->getSupervisor()->stopProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
$this->getSupervisor()->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
}
/**
@ -464,7 +459,7 @@ class BackgroundJobsTool
*/
public function restartDeadWorkers(bool $waitForRestart = false): void
{
$this->Supervisor->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
$this->getSupervisor()->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
}
/**
@ -499,7 +494,7 @@ class BackgroundJobsTool
}
try {
$supervisorStatus = ($this->Supervisor->getState()['statecode'] === \Supervisor\Supervisor::RUNNING);
$supervisorStatus = $this->getSupervisor()->getState()['statecode'] === \Supervisor\Supervisor::RUNNING;
} catch (Exception $exception) {
CakeLog::error("SimpleBackgroundJobs Supervisor error: {$exception->getMessage()}");
$supervisorStatus = false;
@ -597,6 +592,17 @@ class BackgroundJobsTool
return $redis;
}
/**
* @return \Supervisor\Supervisor
*/
private function getSupervisor()
{
if (!$this->Supervisor) {
$this->Supervisor = $this->createSupervisorConnection();
}
return $this->Supervisor;
}
/**
* @return \Supervisor\Supervisor
*/
@ -644,7 +650,7 @@ class BackgroundJobsTool
*/
private function getProcessByPid(int $pid): \Supervisor\Process
{
$procs = $this->Supervisor->getAllProcesses();
$procs = $this->getSupervisor()->getAllProcesses();
foreach ($procs as $proc) {
if (