mirror of https://github.com/MISP/MISP
chg: [feed] New method Feed::jobProgress
parent
77fe7e10fc
commit
35d67c261e
|
@ -408,11 +408,6 @@ class Feed extends AppModel
|
|||
|
||||
public function downloadFromFeed($actions, $feed, $HttpSocket, $user, $jobId = false)
|
||||
{
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->read(null, $jobId);
|
||||
$email = "Scheduled job";
|
||||
}
|
||||
$total = count($actions['add']) + count($actions['edit']);
|
||||
$currentItem = 0;
|
||||
$this->Event = ClassRegistry::init('Event');
|
||||
|
@ -437,17 +432,14 @@ class Feed extends AppModel
|
|||
} else {
|
||||
$results['add']['fail'] = array('uuid' => $uuid, 'reason' => $result);
|
||||
}
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('progress', 100 * (($currentItem + 1) / $total));
|
||||
}
|
||||
$this->jobProgress($jobId, null, 100 * (($currentItem + 1) / $total));
|
||||
$currentItem++;
|
||||
}
|
||||
|
||||
foreach ($actions['edit'] as $editTarget) {
|
||||
$uuid = $editTarget['uuid'];
|
||||
try {
|
||||
$result = $this->__updateEventFromFeed($HttpSocket, $feed, $editTarget['uuid'], $editTarget['id'], $user, $filterRules);
|
||||
$result = $this->__updateEventFromFeed($HttpSocket, $feed, $uuid, $editTarget['id'], $user, $filterRules);
|
||||
} catch (Exception $e) {
|
||||
CakeLog::error($e->getMessage()); // TODO: Better exception logging
|
||||
$results['edit']['fail'] = array('uuid' => $uuid, 'reason' => $e->getMessage());
|
||||
|
@ -464,9 +456,8 @@ class Feed extends AppModel
|
|||
} else {
|
||||
$results['edit']['fail'] = array('uuid' => $uuid, 'reason' => $result);
|
||||
}
|
||||
if ($jobId && $currentItem % 10 == 0) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('progress', 100 * (($currentItem + 1) / $total));
|
||||
if ($currentItem % 10 == 0) {
|
||||
$this->jobProgress($jobId, null, 100 * (($currentItem + 1) / $total));
|
||||
}
|
||||
$currentItem++;
|
||||
}
|
||||
|
@ -784,7 +775,6 @@ class Feed extends AppModel
|
|||
public function downloadFromFeedInitiator($feedId, $user, $jobId = false)
|
||||
{
|
||||
$this->id = $feedId;
|
||||
$job = ClassRegistry::init('Job');
|
||||
$this->read();
|
||||
if (isset($this->data['Feed']['settings']) && !empty($this->data['Feed']['settings'])) {
|
||||
$this->data['Feed']['settings'] = json_decode($this->data['Feed']['settings'], true);
|
||||
|
@ -792,52 +782,33 @@ class Feed extends AppModel
|
|||
|
||||
$HttpSocket = $this->isFeedLocal($this->data) ? false : $this->__setupHttpSocket($this->data);
|
||||
if ($this->data['Feed']['source_format'] == 'misp') {
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Fetching event manifest.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Fetching event manifest.');
|
||||
try {
|
||||
$actions = $this->getNewEventUuids($this->data, $HttpSocket);
|
||||
} catch (Exception $e) {
|
||||
CakeLog::error($e->getMessage()); // TODO: Better exception logging
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Could not fetch event manifest. See log for more details.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Could not fetch event manifest. See log for more details.');
|
||||
return false;
|
||||
}
|
||||
|
||||
if (empty($actions['add']) && empty($actions['edit'])) {
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Job complete.');
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Fetching events.');
|
||||
}
|
||||
|
||||
$total = count($actions['add']) + count($actions['edit']);
|
||||
$this->jobProgress($jobId, "Fetching $total events.");
|
||||
$result = $this->downloadFromFeed($actions, $this->data, $HttpSocket, $user, $jobId);
|
||||
$this->__cleanupFile($this->data, '/manifest.json');
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Job complete.');
|
||||
}
|
||||
} else {
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Fetching data.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Fetching data.');
|
||||
try {
|
||||
$temp = $this->getFreetextFeed($this->data, $HttpSocket, $this->data['Feed']['source_format'], 'all');
|
||||
} catch (Exception $e) {
|
||||
CakeLog::error($e->getMessage()); // TODO: Better exception logging
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Could not fetch freetext feed. See log for more details.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Could not fetch freetext feed. See log for more details.');
|
||||
return false;
|
||||
}
|
||||
|
||||
$data = array();
|
||||
foreach ($temp as $key => $value) {
|
||||
$data[] = array(
|
||||
|
@ -847,23 +818,16 @@ class Feed extends AppModel
|
|||
'to_ids' => $value['to_ids']
|
||||
);
|
||||
}
|
||||
if ($jobId) {
|
||||
$job->saveField('progress', 50);
|
||||
$job->saveField('message', 'Saving data.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Saving data.', 50);
|
||||
if (empty($data)) {
|
||||
return true;
|
||||
}
|
||||
$result = $this->saveFreetextFeedData($this->data, $data, $user);
|
||||
$message = 'Job complete.';
|
||||
if ($result !== true) {
|
||||
CakeLog::error($result);
|
||||
return false;
|
||||
}
|
||||
$this->__cleanupFile($this->data, '');
|
||||
if ($jobId) {
|
||||
$job->saveField('progress', '100');
|
||||
$job->saveField('message', 'Job complete.');
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
|
@ -972,15 +936,11 @@ class Feed extends AppModel
|
|||
$uniqueValues[] = $data[$key]['value'];
|
||||
}
|
||||
$data = array_values($data);
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->id = $jobId;
|
||||
}
|
||||
foreach ($data as $k => $chunk) {
|
||||
$this->Event->Attribute->create();
|
||||
$this->Event->Attribute->save($chunk);
|
||||
if ($jobId && $k % 100 == 0) {
|
||||
$job->saveField('progress', 50 + round((50 * ((($k + 1) * 100) / count($data)))));
|
||||
if ($k % 100 == 0) {
|
||||
$this->jobProgress($jobId, null, 50 + round((50 * ((($k + 1) * 100) / count($data)))));
|
||||
}
|
||||
}
|
||||
if (!empty($data)) {
|
||||
|
@ -1022,19 +982,14 @@ class Feed extends AppModel
|
|||
$redis->del('misp:feed_cache:event_uuid_lookup:');
|
||||
}
|
||||
$feeds = $this->find('all', $params);
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->id = $jobId;
|
||||
if (!$job->exists()) {
|
||||
$jobId = false;
|
||||
}
|
||||
}
|
||||
foreach ($feeds as $k => $feed) {
|
||||
$this->__cacheFeed($feed, $redis, $jobId);
|
||||
if ($jobId) {
|
||||
$job->saveField('progress', 100 * $k / count($feeds));
|
||||
$job->saveField('message', 'Feed ' . $feed['Feed']['id'] . ' cached.');
|
||||
if ($this->__cacheFeed($feed, $redis, $jobId)) {
|
||||
$message = 'Feed ' . $feed['Feed']['id'] . ' cached.';
|
||||
} else {
|
||||
$message = 'Failed to cache feed ' . $feed['Feed']['id'] . '. See logs for more details.';
|
||||
}
|
||||
|
||||
$this->jobProgress($jobId, $message, 100 * $k / count($feeds));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -1063,29 +1018,19 @@ class Feed extends AppModel
|
|||
|
||||
private function __cacheFreetextFeed($feed, $redis, $HttpSocket, $jobId = false)
|
||||
{
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->id = $jobId;
|
||||
if (!$job->exists()) {
|
||||
$jobId = false;
|
||||
}
|
||||
}
|
||||
try {
|
||||
$values = $this->getFreetextFeed($feed, $HttpSocket, $feed['Feed']['source_format'], 'all');
|
||||
} catch (Exception $e) {
|
||||
CakeLog::error($e->getMessage()); // TODO: Better exception logging
|
||||
if ($jobId) {
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Could not fetch freetext feed. See log for more details.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Could not fetch freetext feed. See log for more details.');
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach ($values as $k => $value) {
|
||||
$redis->sAdd('misp:feed_cache:' . $feed['Feed']['id'], md5($value['value']));
|
||||
$redis->sAdd('misp:feed_cache:combined', md5($value['value']));
|
||||
if ($jobId && ($k % 1000 == 0)) {
|
||||
$job->saveField('message', 'Feed ' . $feed['Feed']['id'] . ': ' . $k . ' values cached.');
|
||||
if ($k % 1000 == 0) {
|
||||
$this->jobProgress($jobId, 'Feed ' . $feed['Feed']['id'] . ': ' . $k . ' values cached.');
|
||||
}
|
||||
}
|
||||
$redis->set('misp:feed_cache_timestamp:' . $feed['Feed']['id'], time());
|
||||
|
@ -1105,10 +1050,6 @@ class Feed extends AppModel
|
|||
$redis->del('misp:feed_cache:' . $feed['Feed']['id']);
|
||||
|
||||
$k = 0;
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->id = $jobId;
|
||||
}
|
||||
foreach ($manifest as $uuid => $event) {
|
||||
try {
|
||||
$event = $this->downloadAndParseEventFromFeed($feed, $uuid, $HttpSocket);
|
||||
|
@ -1140,8 +1081,8 @@ class Feed extends AppModel
|
|||
}
|
||||
|
||||
$k++;
|
||||
if ($jobId && ($k % 10 == 0)) {
|
||||
$job->saveField('message', 'Feed ' . $feed['Feed']['id'] . ': ' . $k . ' events cached.');
|
||||
if ($k % 10 == 0) {
|
||||
$this->jobProgress($jobId, 'Feed ' . $feed['Feed']['id'] . ': ' . $k . ' events cached.');
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
@ -1164,28 +1105,20 @@ class Feed extends AppModel
|
|||
$redis->sAdd('misp:feed_cache:event_uuid_lookup:' . $v[0], $feedId . '/' . $v[1]);
|
||||
}
|
||||
$pipe->exec();
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->id = $jobId;
|
||||
$job->saveField('message', 'Feed ' . $feedId . ': cached via quick cache.');
|
||||
}
|
||||
$this->jobProgress($jobId, 'Feed ' . $feedId . ': cached via quick cache.');
|
||||
return true;
|
||||
}
|
||||
|
||||
private function __cacheMISPFeed($feed, $redis, $HttpSocket, $jobId = false)
|
||||
{
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
$job->id = $jobId;
|
||||
if (!$job->exists()) {
|
||||
$jobId = false;
|
||||
}
|
||||
}
|
||||
$result = true;
|
||||
if (!$this->__cacheMISPFeedCache($feed, $redis, $HttpSocket, $jobId)) {
|
||||
$this->__cacheMISPFeedTraditional($feed, $redis, $HttpSocket, $jobId);
|
||||
$result = $this->__cacheMISPFeedTraditional($feed, $redis, $HttpSocket, $jobId);
|
||||
};
|
||||
$redis->set('misp:feed_cache_timestamp:' . $feed['Feed']['id'], time());
|
||||
return true;
|
||||
if ($result) {
|
||||
$redis->set('misp:feed_cache_timestamp:' . $feed['Feed']['id'], time());
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function compareFeeds($id = false)
|
||||
|
@ -1622,4 +1555,29 @@ class Feed extends AppModel
|
|||
{
|
||||
return isset($feed['Feed']['input_source']) && $feed['Feed']['input_source'] === 'local';
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $jobId
|
||||
* @param string|null $message
|
||||
* @param int|null $progress
|
||||
*/
|
||||
private function jobProgress($jobId = null, $message = null, $progress = null)
|
||||
{
|
||||
if ($jobId) {
|
||||
$job = ClassRegistry::init('Job');
|
||||
|
||||
$jobData = array($job->primaryKey => $jobId);
|
||||
if ($message) {
|
||||
$jobData['message'] = $message;
|
||||
}
|
||||
if ($progress) {
|
||||
$jobData['progress'] = $progress;
|
||||
}
|
||||
try {
|
||||
$job->save($jobData);
|
||||
} catch (Exception $e) {
|
||||
// ignore error during saving information about job
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue