chg: [CLI] Track worker process ID

pull/9479/head
Jakub Onderka 2024-01-12 11:58:49 +01:00
parent 391f0324e9
commit 59916f848a
4 changed files with 30 additions and 10 deletions

View File

@ -83,7 +83,7 @@ class StartWorkerShell extends AppShell
$start = microtime(true);
$job->run(function (array $status) use ($job) {
$this->getBackgroundJobsTool()->markAsRunning($this->worker, $job);
$this->getBackgroundJobsTool()->markAsRunning($this->worker, $job, $status['pid']);
});
$duration = number_format(microtime(true) - $start, 3, '.', '');

View File

@ -34,13 +34,20 @@ class WorkerShell extends AppShell
return $parser;
}
/**
* @throws RedisException
* @throws JsonException
*/
public function showQueues()
{
$tool = $this->getBackgroundJobsTool();
$runningJobs = $tool->runningJobs();
foreach (BackgroundJobsTool::VALID_QUEUES as $queue) {
$this->out("{$queue}:\t{$tool->getQueueSize($queue)}");
foreach ($tool->runningJobs($queue) as $jobId) {
$this->out(" - $jobId");
$queueJobs = $runningJobs[$queue] ?? [];
foreach ($queueJobs as $jobId => $data) {
$this->out(" - $jobId (" . JsonTool::encode($data) .")");
}
}
}

View File

@ -113,6 +113,14 @@ class BackgroundJob implements JsonSerializable
$this->output = '';
$this->error = '';
if ($runningCallback) {
$status = proc_get_status($process);
if ($status === false) {
throw new RuntimeException("Could not get process status");
}
$runningCallback($status);
}
while (true) {
$read = [$pipes[1], $pipes[2]];
$write = null;

View File

@ -281,13 +281,17 @@ class BackgroundJobsTool
/**
* @param Worker $worker
* @param BackgroundJob $job
* @param int|null $pid
* @return void
* @throws RedisException
*/
public function markAsRunning(Worker $worker, BackgroundJob $job)
public function markAsRunning(Worker $worker, BackgroundJob $job, $pid = null)
{
$key = self::RUNNING_JOB_PREFIX . ':' . $worker->queue() . ':' . $job->id();
$this->RedisConnection->setex($key, 60, $worker->pid());
$this->RedisConnection->setex($key, 60, [
'worker_pid' => $worker->pid(),
'process_pid' => $pid,
]);
}
/**
@ -304,19 +308,20 @@ class BackgroundJobsTool
/**
* Return current running jobs
* @param string $queue
* @return string[] Background jobs IDs
* @return array
* @throws RedisException
*/
public function runningJobs(string $queue): array
public function runningJobs(): array
{
$pattern = $this->RedisConnection->_prefix(self::RUNNING_JOB_PREFIX . ':' . $queue . ':*');
$pattern = $this->RedisConnection->_prefix(self::RUNNING_JOB_PREFIX . ':*');
$keys = RedisTool::keysByPattern($this->RedisConnection, $pattern);
$jobIds = [];
foreach ($keys as $key) {
$parts = explode(':', $key);
$jobIds[] = end($parts);
$queue = $parts[2];
$jobId = $parts[3];
$jobIds[$queue][$jobId] = $this->RedisConnection->get(self::RUNNING_JOB_PREFIX . ":$queue:$jobId");
}
return $jobIds;
}