mirror of https://github.com/MISP/MISP
Merge branch 'develop' of github.com:MISP/MISP into develop
commit
7d7a45c822
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
|
||||
App::uses('AppModel', 'Model');
|
||||
App::uses('BackgroundJobsTool', 'Tools');
|
||||
|
||||
/**
|
||||
* Application Shell
|
||||
|
@ -26,10 +27,13 @@ App::uses('AppModel', 'Model');
|
|||
*
|
||||
* @package app.Console.Command
|
||||
*/
|
||||
class AppShell extends Shell
|
||||
abstract class AppShell extends Shell
|
||||
{
|
||||
public $tasks = array('ConfigLoad');
|
||||
|
||||
/** @var BackgroundJobsTool */
|
||||
private $BackgroundJobsTool;
|
||||
|
||||
public function initialize()
|
||||
{
|
||||
$this->ConfigLoad = $this->Tasks->load('ConfigLoad');
|
||||
|
@ -77,4 +81,16 @@ class AppShell extends Shell
|
|||
$this->error("Invalid state value `$value`, it must be `true`, `false`, `1`, or `0`.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return BackgroundJobsTool
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function getBackgroundJobsTool()
|
||||
{
|
||||
if (!isset($this->BackgroundJobsTool)) {
|
||||
$this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs'));
|
||||
}
|
||||
return $this->BackgroundJobsTool;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -432,8 +432,7 @@ class EventShell extends AppShell
|
|||
|
||||
$sightingsUuidsToPush = [];
|
||||
if (isset($this->args[4])) { // push just specific sightings
|
||||
$path = $this->args[4][0] === '/' ? $this->args[4] : (APP . 'tmp/cache/ingest' . DS . $this->args[4]);
|
||||
$sightingsUuidsToPush = $this->Event->jsonDecode(FileAccessTool::readAndDelete($path));
|
||||
$sightingsUuidsToPush = $this->getBackgroundJobsTool()->fetchDataFile($this->args[4]);
|
||||
}
|
||||
|
||||
$this->Event->Behaviors->unload('SysLogLogable.SysLogLogable');
|
||||
|
@ -537,9 +536,7 @@ class EventShell extends AppShell
|
|||
}
|
||||
|
||||
$inputFile = $this->args[0];
|
||||
$inputFile = $inputFile[0] === '/' ? $inputFile : APP . 'tmp/cache/ingest' . DS . $inputFile;
|
||||
$inputData = FileAccessTool::readAndDelete($inputFile);
|
||||
$inputData = $this->Event->jsonDecode($inputData);
|
||||
$inputData = $this->getBackgroundJobsTool()->fetchDataFile($inputFile);
|
||||
Configure::write('CurrentUserId', $inputData['user']['id']);
|
||||
$this->Event->processFreeTextData(
|
||||
$inputData['user'],
|
||||
|
@ -560,9 +557,7 @@ class EventShell extends AppShell
|
|||
}
|
||||
|
||||
$inputFile = $this->args[0];
|
||||
$inputFile = $inputFile[0] === '/' ? $inputFile : APP . 'tmp/cache/ingest' . DS . $inputFile;
|
||||
$inputData = FileAccessTool::readAndDelete($inputFile);
|
||||
$inputData = $this->Event->jsonDecode($inputData);
|
||||
$inputData = $this->getBackgroundJobsTool()->fetchDataFile($inputFile);
|
||||
Configure::write('CurrentUserId', $inputData['user']['id']);
|
||||
$this->Event->processModuleResultsData(
|
||||
$inputData['user'],
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
<?php
|
||||
App::uses('Folder', 'Utility');
|
||||
App::uses('File', 'Utility');
|
||||
App::uses('BackgroundJobsTool', 'Tools');
|
||||
require_once 'AppShell.php';
|
||||
|
||||
/**
|
||||
|
@ -655,15 +654,4 @@ class ServerShell extends AppShell
|
|||
}
|
||||
return $server;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return BackgroundJobsTool
|
||||
*/
|
||||
private function getBackgroundJobsTool()
|
||||
{
|
||||
if (!isset($this->BackgroundJobsTool)) {
|
||||
$this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs'));
|
||||
}
|
||||
return $this->BackgroundJobsTool;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,14 +2,10 @@
|
|||
|
||||
declare(strict_types=1);
|
||||
|
||||
App::uses('BackgroundJobsTool', 'Tools');
|
||||
App::uses('ProcessTool', 'Tools');
|
||||
|
||||
class StartWorkerShell extends AppShell
|
||||
{
|
||||
/** @var BackgroundJobsTool */
|
||||
private $BackgroundJobsTool;
|
||||
|
||||
/** @var Worker */
|
||||
private $worker;
|
||||
|
||||
|
@ -18,19 +14,13 @@ class StartWorkerShell extends AppShell
|
|||
|
||||
const DEFAULT_MAX_EXECUTION_TIME = 86400; // 1 day
|
||||
|
||||
public function initialize()
|
||||
{
|
||||
parent::initialize();
|
||||
$this->BackgroundJobsTool = new BackgroundJobsTool(Configure::read('SimpleBackgroundJobs'));
|
||||
}
|
||||
|
||||
public function getOptionParser(): ConsoleOptionParser
|
||||
{
|
||||
$parser = parent::getOptionParser();
|
||||
$parser
|
||||
->addArgument('queue', [
|
||||
'help' => 'Name of the queue to process.',
|
||||
'choices' => $this->BackgroundJobsTool->getQueues(),
|
||||
'choices' => $this->getBackgroundJobsTool()->getQueues(),
|
||||
'required' => true
|
||||
])
|
||||
->addOption(
|
||||
|
@ -62,7 +52,7 @@ class StartWorkerShell extends AppShell
|
|||
while (true) {
|
||||
$this->checkMaxExecutionTime();
|
||||
|
||||
$job = $this->BackgroundJobsTool->dequeue($this->worker->queue());
|
||||
$job = $this->getBackgroundJobsTool()->dequeue($this->worker->queue());
|
||||
if ($job) {
|
||||
$this->runJob($job);
|
||||
}
|
||||
|
@ -81,7 +71,7 @@ class StartWorkerShell extends AppShell
|
|||
|
||||
$command = implode(' ', array_merge([$job->command()], $job->args()));
|
||||
CakeLog::info("[JOB ID: {$job->id()}] - started command `$command`.");
|
||||
$this->BackgroundJobsTool->update($job);
|
||||
$this->getBackgroundJobsTool()->update($job);
|
||||
|
||||
$job->run();
|
||||
|
||||
|
@ -94,7 +84,7 @@ class StartWorkerShell extends AppShell
|
|||
CakeLog::error("[WORKER PID: {$this->worker->pid()}][{$this->worker->queue()}] - job ID: {$job->id()} failed with exception: {$exception->getMessage()}");
|
||||
$job->setStatus(BackgroundJob::STATUS_FAILED);
|
||||
}
|
||||
$this->BackgroundJobsTool->update($job);
|
||||
$this->getBackgroundJobsTool()->update($job);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -148,6 +148,11 @@ class BackgroundJob implements JsonSerializable
|
|||
];
|
||||
}
|
||||
|
||||
public function __sleep(): array
|
||||
{
|
||||
return ['id', 'command', 'args', 'createdAt', 'updatedAt', 'status', 'output', 'error', 'metadata'];
|
||||
}
|
||||
|
||||
public function id(): string
|
||||
{
|
||||
return $this->id;
|
||||
|
|
|
@ -90,7 +90,8 @@ class BackgroundJobsTool
|
|||
self::CMD_WORKFLOW => 'WorkflowShell',
|
||||
];
|
||||
|
||||
const JOB_STATUS_PREFIX = 'job_status';
|
||||
const JOB_STATUS_PREFIX = 'job_status',
|
||||
DATA_CONTENT_PREFIX = 'data_content';
|
||||
|
||||
/** @var array */
|
||||
private $settings;
|
||||
|
@ -125,6 +126,47 @@ class BackgroundJobsTool
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $data
|
||||
* @return string Full path to data file or `redis:UUID` when data are stored in Redis
|
||||
* @throws JsonException
|
||||
* @throws RedisException
|
||||
*/
|
||||
public function enqueueDataFile(array $data)
|
||||
{
|
||||
if (!$this->settings['enabled']) {
|
||||
// Store serialized data to tmp file when SimpleBackgroundJobs are not enabled
|
||||
return FileAccessTool::writeToTempFile(JsonTool::encode($data));
|
||||
}
|
||||
|
||||
// Keep content stored in Redis for 24 hours, that should be enough to process that data
|
||||
$uuid = CakeText::uuid();
|
||||
$this->RedisConnection->setex(self::DATA_CONTENT_PREFIX . ':' . $uuid, 24 * 3600, $data);
|
||||
return "redis:$uuid";
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $path
|
||||
* @return array Deserialized data
|
||||
* @throws JsonException
|
||||
* @throws RedisException
|
||||
*/
|
||||
public function fetchDataFile($path)
|
||||
{
|
||||
if (strpos($path, 'redis:') === 0) {
|
||||
$uuid = substr($path, 6);
|
||||
$data = $this->RedisConnection->get(self::DATA_CONTENT_PREFIX . ':' . $uuid);
|
||||
if ($data === false) {
|
||||
throw new Exception("Redis data key with UUID $uuid doesn't exists.");
|
||||
}
|
||||
RedisTool::unlink($this->RedisConnection, self::DATA_CONTENT_PREFIX . ':' . $uuid);
|
||||
return $data;
|
||||
} else if ($path[0] === '/') { // deprecated storage location when not full path is provided
|
||||
$path = APP . 'tmp/cache/ingest' . DS . $path;
|
||||
}
|
||||
return JsonTool::decode(FileAccessTool::readAndDelete($path));
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue a Job.
|
||||
*
|
||||
|
@ -227,6 +269,9 @@ class BackgroundJobsTool
|
|||
$rawJob = $this->RedisConnection->blpop($queue, $timeout);
|
||||
|
||||
if (!empty($rawJob)) {
|
||||
if ($rawJob[1] instanceof BackgroundJob) {
|
||||
return $rawJob[1];
|
||||
}
|
||||
return new BackgroundJob($rawJob[1]);
|
||||
}
|
||||
|
||||
|
@ -244,7 +289,9 @@ class BackgroundJobsTool
|
|||
self::JOB_STATUS_PREFIX . ':' . $jobId
|
||||
);
|
||||
|
||||
if ($rawJob) {
|
||||
if ($rawJob instanceof BackgroundJob) {
|
||||
return $rawJob;
|
||||
} else if ($rawJob) {
|
||||
return new BackgroundJob($rawJob);
|
||||
}
|
||||
|
||||
|
@ -586,7 +633,9 @@ class BackgroundJobsTool
|
|||
|
||||
$redis = new Redis();
|
||||
$redis->connect($this->settings['redis_host'], $this->settings['redis_port']);
|
||||
$redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_JSON);
|
||||
$serializer = $this->settings['redis_serializer'] ?? false;
|
||||
$serializer = $serializer === 'igbinary' ? Redis::SERIALIZER_IGBINARY : Redis::SERIALIZER_JSON;
|
||||
$redis->setOption(Redis::OPT_SERIALIZER, $serializer);
|
||||
$redis->setOption(Redis::OPT_PREFIX, $this->settings['redis_namespace'] . ':');
|
||||
if (isset($this->settings['redis_read_timeout'])) {
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, $this->settings['redis_read_timeout']);
|
||||
|
|
|
@ -4340,8 +4340,7 @@ class Event extends AppModel
|
|||
|
||||
$args = ['publish_sightings', $id, $passAlong, $jobId, $user['id']];
|
||||
if (!empty($sightingUuids)) {
|
||||
$filePath = FileAccessTool::writeToTempFile(json_encode($sightingUuids));
|
||||
$args[] = $filePath;
|
||||
$args[] = $this->getBackgroundJobsTool()->enqueueDataFile($sightingUuids);
|
||||
}
|
||||
|
||||
return $this->getBackgroundJobsTool()->enqueue(
|
||||
|
@ -6634,8 +6633,7 @@ class Event extends AppModel
|
|||
);
|
||||
|
||||
try {
|
||||
$filePath = FileAccessTool::writeToTempFile(JsonTool::encode($tempData));
|
||||
|
||||
$filePath = $this->getBackgroundJobsTool()->enqueueDataFile($tempData);
|
||||
$this->getBackgroundJobsTool()->enqueue(
|
||||
BackgroundJobsTool::PRIO_QUEUE,
|
||||
BackgroundJobsTool::CMD_EVENT,
|
||||
|
@ -6671,7 +6669,7 @@ class Event extends AppModel
|
|||
);
|
||||
|
||||
try {
|
||||
$filePath = FileAccessTool::writeToTempFile(JsonTool::encode($tempData));
|
||||
$filePath = $this->getBackgroundJobsTool()->enqueueDataFile($tempData);
|
||||
|
||||
$this->getBackgroundJobsTool()->enqueue(
|
||||
BackgroundJobsTool::PRIO_QUEUE,
|
||||
|
|
|
@ -5810,6 +5810,10 @@ class Server extends AppModel
|
|||
'test' => null,
|
||||
'type' => 'string',
|
||||
'null' => true,
|
||||
'options' => [
|
||||
'JSON' => 'JSON',
|
||||
'igbinary' => 'igbinary',
|
||||
],
|
||||
'afterHook' => function () {
|
||||
$keysToDelete = ['taxonomies_cache:*', 'misp:warninglist_cache', 'misp:event_lock:*', 'misp:event_index:*', 'misp:dashboard:*'];
|
||||
RedisTool::deleteKeysByPattern(RedisTool::init(), $keysToDelete);
|
||||
|
@ -7401,6 +7405,22 @@ class Server extends AppModel
|
|||
'test' => null,
|
||||
'type' => 'string'
|
||||
],
|
||||
'redis_serializer' => [
|
||||
'level' => self::SETTING_OPTIONAL,
|
||||
'description' => __('Redis serializer method. WARNING: Changing this setting in production will break your jobs.'),
|
||||
'value' => 'JSON',
|
||||
'test' => null,
|
||||
'type' => 'string',
|
||||
'null' => true,
|
||||
'options' => [
|
||||
'JSON' => 'JSON',
|
||||
'igbinary' => 'igbinary',
|
||||
],
|
||||
'afterHook' => function () {
|
||||
$this->getBackgroundJobsTool()->restartWorkers();
|
||||
return true;
|
||||
},
|
||||
],
|
||||
'max_job_history_ttl' => [
|
||||
'level' => self::SETTING_CRITICAL,
|
||||
'description' => __('The time in seconds the job statuses history will be kept.'),
|
||||
|
|
Loading…
Reference in New Issue