diff --git a/app/Console/Command/AppShell.php b/app/Console/Command/AppShell.php index 6d4afb454..e5c82b818 100644 --- a/app/Console/Command/AppShell.php +++ b/app/Console/Command/AppShell.php @@ -17,6 +17,7 @@ */ App::uses('AppModel', 'Model'); +App::uses('BackgroundJobsTool', 'Tools'); /** * Application Shell @@ -26,10 +27,13 @@ App::uses('AppModel', 'Model'); * * @package app.Console.Command */ -class AppShell extends Shell +abstract class AppShell extends Shell { public $tasks = array('ConfigLoad'); + /** @var BackgroundJobsTool */ + private $BackgroundJobsTool; + public function initialize() { $this->ConfigLoad = $this->Tasks->load('ConfigLoad'); @@ -77,4 +81,16 @@ class AppShell extends Shell $this->error("Invalid state value `$value`, it must be `true`, `false`, `1`, or `0`."); } } + + /** + * @return BackgroundJobsTool + * @throws Exception + */ + protected function getBackgroundJobsTool() + { + if (!isset($this->BackgroundJobsTool)) { + $this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs')); + } + return $this->BackgroundJobsTool; + } } diff --git a/app/Console/Command/EventShell.php b/app/Console/Command/EventShell.php index 1398bd476..00ae7253f 100644 --- a/app/Console/Command/EventShell.php +++ b/app/Console/Command/EventShell.php @@ -432,8 +432,7 @@ class EventShell extends AppShell $sightingsUuidsToPush = []; if (isset($this->args[4])) { // push just specific sightings - $path = $this->args[4][0] === '/' ? $this->args[4] : (APP . 'tmp/cache/ingest' . DS . $this->args[4]); - $sightingsUuidsToPush = $this->Event->jsonDecode(FileAccessTool::readAndDelete($path)); + $sightingsUuidsToPush = $this->getBackgroundJobsTool()->fetchDataFile($this->args[4]); } $this->Event->Behaviors->unload('SysLogLogable.SysLogLogable'); @@ -537,9 +536,7 @@ class EventShell extends AppShell } $inputFile = $this->args[0]; - $inputFile = $inputFile[0] === '/' ? $inputFile : APP . 'tmp/cache/ingest' . DS . $inputFile; - $inputData = FileAccessTool::readAndDelete($inputFile); - $inputData = $this->Event->jsonDecode($inputData); + $inputData = $this->getBackgroundJobsTool()->fetchDataFile($inputFile); Configure::write('CurrentUserId', $inputData['user']['id']); $this->Event->processFreeTextData( $inputData['user'], @@ -560,9 +557,7 @@ class EventShell extends AppShell } $inputFile = $this->args[0]; - $inputFile = $inputFile[0] === '/' ? $inputFile : APP . 'tmp/cache/ingest' . DS . $inputFile; - $inputData = FileAccessTool::readAndDelete($inputFile); - $inputData = $this->Event->jsonDecode($inputData); + $inputData = $this->getBackgroundJobsTool()->fetchDataFile($inputFile); Configure::write('CurrentUserId', $inputData['user']['id']); $this->Event->processModuleResultsData( $inputData['user'], diff --git a/app/Console/Command/ServerShell.php b/app/Console/Command/ServerShell.php index 4ab4722a9..1156878ca 100644 --- a/app/Console/Command/ServerShell.php +++ b/app/Console/Command/ServerShell.php @@ -1,7 +1,6 @@ BackgroundJobsTool)) { - $this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs')); - } - return $this->BackgroundJobsTool; - } } diff --git a/app/Console/Command/StartWorkerShell.php b/app/Console/Command/StartWorkerShell.php index abf344cbd..85590c33c 100644 --- a/app/Console/Command/StartWorkerShell.php +++ b/app/Console/Command/StartWorkerShell.php @@ -2,14 +2,10 @@ declare(strict_types=1); -App::uses('BackgroundJobsTool', 'Tools'); App::uses('ProcessTool', 'Tools'); class StartWorkerShell extends AppShell { - /** @var BackgroundJobsTool */ - private $BackgroundJobsTool; - /** @var Worker */ private $worker; @@ -18,19 +14,13 @@ class StartWorkerShell extends AppShell const DEFAULT_MAX_EXECUTION_TIME = 86400; // 1 day - public function initialize() - { - parent::initialize(); - $this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs')); - } - public function getOptionParser(): ConsoleOptionParser { $parser = parent::getOptionParser(); $parser ->addArgument('queue', [ 'help' => 'Name of the queue to process.', - 'choices' => $this->BackgroundJobsTool->getQueues(), + 'choices' => $this->getBackgroundJobsTool()->getQueues(), 'required' => true ]) ->addOption( @@ -62,7 +52,7 @@ class StartWorkerShell extends AppShell while (true) { $this->checkMaxExecutionTime(); - $job = $this->BackgroundJobsTool->dequeue($this->worker->queue()); + $job = $this->getBackgroundJobsTool()->dequeue($this->worker->queue()); if ($job) { $this->runJob($job); } @@ -81,7 +71,7 @@ class StartWorkerShell extends AppShell $command = implode(' ', array_merge([$job->command()], $job->args())); CakeLog::info("[JOB ID: {$job->id()}] - started command `$command`."); - $this->BackgroundJobsTool->update($job); + $this->getBackgroundJobsTool()->update($job); $job->run(); @@ -94,7 +84,7 @@ class StartWorkerShell extends AppShell CakeLog::error("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - job ID: {$job->id()} failed with exception: {$exception->getMessage()}"); $job->setStatus(BackgroundJob::STATUS_FAILED); } - $this->BackgroundJobsTool->update($job); + $this->getBackgroundJobsTool()->update($job); } /** diff --git a/app/Lib/Tools/BackgroundJobs/BackgroundJob.php b/app/Lib/Tools/BackgroundJobs/BackgroundJob.php index 57fdd00bb..b7a45d86d 100644 --- a/app/Lib/Tools/BackgroundJobs/BackgroundJob.php +++ b/app/Lib/Tools/BackgroundJobs/BackgroundJob.php @@ -148,6 +148,11 @@ class BackgroundJob implements JsonSerializable ]; } + public function __sleep(): array + { + return ['id', 'command', 'args', 'createdAt', 'updatedAt', 'status', 'output', 'error', 'metadata']; + } + public function id(): string { return $this->id; diff --git a/app/Lib/Tools/BackgroundJobsTool.php b/app/Lib/Tools/BackgroundJobsTool.php index ab2ae2b4e..5fa325e5e 100644 --- a/app/Lib/Tools/BackgroundJobsTool.php +++ b/app/Lib/Tools/BackgroundJobsTool.php @@ -90,7 +90,8 @@ class BackgroundJobsTool self::CMD_WORKFLOW => 'WorkflowShell', ]; - const JOB_STATUS_PREFIX = 'job_status'; + const JOB_STATUS_PREFIX = 'job_status', + DATA_CONTENT_PREFIX = 'data_content'; /** @var array */ private $settings; @@ -125,6 +126,47 @@ class BackgroundJobsTool } } + /** + * @param array $data + * @return string Full path to data file or `redis:UUID` when data are stored in Redis + * @throws JsonException + * @throws RedisException + */ + public function enqueueDataFile(array $data) + { + if (!$this->settings['enabled']) { + // Store serialized data to tmp file when SimpleBackgroundJobs are not enabled + return FileAccessTool::writeToTempFile(JsonTool::encode($data)); + } + + // Keep content stored in Redis for 24 hours, that should be enough to process that data + $uuid = CakeText::uuid(); + $this->RedisConnection->setex(self::DATA_CONTENT_PREFIX . ':' . $uuid, 24 * 3600, $data); + return "redis:$uuid"; + } + + /** + * @param string $path + * @return array Deserialized data + * @throws JsonException + * @throws RedisException + */ + public function fetchDataFile($path) + { + if (strpos($path, 'redis:') === 0) { + $uuid = substr($path, 6); + $data = $this->RedisConnection->get(self::DATA_CONTENT_PREFIX . ':' . $uuid); + if ($data === false) { + throw new Exception("Redis data key with UUID $uuid doesn't exists."); + } + RedisTool::unlink($this->RedisConnection, self::DATA_CONTENT_PREFIX . ':' . $uuid); + return $data; + } else if ($path[0] === '/') { // deprecated storage location when not full path is provided + $path = APP . 'tmp/cache/ingest' . DS . $path; + } + return JsonTool::decode(FileAccessTool::readAndDelete($path)); + } + /** * Enqueue a Job. * @@ -227,6 +269,9 @@ class BackgroundJobsTool $rawJob = $this->RedisConnection->blpop($queue, $timeout); if (!empty($rawJob)) { + if ($rawJob[1] instanceof BackgroundJob) { + return $rawJob[1]; + } return new BackgroundJob($rawJob[1]); } @@ -244,7 +289,9 @@ class BackgroundJobsTool self::JOB_STATUS_PREFIX . ':' . $jobId ); - if ($rawJob) { + if ($rawJob instanceof BackgroundJob) { + return $rawJob; + } else if ($rawJob) { return new BackgroundJob($rawJob); } @@ -586,7 +633,9 @@ class BackgroundJobsTool $redis = new Redis(); $redis->connect($this->settings['redis_host'], $this->settings['redis_port']); - $redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_JSON); + $serializer = $this->settings['redis_serializer'] ?? false; + $serializer = $serializer === 'igbinary' ? Redis::SERIALIZER_IGBINARY : Redis::SERIALIZER_JSON; + $redis->setOption(Redis::OPT_SERIALIZER, $serializer); $redis->setOption(Redis::OPT_PREFIX, $this->settings['redis_namespace'] . ':'); if (isset($this->settings['redis_read_timeout'])) { $redis->setOption(Redis::OPT_READ_TIMEOUT, $this->settings['redis_read_timeout']); diff --git a/app/Model/Event.php b/app/Model/Event.php index dfecaa3ac..a9ad9f02f 100755 --- a/app/Model/Event.php +++ b/app/Model/Event.php @@ -4340,8 +4340,7 @@ class Event extends AppModel $args = ['publish_sightings', $id, $passAlong, $jobId, $user['id']]; if (!empty($sightingUuids)) { - $filePath = FileAccessTool::writeToTempFile(json_encode($sightingUuids)); - $args[] = $filePath; + $args[] = $this->getBackgroundJobsTool()->enqueueDataFile($sightingUuids); } return $this->getBackgroundJobsTool()->enqueue( @@ -6634,8 +6633,7 @@ class Event extends AppModel ); try { - $filePath = FileAccessTool::writeToTempFile(JsonTool::encode($tempData)); - + $filePath = $this->getBackgroundJobsTool()->enqueueDataFile($tempData); $this->getBackgroundJobsTool()->enqueue( BackgroundJobsTool::PRIO_QUEUE, BackgroundJobsTool::CMD_EVENT, @@ -6671,7 +6669,7 @@ class Event extends AppModel ); try { - $filePath = FileAccessTool::writeToTempFile(JsonTool::encode($tempData)); + $filePath = $this->getBackgroundJobsTool()->enqueueDataFile($tempData); $this->getBackgroundJobsTool()->enqueue( BackgroundJobsTool::PRIO_QUEUE, diff --git a/app/Model/Server.php b/app/Model/Server.php index 50a62611f..d20497a80 100644 --- a/app/Model/Server.php +++ b/app/Model/Server.php @@ -5810,6 +5810,10 @@ class Server extends AppModel 'test' => null, 'type' => 'string', 'null' => true, + 'options' => [ + 'JSON' => 'JSON', + 'igbinary' => 'igbinary', + ], 'afterHook' => function () { $keysToDelete = ['taxonomies_cache:*', 'misp:warninglist_cache', 'misp:event_lock:*', 'misp:event_index:*', 'misp:dashboard:*']; RedisTool::deleteKeysByPattern(RedisTool::init(), $keysToDelete); @@ -7401,6 +7405,22 @@ class Server extends AppModel 'test' => null, 'type' => 'string' ], + 'redis_serializer' => [ + 'level' => self::SETTING_OPTIONAL, + 'description' => __('Redis serializer method. WARNING: Changing this setting in production will break your jobs.'), + 'value' => 'JSON', + 'test' => null, + 'type' => 'string', + 'null' => true, + 'options' => [ + 'JSON' => 'JSON', + 'igbinary' => 'igbinary', + ], + 'afterHook' => function () { + $this->getBackgroundJobsTool()->restartWorkers(); + return true; + }, + ], 'max_job_history_ttl' => [ 'level' => self::SETTING_CRITICAL, 'description' => __('The time in seconds the job statuses history will be kept.'),