chg: [correlations] Refactor feed cached correlations

Jakub Onderka 2020-09-28 12:24:35 +02:00
parent c5765392c5
commit 0c5be866fc
1 changed file with 142 additions and 94 deletions

View File

@ -333,126 +333,171 @@ class Feed extends AppModel
* Attach correlations from cached servers or feeds.
* @param array $objects
* @param array $attributes
* @param array $user
* @param array $event
* @param bool $overrideLimit Override the hardcoded limit of 10 000 attribute correlations.
* @param string $scope `Feed` or `Server`
* @return array
public function attachFeedCorrelations($objects, $user, &$event, $overrideLimit = false, $scope = 'Feed')
public function attachFeedCorrelations(array $attributes, array $user, array &$event, $overrideLimit = false, $scope = 'Feed')
if (empty($attributes)) {
return $attributes;
try {
$redis = $this->setupRedisWithException();
} catch (Exception $e) {
return $objects;
return $attributes;
$cachePrefix = 'misp:' . strtolower($scope) . '_cache:';
// Redis cache for $scope is empty.
// Skip if redis cache for $scope is empty.
if ($redis->sCard($cachePrefix . 'combined') === 0) {
return $objects;
return $attributes;
if (!isset($this->Attribute)) {
$this->Attribute = ClassRegistry::init('Attribute');
$compositeTypes = $this->Attribute->getCompositeTypes();
$pipe = $redis->multi(Redis::PIPELINE);
$hashTable = array();
$hashTable = [];
$redisResultToAttributePosition = [];
$this->Event = ClassRegistry::init('Event');
$compositeTypes = $this->Event->Attribute->getCompositeTypes();
foreach ($objects as $k => $object) {
if (in_array($object['type'], $compositeTypes)) {
$value = explode('|', $object['value']);
$hashTable[$k] = md5($value[0]);
} else {
$hashTable[$k] = md5($object['value']);
foreach ($attributes as $k => $attribute) {
if (in_array($attribute['type'], $this->Attribute->nonCorrelatingTypes)) {
continue; // attribute type is not correlateable
if (!empty($attribute['disable_correlation'])) {
continue; // attribute correlation is disabled
if (in_array($attribute['type'], $compositeTypes)) {
list($value1, $value2) = explode('|', $attribute['value']);
$parts = [$value1];
if (!in_array($attribute['type'], $this->Attribute->primaryOnlyCorrelatingTypes)) {
$parts[] = $value2;
} else {
$parts = [$attribute['value']];
foreach ($parts as $part) {
$md5 = md5($part);
$hashTable[] = $md5;
$redis->sismember($cachePrefix . 'combined', $md5);
$redisResultToAttributePosition[] = $k;
$redis->sismember($cachePrefix . 'combined', $hashTable[$k]);
if (empty($redisResultToAttributePosition)) {
// No attribute that can be correlated
return $attributes;
$results = $pipe->exec();
if (!$overrideLimit && count($objects) > 10000) {
if (!$overrideLimit && count($attributes) > 10000) {
foreach ($results as $k => $result) {
if ($result && empty($objects[$k]['disable_correlation'])) {
if ($result) {
if (isset($event['FeedCount'])) {
} else {
$event['FeedCount'] = 1;
$objects[$k]['FeedHit'] = true;
$attributes[$redisResultToAttributePosition[$k]]['FeedHit'] = true;
return $attributes;
$hitIds = [];
foreach ($results as $k => $result) {
if ($result) {
$hitIds[] = $k;
if (empty($hitIds)) {
return $attributes; // nothing matches, skip
if ($scope === 'Feed') {
$params = array(
'recursive' => -1,
'fields' => array('id', 'name', 'url', 'provider', 'source_format')
if (!$user['Role']['perm_site_admin']) {
$params['conditions'] = array('Feed.lookup_visible' => 1);
$sources = $this->find('all', $params);
} else {
if ($scope === 'Feed') {
$params = array(
'recursive' => -1,
'fields' => array('id', 'name', 'url', 'provider', 'source_format')
if (!$user['Role']['perm_site_admin']) {
$params['conditions'] = array('Feed.lookup_visible' => 1);
$sources = $this->find('all', $params);
} else {
$params = array(
'recursive' => -1,
'fields' => array('id', 'name', 'url', 'caching_enabled')
if (!$user['Role']['perm_site_admin']) {
$params['conditions'] = array('Server.caching_enabled' => 1);
$this->Server = ClassRegistry::init('Server');
$sources = $this->Server->find('all', $params);
$params = array(
'recursive' => -1,
'fields' => array('id', 'name', 'url', 'caching_enabled')
if (!$user['Role']['perm_site_admin']) {
$params['conditions'] = array('Server.caching_enabled' => 1);
$this->Server = ClassRegistry::init('Server');
$sources = $this->Server->find('all', $params);
$hitIds = array();
foreach ($results as $k => $result) {
if ($result && empty($objects[$k]['disable_correlation'])) {
$hitIds[] = $k;
foreach ($sources as $source) {
$sourceId = $source[$scope]['id'];
$pipe = $redis->multi(Redis::PIPELINE);
foreach ($hitIds as $k) {
$redis->sismember($cachePrefix . $sourceId, $hashTable[$k]);
$sourceHits = $pipe->exec();
$sourceHasHit = false;
foreach ($sourceHits as $k => $hit) {
if ($hit) {
if (!isset($event[$scope][$sourceId])) {
$event[$scope][$sourceId] = $source[$scope];
$attributePosition = $redisResultToAttributePosition[$hitIds[$k]];
$attributes[$attributePosition][$scope][] = $source[$scope];
$sourceHasHit = true;
foreach ($sources as $source) {
$sourceScopeId = $source[$scope]['id'];
// Also append the exact MISP feed event UUID
// TODO: This can be optimised in the future to do it in one pass
if ($sourceHasHit && $source[$scope]['source_format'] === 'misp') {
$pipe = $redis->multi(Redis::PIPELINE);
foreach ($hitIds as $k) {
$redis->sismember($cachePrefix . $sourceScopeId, $hashTable[$k]);
$sourceHits = $pipe->exec();
foreach ($sourceHits as $k4 => $hit) {
if ($hit) {
if (!isset($event[$scope][$sourceScopeId]['id'])) {
if (!isset($event[$scope][$sourceScopeId])) {
$event[$scope][$sourceScopeId] = array();
$event[$scope][$sourceScopeId] = array_merge($event[$scope][$sourceScopeId], $source[$scope]);
$objects[$hitIds[$k4]][$scope][] = $source[$scope];
$eventUuidHitPosition = [];
foreach ($hitIds as $sourceHitPos => $k) {
if ($sourceHits[$sourceHitPos]) {
$redis->smembers($cachePrefix . 'event_uuid_lookup:' . $hashTable[$k]);
$eventUuidHitPosition[] = $redisResultToAttributePosition[$k];
if ($scope === 'Server' || $source[$scope]['source_format'] == 'misp') {
$pipe = $redis->multi(Redis::PIPELINE);
$eventUuidHitPosition = array();
foreach ($objects as $k => $object) {
if (isset($object[$scope])) {
foreach ($object[$scope] as $currentFeed) {
if ($source[$scope]['id'] == $currentFeed['id']) {
$eventUuidHitPosition[] = $k;
$redis->smembers($cachePrefix . 'event_uuid_lookup:' . $hashTable[$k]);
$mispFeedHits = $pipe->exec();
foreach ($mispFeedHits as $sourceHitPos => $feedUuidMatches) {
if (empty($feedUuidMatches)) {
$mispFeedHits = $pipe->exec();
foreach ($mispFeedHits as $sourcehitPos => $f) {
foreach ($f as $url) {
list($feedId, $eventUuid) = explode('/', $url);
if (empty($event[$scope][$feedId]['event_uuids']) || !in_array($eventUuid, $event[$scope][$feedId]['event_uuids'])) {
$event[$scope][$feedId]['event_uuids'][] = $eventUuid;
foreach ($objects[$eventUuidHitPosition[$sourcehitPos]][$scope] as $tempKey => $tempFeed) {
if ($tempFeed['id'] == $feedId) {
$objects[$eventUuidHitPosition[$sourcehitPos]][$scope][$tempKey]['event_uuids'][] = $eventUuid;
foreach ($feedUuidMatches as $url) {
list($feedId, $eventUuid) = explode('/', $url);
if ($feedId != $sourceId) {
continue; // just process current source, skip others
if (empty($event[$scope][$feedId]['event_uuids']) || !in_array($eventUuid, $event[$scope][$feedId]['event_uuids'])) {
$event[$scope][$feedId]['event_uuids'][] = $eventUuid;
$attributePosition = $eventUuidHitPosition[$sourceHitPos];
foreach ($attributes[$attributePosition][$scope] as $tempKey => $tempFeed) {
if ($tempFeed['id'] == $feedId) {
$attributes[$attributePosition][$scope][$tempKey]['event_uuids'][] = $eventUuid;
@ -464,7 +509,7 @@ class Feed extends AppModel
$event[$scope] = array_values($event[$scope]);
return $objects;
return $attributes;
public function downloadFromFeed($actions, $feed, HttpSocket $HttpSocket = null, $user, $jobId = false)
@ -1182,16 +1227,18 @@ class Feed extends AppModel
if (!in_array($attribute['type'], $this->Attribute->nonCorrelatingTypes)) {
if (in_array($attribute['type'], $this->Attribute->getCompositeTypes())) {
$value = explode('|', $attribute['value']);
$redis->sAdd('misp:feed_cache:' . $feedId, md5($value[0]));
$redis->sAdd('misp:feed_cache:' . $feedId, md5($value[1]));
$redis->sAdd('misp:feed_cache:combined', md5($value[0]));
$redis->sAdd('misp:feed_cache:combined', md5($value[1]));
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . md5($value[0]), $feedId . '/' . $event['Event']['uuid']);
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . md5($value[1]), $feedId . '/' . $event['Event']['uuid']);
if (in_array($attribute['type'], $this->Attribute->primaryOnlyCorrelatingTypes)) {
} else {
$redis->sAdd('misp:feed_cache:' . $feedId, md5($attribute['value']));
$redis->sAdd('misp:feed_cache:combined', md5($attribute['value']));
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . md5($attribute['value']), $feedId . '/' . $event['Event']['uuid']);
$value = [$attribute['value']];
foreach ($value as $v) {
$md5 = md5($v);
$redis->sAdd('misp:feed_cache:' . $feedId, $md5);
$redis->sAdd('misp:feed_cache:combined', $md5);
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . $md5, $feedId . '/' . $event['Event']['uuid']);
@ -1219,9 +1266,10 @@ class Feed extends AppModel
$pipe = $redis->multi(Redis::PIPELINE);
foreach ($cache as $v) {
$redis->sAdd('misp:feed_cache:' . $feedId, $v[0]);
$redis->sAdd('misp:feed_cache:combined', $v[0]);
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . $v[0], $feedId . '/' . $v[1]);
list($hash, $eventUuid) = $v;
$redis->sAdd('misp:feed_cache:' . $feedId, $hash);
$redis->sAdd('misp:feed_cache:combined', $hash);
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . $hash, "$feedId/$eventUuid");
$this->jobProgress($jobId, "Feed $feedId: cached via quick cache.");