new: support existing worker controls via supervisor api

pull/7939/head
Luciano Righetti 2021-11-04 11:42:17 +01:00
parent 5242d3204d
commit 7ba758efb0
2 changed files with 182 additions and 19 deletions

View File

@ -1209,10 +1209,19 @@ class ServersController extends AppController
throw new MethodNotAllowedException();
}
if (!Configure::read('BackgroundJobs.use_resque')) {
throw new MethodNotAllowedException('Starting workers via API is not implemented. Use supervisor CLI.');
if (Configure::read('BackgroundJobs.enabled')) {
$message = __('Worker start signal sent');
$this->Server->getBackgroundJobsTool()->startWorker($type);
if ($this->_isRest()) {
return $this->RestResponse->saveSuccessResponse('Servers', 'startWorker', $type, $this->response->type(), $message);
} else {
$this->Flash->info($message);
$this->redirect('/servers/serverSettings/workers');
}
}
// CakeResque
$validTypes = array('default', 'email', 'scheduler', 'cache', 'prio', 'update');
if (!in_array($type, $validTypes)) {
throw new MethodNotAllowedException('Invalid worker type.');
@ -1249,8 +1258,21 @@ class ServersController extends AppController
if (!$this->request->is('post')) {
throw new MethodNotAllowedException();
}
$this->Server->killWorker($pid, $this->Auth->user());
$message = __('Worker stop signal sent');
if (Configure::read('BackgroundJobs.enabled')) {
$this->Server->getBackgroundJobsTool()->stopWorker($pid);
if ($this->_isRest()) {
return $this->RestResponse->saveSuccessResponse('Servers', 'stopWorker', $pid, $this->response->type(), $message);
} else {
$this->Flash->info($message);
$this->redirect('/servers/serverSettings/workers');
}
}
// CakeResque
$this->Server->killWorker($pid, $this->Auth->user());
if ($this->_isRest()) {
return $this->RestResponse->saveSuccessResponse('Servers', 'stopWorker', $pid, $this->response->type(), $message);
} else {
@ -1516,7 +1538,14 @@ class ServersController extends AppController
if (!$this->request->is('post')) {
throw new MethodNotAllowedException();
}
$this->Server->restartWorkers($this->Auth->user());
if (Configure::read('BackgroundJobs.enabled')) {
$this->Server->getBackgroundJobsTool()->restartWorkers();
} else {
// CakeResque
$this->Server->restartWorkers($this->Auth->user());
}
if ($this->_isRest()) {
return $this->RestResponse->saveSuccessResponse('Server', 'restartWorkers', false, $this->response->type(), __('Restarting workers.'));
}
@ -1528,7 +1557,14 @@ class ServersController extends AppController
if (!$this->request->is('post')) {
throw new MethodNotAllowedException();
}
$this->Server->restartDeadWorkers($this->Auth->user());
if (Configure::read('BackgroundJobs.enabled')) {
$this->Server->getBackgroundJobsTool()->restartDeadWorkers();
} else {
// CakeResque
$this->Server->restartDeadWorkers($this->Auth->user());
}
if ($this->_isRest()) {
return $this->RestResponse->saveSuccessResponse('Server', 'restartDeadWorkers', false, $this->response->type(), __('Restarting workers.'));
}
@ -1754,12 +1790,19 @@ class ServersController extends AppController
if (!$this->request->is('Post') || $this->request->is('ajax')) {
throw new MethodNotAllowedException();
}
$worker_array = array('cache', 'default', 'email', 'prio');
if (!in_array($worker, $worker_array)) {
throw new MethodNotAllowedException('Invalid worker');
if (Configure::read('BackgroundJobs.enabled')) {
$this->Server->getBackgroundJobsTool()->purgeQueue($worker);
} else {
// CakeResque
$worker_array = array('cache', 'default', 'email', 'prio');
if (!in_array($worker, $worker_array)) {
throw new MethodNotAllowedException('Invalid worker');
}
$redis = Resque::redis();
$redis->del('queue:' . $worker);
}
$redis = Resque::redis();
$redis->del('queue:' . $worker);
$this->Flash->success('Queue cleared.');
$this->redirect($this->referer());
}

View File

@ -18,10 +18,10 @@ App::uses('BackgroundJob', 'Tools/BackgroundJobs');
* $ ./Console/cake start_worker [queue]
* $ ./Console/cake monitor_workers
*
* It is recommended to run these commands with [supervisord](http://supervisord.org).
* `supervisord` has an extensive feature set to manage scripts as services,
* It is recommended to run these commands with [Supervisor](http://supervisord.org).
* `Supervisor` has an extensive feature set to manage scripts as services,
* such as autorestart, parallel execution, logging, monitoring and much more.
* All can be managed via terminal or a XML-RPC API.
* All can be managed via the terminal or a XML-RPC API.
*
* Use the following configuration as a template for the services:
* /etc/supervisor/conf.d/misp-workers-monitor.conf:
@ -105,14 +105,18 @@ class BackgroundJobsTool
*
* Settings should have the following format:
* [
* 'use_resque' => true,
* 'enabled' => true,
* 'redis_host' => 'localhost',
* 'redis_port' => 6379,
* 'redis_password' => '',
* 'redis_database' => 1,
* 'redis_namespace' => 'background_jobs',
* 'max_job_history_ttl' => 86400
* 'track_status' => 86400
* 'track_status' => 86400,
* 'supervisor_host' => 'localhost',
* 'supervisor_port' => '9001',
* 'supervisor_user' => '',
* 'supervisor_password' => '',
* ]
*
* @param array $settings
@ -122,11 +126,11 @@ class BackgroundJobsTool
{
$this->settings = $settings;
if (!$this->RedisConnection && $this->settings['use_resque'] === false) {
if (!$this->RedisConnection && $this->settings['enabled'] === true) {
$this->RedisConnection = $this->createRedisConnection();
}
if (!$this->Supervisor && $this->settings['use_resque'] === false) {
if (!$this->Supervisor && $this->settings['enabled'] === true) {
$this->Supervisor = $this->createSupervisorConnection();
}
}
@ -152,7 +156,7 @@ class BackgroundJobsTool
array $metadata = []
): string {
if ($this->settings['use_resque']) {
if (!$this->settings['enabled']) {
return $this->resqueEnqueue($queue, self::CMD_TO_SHELL_DICT[$command], $args, $trackStatus, $jobId);
}
@ -352,7 +356,7 @@ class BackgroundJobsTool
{
$this->validateQueue($queue);
if ($this->settings['use_resque']) {
if (!$this->settings['enabled']) {
return CakeResque::getQueueSize($queue);
}
@ -453,6 +457,67 @@ class BackgroundJobsTool
$this->RedisConnection->del(self::WORKER_STATUS_PREFIX . ':' . $workerPid);
}
/**
* Start worker by name
*
* @param string $name
* @param boolean $waitForRestart
* @return boolean
*/
public function startWorker(string $name, bool $waitForRestart = false): bool
{
$this->validateWorkerName($name);
return $this->Supervisor->startProcess(
sprintf(
'%s:%s',
self::MISP_WORKERS_PROCESS_GROUP,
$name
),
$waitForRestart
);
}
/**
* Stop worker by name or pid
*
* @param string|int $id
* @param boolean $waitForRestart
* @return boolean
*/
public function stopWorker($id, bool $waitForRestart = false): bool
{
if (is_numeric($id)) {
$process = $this->getProcessByPid((int)$id);
$name = $process->offsetGet('name');
} else {
$name = $id;
}
$this->validateWorkerName($name);
return $this->Supervisor->stopProcess(
sprintf(
'%s:%s',
self::MISP_WORKERS_PROCESS_GROUP,
$name
),
$waitForRestart
);
}
/**
* Restarts workers
*
* @param boolean $waitForRestart
* @return void
*/
public function restartWorkers(bool $waitForRestart = false): void
{
$this->Supervisor->stopProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
$this->Supervisor->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
}
/**
* Restarts workers with status != RUNNING
*
@ -464,6 +529,19 @@ class BackgroundJobsTool
$this->Supervisor->startProcessGroup(self::MISP_WORKERS_PROCESS_GROUP, $waitForRestart);
}
/**
* Purge queue
*
* @param string $queue
* @return void
*/
public function purgeQueue(string $queue): void
{
$this->validateQueue($queue);
$this->RedisConnection->del($queue);
}
/**
* Validate queue
*
@ -504,6 +582,25 @@ class BackgroundJobsTool
return true;
}
/**
* Validate worker name
*
* @return boolean
* @throws InvalidArgumentException
*/
private function validateWorkerName(string $name): bool
{
list($queue, $id) = explode('_', $name);
$this->validateQueue($queue);
if (!$this->validateQueue($queue) || !is_numeric($id)) {
throw new InvalidArgumentException('Invalid worker name, must be one of format {queue_name}_{process_id}, example: default_00');
}
return true;
}
/**
* @return Redis
*/
@ -558,4 +655,27 @@ class BackgroundJobsTool
return $job;
}
/**
* Get Supervisor process by PID
*
* @param integer $pid
* @return \Supervisor\Process
*
* @throws NotFoundException
*/
private function getProcessByPid(int $pid): \Supervisor\Process
{
$procs = $this->Supervisor->getAllProcesses();
foreach ($procs as $proc) {
if ($proc->offsetGet('group') === self::MISP_WORKERS_PROCESS_GROUP && $proc->offsetGet('pid') === $pid) {
return $proc;
}
}
throw new NotFoundException(
sprintf('Worker with pid=%s not found.', $pid)
);
}
}