chg: [internal] Simplified feed caching

pull/7949/head
Jakub Onderka 2021-11-10 18:44:58 +01:00
parent 4dfa8e1a4a
commit 256ec0b0f3
1 changed files with 41 additions and 26 deletions

View File

@ -190,7 +190,7 @@ class Feed extends AppModel
/**
* @param array $feed
* @param HttpSocket|null $HttpSocket Null can be for local feed
* @return Generator
* @return Generator<string>
* @throws Exception
*/
public function getCache(array $feed, HttpSocket $HttpSocket = null)
@ -478,7 +478,7 @@ class Feed extends AppModel
}
$compositeTypes = $this->Attribute->getCompositeTypes();
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
$hashTable = [];
$redisResultToAttributePosition = [];
@ -550,7 +550,7 @@ class Feed extends AppModel
foreach ($sources as $source) {
$sourceId = $source[$scope]['id'];
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
foreach ($hitIds as $k) {
$redis->sismember($cachePrefix . $sourceId, $hashTable[$k]);
}
@ -574,7 +574,7 @@ class Feed extends AppModel
// Append also exact MISP feed or server event UUID
// TODO: This can be optimised in future to do that in one pass
if ($sourceHasHit && ($scope === 'Server' || $source[$scope]['source_format'] === 'misp')) {
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
$eventUuidHitPosition = [];
foreach ($hitIds as $sourceHitPos => $k) {
if ($sourceHits[$sourceHitPos]) {
@ -648,7 +648,7 @@ class Feed extends AppModel
try {
$redis = $this->setupRedisWithException();
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
$cachePrefix = 'misp:' . strtolower($scope) . '_cache:';
foreach ($sources as $source) {
$pipe->exists($cachePrefix . $source[$scope]['id']);
@ -1332,7 +1332,7 @@ class Feed extends AppModel
return $feeds;
}
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
foreach ($feeds as $feed) {
$pipe->get('misp:feed_cache_timestamp:' . $feed['Feed']['id']);
}
@ -1343,14 +1343,28 @@ class Feed extends AppModel
return $feeds;
}
/**
* @param array $feed
* @param Redis $redis
* @param int|false $jobId
* @return bool
*/
private function __cacheFeed($feed, $redis, $jobId = false)
{
$HttpSocket = $this->isFeedLocal($feed) ? null : $this->__setupHttpSocket();
if ($feed['Feed']['source_format'] === 'misp') {
return $this->__cacheMISPFeed($feed, $redis, $HttpSocket, $jobId);
$result = true;
if (!$this->__cacheMISPFeedCache($feed, $redis, $HttpSocket, $jobId)) {
$result = $this->__cacheMISPFeedTraditional($feed, $redis, $HttpSocket, $jobId);
}
} else {
return $this->__cacheFreetextFeed($feed, $redis, $HttpSocket, $jobId);
$result = $this->__cacheFreetextFeed($feed, $redis, $HttpSocket, $jobId);
}
if ($result) {
$redis->set('misp:feed_cache_timestamp:' . $feed['Feed']['id'], time());
}
return $result;
}
/**
@ -1379,7 +1393,7 @@ class Feed extends AppModel
$redis->del('misp:feed_cache:' . $feedId);
foreach (array_chunk($md5Values, 5000) as $k => $chunk) {
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
if (method_exists($redis, 'sAddArray')) {
$redis->sAddArray('misp:feed_cache:' . $feedId, $chunk);
$redis->sAddArray('misp:feed_cache:combined', $chunk);
@ -1392,10 +1406,16 @@ class Feed extends AppModel
$pipe->exec();
$this->jobProgress($jobId, __('Feed %s: %s/%s values cached.', $feedId, $k * 5000, count($md5Values)));
}
$redis->set('misp:feed_cache_timestamp:' . $feedId, time());
return true;
}
/**
* @param array $feed
* @param Redis $redis
* @param HttpSocket|null $HttpSocket
* @param false $jobId
* @return bool
*/
private function __cacheMISPFeedTraditional($feed, $redis, HttpSocket $HttpSocket = null, $jobId = false)
{
$feedId = $feed['Feed']['id'];
@ -1409,6 +1429,7 @@ class Feed extends AppModel
$redis->del('misp:feed_cache:' . $feedId);
$k = 0;
$this->Attribute = ClassRegistry::init('Attribute');
foreach ($manifest as $uuid => $event) {
try {
$event = $this->downloadAndParseEventFromFeed($feed, $uuid, $HttpSocket);
@ -1418,11 +1439,10 @@ class Feed extends AppModel
}
if (!empty($event['Event']['Attribute'])) {
$this->Attribute = ClassRegistry::init('Attribute');
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
foreach ($event['Event']['Attribute'] as $attribute) {
if (!in_array($attribute['type'], Attribute::NON_CORRELATING_TYPES, true)) {
if (in_array($attribute['type'], $this->Attribute->getCompositeTypes())) {
if (in_array($attribute['type'], $this->Attribute->getCompositeTypes(), true)) {
$value = explode('|', $attribute['value']);
if (in_array($attribute['type'], Attribute::PRIMARY_ONLY_CORRELATING_TYPES, true)) {
unset($value[1]);
@ -1450,6 +1470,13 @@ class Feed extends AppModel
return true;
}
/**
* @param array $feed
* @param Redis $redis
* @param HttpSocket|null $HttpSocket
* @param false $jobId
* @return bool
*/
private function __cacheMISPFeedCache($feed, $redis, HttpSocket $HttpSocket = null, $jobId = false)
{
$feedId = $feed['Feed']['id'];
@ -1461,7 +1488,7 @@ class Feed extends AppModel
return false;
}
$pipe = $redis->multi(Redis::PIPELINE);
$pipe = $redis->pipeline();
$redis->del('misp:feed_cache:' . $feedId);
foreach ($cache as $v) {
list($hash, $eventUuid) = $v;
@ -1474,18 +1501,6 @@ class Feed extends AppModel
return true;
}
private function __cacheMISPFeed($feed, $redis, HttpSocket $HttpSocket = null, $jobId = false)
{
$result = true;
if (!$this->__cacheMISPFeedCache($feed, $redis, $HttpSocket, $jobId)) {
$result = $this->__cacheMISPFeedTraditional($feed, $redis, $HttpSocket, $jobId);
}
if ($result) {
$redis->set('misp:feed_cache_timestamp:' . $feed['Feed']['id'], time());
}
return $result;
}
public function compareFeeds($id = false)
{
$redis = $this->setupRedis();