chg: [internal] Simplify server caching

pull/7949/head
Jakub Onderka 2021-11-10 18:15:34 +01:00
parent c6aec0046b
commit 6ef90bbc92
2 changed files with 60 additions and 47 deletions

View File

@ -85,6 +85,17 @@ class ServerSyncTool
return $this->get($url);
}
/**
* @param array $rules
* @return HttpSocketResponseExtended
* @throws HttpSocketHttpException
* @throws HttpSocketJsonException
*/
public function attributeSearch(array $rules)
{
return $this->post('/attributes/restSearch.json', $rules);
}
/**
* @param array $params
* @return HttpSocketResponseExtended

View File

@ -4152,14 +4152,14 @@ class Server extends AppModel
public function cacheServerInitiator($user, $id = 'all', $jobId = false)
{
$params = array(
'conditions' => array('caching_enabled' => 1),
'recursive' => -1
);
$redis = $this->setupRedis();
if ($redis === false) {
return 'Redis not reachable.';
}
$params = array(
'conditions' => array('caching_enabled' => 1),
'recursive' => -1
);
if ($id !== 'all') {
$params['conditions']['Server.id'] = $id;
} else {
@ -4184,65 +4184,67 @@ class Server extends AppModel
return true;
}
/**
* @param array $server
* @param Redis $redis
* @param int|false $jobId
* @return bool
* @throws JsonException
*/
private function __cacheInstance($server, $redis, $jobId = false)
{
$continue = true;
$serverId = $server['Server']['id'];
$i = 0;
$chunk_size = 50000;
if ($jobId) {
$job = ClassRegistry::init('Job');
$job->id = $jobId;
}
$redis->del('misp:server_cache:' . $server['Server']['id']);
$HttpSocket = null;
$HttpSocket = $this->setupHttpSocket($server, $HttpSocket);
while ($continue) {
$redis->del('misp:server_cache:' . $serverId);
$serverSync = new ServerSyncTool($server, $this->setupSyncRequest($server));
while (true) {
$i++;
$chunk_size = 50000;
$data = $this->__getCachedAttributes($server, $HttpSocket, $chunk_size, $i);
if (empty(trim($data))) {
$continue = false;
} else {
$pipe = $redis->multi(Redis::PIPELINE);
$data = explode(PHP_EOL, trim($data));
foreach ($data as $entry) {
list($value, $uuid) = explode(',', $entry);
if (!Validation::uuid($uuid)) {
$continue = false;
break;
}
if (!empty($value)) {
$redis->sAdd('misp:server_cache:' . $server['Server']['id'], $value);
$redis->sAdd('misp:server_cache:combined', $value);
$redis->sAdd('misp:server_cache:event_uuid_lookup:' . $value, $server['Server']['id'] . '/' . $uuid);
}
$rules = [
'returnFormat' => 'cache',
'includeEventUuid' => 1,
'page' => $i,
'limit' => $chunk_size,
];
try {
$data = $serverSync->attributeSearch($rules)->body();
} catch (Exception $e) {
$this->logException("Could not fetch cached attribute from server {$serverSync->serverId()}.", $e);
break;
}
$data = trim($data);
if (empty($data)) {
break;
}
$data = explode(PHP_EOL, $data);
$pipe = $redis->pipeline();
foreach ($data as $entry) {
list($value, $uuid) = explode(',', $entry);
if (!Validation::uuid($uuid)) {
break 2;
}
$pipe->exec();
if ($jobId) {
$job->saveField('message', 'Server ' . $server['Server']['id'] . ': ' . ((($i -1) * $chunk_size) + count($data)) . ' attributes cached.');
if (!empty($value)) {
$redis->sAdd('misp:server_cache:' . $serverId, $value);
$redis->sAdd('misp:server_cache:combined', $value);
$redis->sAdd('misp:server_cache:event_uuid_lookup:' . $value, $serverId . '/' . $uuid);
}
}
$pipe->exec();
if ($jobId) {
$job->saveProgress($jobId, 'Server ' . $server['Server']['id'] . ': ' . ((($i -1) * $chunk_size) + count($data)) . ' attributes cached.');
}
}
$redis->set('misp:server_cache_timestamp:' . $server['Server']['id'], time());
return true;
}
private function __getCachedAttributes($server, $HttpSocket, $chunk_size, $i)
{
$filter_rules = array(
'returnFormat' => 'cache',
'includeEventUuid' => 1,
'page' => $i,
'limit' => $chunk_size
);
$request = $this->setupSyncRequest($server);
try {
$response = $HttpSocket->post($server['Server']['url'] . '/attributes/restSearch.json', json_encode($filter_rules), $request);
} catch (SocketException $e) {
return $e->getMessage();
}
return $response->body;
}
/**
* @param array $servers
* @return array