chg: [workflow] non-blocking workflows are run by background workers

pull/8530/head
Sami Mokaddem 2022-06-24 12:20:03 +02:00
parent 14ff17a6e8
commit 351a3cfd4e
No known key found for this signature in database
GPG Key ID: 164C473F627A06FA
3 changed files with 79 additions and 2 deletions

View File

@ -8,6 +8,35 @@ class WorkflowShell extends AppShell {
public $uses = ['Job', 'Workflow'];
public $tasks = ['ConfigLoad'];
public function executeWorkflowForTrigger()
{
$this->ConfigLoad->execute();
if (empty($this->args[0]) || empty($this->args[1]) || empty($this->args[2]) || empty($this->args[3])) {
die(__('Invalid number of arguments.'));
}
$trigger_id = $this->args[0];
$data = JsonTool::decode($this->args[1]);
$logging = JsonTool::decode($this->args[2]);
$jobId = $this->args[3];
$blockingErrors = [];
$executionSuccess = $this->Workflow->executeWorkflowForTrigger($trigger_id, $data, $blockingErrors);
$job = $this->Job->read(null, $jobId);
$job['Job']['progress'] = 100;
$job['Job']['status'] = Job::STATUS_COMPLETED;
$job['Job']['date_modified'] = date("Y-m-d H:i:s");
if ($executionSuccess) {
$job['Job']['message'] = __('Workflow for trigger `%s` completed execution', $trigger_id);
} else {
$errorMessage = implode(', ', $blockingErrors);
$message = __('Error while executing workflow for trigger `%s`: %s. %s%s', $trigger_id, $logging['message'], PHP_EOL . __('Returned message: %s', $errorMessage));
$job['Job']['message'] = $message;
}
$this->Job->save($job);
}
public function walkGraph()
{
$this->ConfigLoad->execute();

View File

@ -3435,7 +3435,7 @@ class AppModel extends Model
$this->Workflow->checkTriggerEnabled($trigger_id) &&
$this->Workflow->checkTriggerListenedTo($trigger_id)
) {
$success = $this->Workflow->executeWorkflowForTrigger($trigger_id, $data, $blockingErrors);
$success = $this->Workflow->executeWorkflowForTriggerRouter($trigger_id, $data, $blockingErrors, $logging);
if (!empty($logging)) {
$errorMessage = implode(', ', $blockingErrors);
$this->loadLog()->createLogEntry('SYSTEM', $logging['action'], $logging['model'], $logging['id'], $logging['message'], __('Returned message: %s', $errorMessage));

View File

@ -328,6 +328,54 @@ class Workflow extends AppModel
return count($triggers) <= 1;
}
/**
* executeWorkflowForTrigger
*
* @param string $trigger_id
* @param array $data
* @throws TriggerNotFoundException
*/
public function executeWorkflowForTriggerRouter($trigger_id, array $data, array &$blockingErrors=[], array $logging=[]): bool
{
$this->loadAllWorkflowModules();
if (empty($this->loaded_modules['trigger'][$trigger_id])) {
throw new TriggerNotFoundException(__('Unknown trigger `%s`', $trigger_id));
}
$trigger = $this->loaded_modules['trigger'][$trigger_id];
if (!empty($trigger['disabled'])) {
return true;
}
if (empty($trigger['canAbort'])) {
$this->Job = ClassRegistry::init('Job');
$jobId = $this->Job->createJob(
'SYSTEM',
Job::WORKER_PRIO,
'executeWorkflowForTrigger',
sprintf('Workflow for trigger `%s`', $trigger_id),
__('Executing non-blocking workflow for trigger `%s`', $trigger_id)
);
$this->Job->getBackgroundJobsTool()->enqueue(
BackgroundJobsTool::PRIO_QUEUE,
BackgroundJobsTool::CMD_WORKFLOW,
[
'executeWorkflowForTrigger',
$trigger_id,
JsonTool::encode($data),
JsonTool::encode($logging),
$jobId
],
true,
$jobId
);
return true;
} else {
$blockingPathExecutionSuccess = $this->executeWorkflowForTrigger($trigger_id, $data, $blockingErrors);
return $blockingPathExecutionSuccess;
}
}
/**
* executeWorkflowForTrigger
*
@ -348,7 +396,7 @@ class Workflow extends AppModel
if (!empty($trigger['disabled'])) {
return true;
}
$workflow = $this->fetchWorkflowByTrigger($trigger_id, true);
if (empty($workflow)) {
throw new WorkflowNotFoundException(__('Could not get workflow for trigger `%s`', $trigger_id));