chg: [internal] Track running jobs

pull/9480/head
Jakub Onderka 2024-01-10 09:03:09 +01:00
parent 1413a13d52
commit 57459063cd
5 changed files with 64 additions and 21 deletions

View File

@ -37,9 +37,14 @@ class StartWorkerShell extends AppShell
public function main()
{
$pid = getmypid();
if ($pid === false) {
throw new RuntimeException("Could not get current process ID");
}
$this->worker = new Worker(
[
'pid' => getmypid(),
'pid' => $pid,
'queue' => $this->args[0],
'user' => ProcessTool::whoami(),
]
@ -56,9 +61,8 @@ class StartWorkerShell extends AppShell
$job = $backgroundJobTool->dequeue($queue);
if ($job) {
$backgroundJobTool->addToRunning($queue, $job);
$this->runJob($job);
$backgroundJobTool->removeFromRunning($queue, $job);
$backgroundJobTool->removeFromRunning($this->worker, $job);
}
}
}
@ -78,7 +82,9 @@ class StartWorkerShell extends AppShell
$this->getBackgroundJobsTool()->update($job);
$start = microtime(true);
$job->run();
$job->run(function (array $status) use ($job) {
$this->getBackgroundJobsTool()->markAsRunning($this->worker, $job);
});
$duration = number_format(microtime(true) - $start, 3, '.', '');
if ($job->status() === BackgroundJob::STATUS_COMPLETED) {

View File

@ -66,8 +66,9 @@ class BackgroundJob implements JsonSerializable
/**
* Run the job command
* @param callable|null $runningCallback
*/
public function run(): void
public function run(callable $runningCallback = null): void
{
$descriptorSpec = [
1 => ["pipe", "w"], // stdout
@ -88,7 +89,7 @@ class BackgroundJob implements JsonSerializable
['BACKGROUND_JOB_ID' => $this->id]
);
$this->pool($process, $pipes);
$this->pool($process, $pipes, $runningCallback);
if ($this->returnCode === 0 && empty($stderr)) {
$this->setStatus(BackgroundJob::STATUS_COMPLETED);
@ -98,7 +99,13 @@ class BackgroundJob implements JsonSerializable
}
}
private function pool($process, array $pipes)
/**
* @param resource $process
* @param array $pipes
* @param callable|null $runningCallback
* @return void
*/
private function pool($process, array $pipes, callable $runningCallback = null)
{
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);
@ -118,6 +125,12 @@ class BackgroundJob implements JsonSerializable
$this->error .= stream_get_contents($pipes[2]);
}
$status = proc_get_status($process);
if ($status === false) {
throw new RuntimeException("Could not get process status");
}
if ($runningCallback) {
$runningCallback($status);
}
if (!$status['running']) {
// Just in case read rest data from stream
$this->output .= stream_get_contents($pipes[1]);

View File

@ -65,7 +65,7 @@ class Worker implements JsonSerializable
];
}
public function pid(): ?int
public function pid(): int
{
return $this->pid;
}

View File

@ -279,23 +279,27 @@ class BackgroundJobsTool
}
/**
* @param string $queue
* @param Worker $worker
* @param BackgroundJob $job
* @return void
* @throws RedisException
*/
public function addToRunning(string $queue, BackgroundJob $job)
public function markAsRunning(Worker $worker, BackgroundJob $job)
{
$this->RedisConnection->sAdd(self::RUNNING_JOB_PREFIX . ':' . $queue, $job->id());
$key = self::RUNNING_JOB_PREFIX . ':' . $worker->queue() . ':' . $job->id();
$this->RedisConnection->setex($key, 60, $worker->pid());
}
/**
* @param string $queue
* @param Worker $worker
* @param BackgroundJob $job
* @return void
* @throws RedisException
*/
public function removeFromRunning(string $queue, BackgroundJob $job)
public function removeFromRunning(Worker $worker, BackgroundJob $job)
{
$this->RedisConnection->sRem(self::RUNNING_JOB_PREFIX . ':' . $queue, $job->id());
$key = self::RUNNING_JOB_PREFIX . ':' . $worker->queue() . ':' . $job->id();
$this->RedisConnection->del($key);
}
/**
@ -306,7 +310,15 @@ class BackgroundJobsTool
*/
public function runningJobs(string $queue): array
{
return $this->RedisConnection->sMembers(self::RUNNING_JOB_PREFIX . ':' . $queue);
$pattern = $this->RedisConnection->_prefix(self::RUNNING_JOB_PREFIX . ':' . $queue . ':*');
$keys = RedisTool::keysByPattern($this->RedisConnection, $pattern);
$jobIds = [];
foreach ($keys as $key) {
$parts = explode(':', $key);
$jobIds[] = end($parts);
}
return $jobIds;
}
/**
@ -747,8 +759,7 @@ class BackgroundJobsTool
*
* @param integer $pid
* @return \Supervisor\Process
*
* @throws NotFoundException
* @throws NotFoundException|Exception
*/
private function getProcessByPid(int $pid): \Supervisor\Process
{

View File

@ -57,24 +57,37 @@ class RedisTool
/**
* @param Redis $redis
* @param string|array $pattern
* @return int|Redis Number of deleted keys or instance of Redis if used in MULTI mode
* @return Generator<string>
* @throws RedisException
*/
public static function deleteKeysByPattern(Redis $redis, $pattern)
public static function keysByPattern(Redis $redis, $pattern)
{
if (is_string($pattern)) {
$pattern = [$pattern];
}
$allKeys = [];
foreach ($pattern as $p) {
$iterator = null;
while (false !== ($keys = $redis->scan($iterator, $p, 1000))) {
foreach ($keys as $key) {
$allKeys[] = $key;
yield $key;
}
}
}
}
/**
* @param Redis $redis
* @param string|array $pattern
* @return int|Redis Number of deleted keys or instance of Redis if used in MULTI mode
* @throws RedisException
*/
public static function deleteKeysByPattern(Redis $redis, $pattern)
{
$allKeys = [];
foreach (self::keysByPattern($redis, $pattern) as $key) {
$allKeys[] = $key;
}
if (empty($allKeys)) {
return 0;