new: [CLI] Add ability to show running jobs

pull/9480/head
Jakub Onderka 2024-01-08 18:42:43 +01:00
parent bb36276a11
commit 11a67099cc
5 changed files with 49 additions and 20 deletions

View File

@ -46,15 +46,19 @@ class StartWorkerShell extends AppShell
);
$this->maxExecutionTime = (int)$this->params['maxExecutionTime'];
$queue = $this->worker->queue();
$backgroundJobTool = $this->getBackgroundJobsTool();
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - starting to process background jobs...");
CakeLog::info("[WORKER PID: {$this->worker->pid()}][{$queue}] - starting to process background jobs...");
while (true) {
$this->checkMaxExecutionTime();
$job = $this->getBackgroundJobsTool()->dequeue($this->worker->queue());
$job = $backgroundJobTool->dequeue($queue);
if ($job) {
$backgroundJobTool->addToRunning($queue, $job);
$this->runJob($job);
$backgroundJobTool->removeFromRunning($queue, $job);
}
}
}

View File

@ -36,9 +36,12 @@ class WorkerShell extends AppShell
public function showQueues()
{
$queues = $this->getBackgroundJobsTool()->getQueues();
foreach ($queues as $queue) {
$this->out("{$queue}:\t{$this->getBackgroundJobsTool()->getQueueSize($queue)}");
$tool = $this->getBackgroundJobsTool();
foreach (BackgroundJobsTool::VALID_QUEUES as $queue) {
$this->out("{$queue}:\t{$tool->getQueueSize($queue)}");
foreach ($tool->runningJobs($queue) as $jobId) {
$this->out(" - $jobId");
}
}
}

View File

@ -1861,7 +1861,7 @@ class ServersController extends AppController
}
if (Configure::read('SimpleBackgroundJobs.enabled')) {
$this->Server->getBackgroundJobsTool()->purgeQueue($worker);
$this->Server->getBackgroundJobsTool()->clearQueue($worker);
} else {
// CakeResque
$worker_array = array('cache', 'default', 'email', 'prio');

View File

@ -153,6 +153,9 @@ class BackgroundJob implements JsonSerializable
return ['id', 'command', 'args', 'createdAt', 'updatedAt', 'status', 'output', 'error', 'metadata'];
}
/**
* @return string Background job ID in UUID format
*/
public function id(): string
{
return $this->id;

View File

@ -91,7 +91,8 @@ class BackgroundJobsTool
];
const JOB_STATUS_PREFIX = 'job_status',
DATA_CONTENT_PREFIX = 'data_content';
DATA_CONTENT_PREFIX = 'data_content',
RUNNING_JOB_PREFIX = 'running';
/** @var array */
private $settings;
@ -277,6 +278,37 @@ class BackgroundJobsTool
return null;
}
/**
* @param string $queue
* @param BackgroundJob $job
* @return void
*/
public function addToRunning(string $queue, BackgroundJob $job)
{
$this->RedisConnection->sAdd(self::RUNNING_JOB_PREFIX . ':' . $queue, $job->id());
}
/**
* @param string $queue
* @param BackgroundJob $job
* @return void
*/
public function removeFromRunning(string $queue, BackgroundJob $job)
{
$this->RedisConnection->sRem(self::RUNNING_JOB_PREFIX . ':' . $queue, $job->id());
}
/**
* Return current running jobs
* @param string $queue
* @return string[] Background jobs IDs
* @throws RedisException
*/
public function runningJobs(string $queue): array
{
return $this->RedisConnection->sMembers(self::RUNNING_JOB_PREFIX . ':' . $queue);
}
/**
* Get the job status.
*
@ -500,19 +532,6 @@ class BackgroundJobsTool
$this->getSupervisor()->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
}
/**
* Purge queue
*
* @param string $queue
* @return void
*/
public function purgeQueue(string $queue)
{
$this->validateQueue($queue);
$this->RedisConnection->del($queue);
}
/**
* Return Background Jobs status
*