new: [sync] When saving sightings, push just new sightings

pull/6817/head
Jakub Onderka 2021-03-11 11:48:36 +01:00
parent d82a95b903
commit 683e52702d
5 changed files with 128 additions and 86 deletions

View File

@ -322,24 +322,36 @@ class EventShell extends AppShell
public function publish_sightings()
{
$this->ConfigLoad->execute();
$id = $this->args[0];
$passAlong = $this->args[1];
$jobId = $this->args[2];
$userId = $this->args[3];
list($id, $passAlong, $jobId, $userId) = $this->args;
$user = $this->getUser($userId);
$job = $this->Job->read(null, $jobId);
$this->Event->Behaviors->unload('SysLogLogable.SysLogLogable');
$result = $this->Event->publish_sightings($id, $passAlong);
$job['Job']['progress'] = 100;
$job['Job']['date_modified'] = date("Y-m-d H:i:s");
if ($result) {
$job['Job']['message'] = 'Sightings published.';
} else {
$job['Job']['message'] = 'Sightings published, but the upload to other instances may have failed.';
$sightingsUuidsToPush = [];
if (isset($this->args[4])) { // push just specific sightings
$path = APP . 'tmp/cache/ingest' . DS . $this->args[4];
$tempFile = new File($path);
$inputData = $tempFile->read();
if ($inputData === false) {
$this->error("File `$path` not found.");
}
$sightingsUuidsToPush = $this->Event->jsonDecode($inputData);
$tempFile->delete();
}
$this->Job->save($job);
$this->Event->Behaviors->unload('SysLogLogable.SysLogLogable');
$result = $this->Event->publish_sightings($id, $passAlong, $sightingsUuidsToPush);
$count = count($sightingsUuidsToPush);
$message = $count === 0 ? "All sightings published" : "$count sightings published";
if ($result) {
$message .= '.';
} else {
$message .= ', but the upload to other instances may have failed.';
}
$this->Job->saveStatus($jobId, true, $message);
$log = ClassRegistry::init('Log');
$log->createLogEntry($user, 'publish_sightings', 'Event', $id, 'Sightings for event (' . $id . '): published.', 'publish_sightings updated');
$title = $count === 0 ? "All sightings for event published." : "$count sightings for event published.";
$log->createLogEntry($user, 'publish_sightings', 'Event', $id, $title, 'publish_sightings updated');
}
public function publish_galaxy_clusters()

View File

@ -2579,7 +2579,7 @@ class EventsController extends AppController
throw new NotFoundException(__('Invalid event'));
}
if ($this->request->is('post') || $this->request->is('put')) {
$result = $this->Event->publishRouter($event['Event']['id'], null, $this->Auth->user(), 'sightings');
$result = $this->Event->publishSightingsRouter($event['Event']['id'], $this->Auth->user());
if (!Configure::read('MISP.background_jobs')) {
if (!is_array($result)) {
// redirect to the view event page

View File

@ -4317,10 +4317,11 @@ class Event extends AppModel
* New variant of uploadEventToServersRouter (since 2.4.137) for pushing sightings.
* @param array $event with event tags and whole sharing group
* @param null|int $passAlong Server ID that should be skipped from uploading.
* @param array $sightingsUuidsToPush
* @return array|bool
* @throws Exception
*/
private function uploadEventSightingsToServersRouter(array $event, $passAlong = null)
private function uploadEventSightingsToServersRouter(array $event, $passAlong, array $sightingsUuidsToPush)
{
$this->Server = ClassRegistry::init('Server');
$conditions = ['Server.push_sightings' => 1];
@ -4347,7 +4348,7 @@ class Event extends AppModel
$failedServers = [];
foreach ($servers as $server) {
try {
$this->pushSightingsToServer($server, $event);
$this->pushSightingsToServer($server, $event, $sightingsUuidsToPush);
} catch (Exception $e) {
$this->logException("Uploading sightings to server {$server['Server']['id']} failed.", $e);
$failedServers[] = $server['Server']['url'];
@ -4362,10 +4363,12 @@ class Event extends AppModel
/**
* @param array $server
* @param array $event
* @param array $sightingsUuidsToPush
* @throws HttpClientJsonException
* @throws JsonException
* @throws Exception
*/
private function pushSightingsToServer(array $server, array $event)
private function pushSightingsToServer(array $server, array $event, array $sightingsUuidsToPush = [])
{
App::uses('ServerSyncTool', 'Tools');
if (!isset($this->Sighting)) {
@ -4374,7 +4377,7 @@ class Event extends AppModel
$serverSync = new ServerSyncTool($server, $this->setupSyncRequest($server));
try {
if ($serverSync->eventExists($event) === false) {
return;
return; // skip if event not exists on remote server
}
} catch (Exception $e) {}
@ -4385,18 +4388,17 @@ class Event extends AppModel
],
];
foreach ($this->Sighting->fetchUuidsForEventToPush($event, $fakeSyncUser) as $sightingsUuids) {
// Process sightings in batch to keep memory requirements low
foreach ($this->Sighting->fetchUuidsForEventToPush($event, $fakeSyncUser, $sightingsUuidsToPush) as $batch) {
// Filter out sightings that already exists on remote server
$existingSightings = $serverSync->filterSightingUuidsForPush($event, $sightingsUuids);
$sightingToPush = array_diff($sightingsUuids, $existingSightings);
if (empty($sightingToPush)) {
$existingSightings = $serverSync->filterSightingUuidsForPush($event, $batch);
$newSightings = array_diff($batch, $existingSightings);
if (empty($newSightings)) {
continue;
}
$query = [
'conditions' => ['Sighting.uuid' => $sightingToPush],
];
$sightings = $this->Sighting->attachToEvent($event, $fakeSyncUser, null, $query, true);
$conditions = ['Sighting.uuid' => $newSightings];
$sightings = $this->Sighting->attachToEvent($event, $fakeSyncUser, null, $conditions, true);
$serverSync->uploadSightings($sightings, $event['Event']['uuid']);
}
}
@ -4516,48 +4518,81 @@ class Event extends AppModel
}
}
public function publishRouter($id, $passAlong = null, $user, $scope = 'events')
/**
* @param int $id Event ID
* @param array $user
* @param int|null $passAlong Server ID that should be skipped when pushing sightings.
* @param array $sightingUuids Push just sightings with these UUIDs
* @return array|bool
* @throws Exception
*/
public function publishSightingsRouter($id, array $user, $passAlong = null, array $sightingUuids = [])
{
if (Configure::read('MISP.background_jobs')) {
$job_type = 'publish_' . $scope;
$function = 'publish';
$message = 'Publishing.';
if ($scope === 'sightings') {
$message = 'Publishing sightings.';
$function = 'publish_sightings';
}
$job = ClassRegistry::init('Job');
$message = empty($sightingUuids) ? __('Publishing sightings.') : __('Publishing %s sightings.', count($sightingUuids));
$job->create();
$data = array(
$job->save([
'worker' => 'prio',
'job_type' => 'publish_event',
'job_input' => 'Event ID: ' . $id,
'status' => 0,
'retries' => 0,
'org_id' => $user['org_id'],
'org' => $user['Organisation']['name'],
'message' => $message
'message' => $message,
]);
$command = ['publish_sightings', $id, $passAlong, $job->id, $user['id']];
if (!empty($sightingUuids)) {
$randomFileName = $this->generateRandomFileName() . '.json';
App::uses('File', 'Utility');
$tempFile = new File(APP . 'tmp/cache/ingest' . DS . $randomFileName, true, 0644);
$writeResult = $tempFile->write(json_encode($sightingUuids));
if (!$writeResult) {
throw new Exception("Could not write file content");
}
$command[] = $randomFileName;
}
$processId = CakeResque::enqueue('prio', 'EventShell', $command, true);
$job->saveField('process_id', $processId);
return $processId;
}
return $this->publish_sightings($id, $passAlong, $sightingUuids);
}
public function publishRouter($id, $passAlong = null, $user)
{
if (Configure::read('MISP.background_jobs')) {
$job = ClassRegistry::init('Job');
$job->create();
$data = array(
'worker' => 'prio',
'job_type' => 'publish_event',
'job_input' => 'Event ID: ' . $id,
'status' => 0,
'retries' => 0,
'org_id' => $user['org_id'],
'org' => $user['Organisation']['name'],
'message' => 'Publishing.'
);
$job->save($data);
$jobId = $job->id;
$process_id = CakeResque::enqueue(
'prio',
'EventShell',
array($function, $id, $passAlong, $jobId, $user['id']),
array('publish', $id, $passAlong, $jobId, $user['id']),
true
);
$job->saveField('process_id', $process_id);
return $process_id;
} elseif ($scope === 'sightings') {
$result = $this->publish_sightings($id, $passAlong);
return $result;
} else {
$result = $this->publish($id, $passAlong);
return $result;
}
}
public function publish_sightings($id, $passAlong = null, $jobId = null)
public function publish_sightings($id, $passAlong = null, array $sightingsUuidsToPush = [])
{
if (is_numeric($id)) {
$condition = array('Event.id' => $id);
@ -4572,18 +4607,16 @@ class Event extends AppModel
if (empty($event)) {
return false;
}
if ($jobId) {
$this->Behaviors->unload('SysLogLogable.SysLogLogable');
} else {
// update the DB to set the sightings timestamp
// for background jobs, this should be done already
$fieldList = array('id', 'info', 'sighting_timestamp');
$event['Event']['sighting_timestamp'] = time();
$event['Event']['skip_zmq'] = 1;
$event['Event']['skip_kafka'] = 1;
$this->save($event, array('fieldList' => $fieldList));
}
return $this->uploadEventSightingsToServersRouter($event, $passAlong);
// update the DB to set the sightings timestamp
// for background jobs, this should be done already
$fieldList = array('id', 'info', 'sighting_timestamp');
$event['Event']['sighting_timestamp'] = time();
$event['Event']['skip_zmq'] = 1;
$event['Event']['skip_kafka'] = 1;
$this->save($event, array('fieldList' => $fieldList));
return $this->uploadEventSightingsToServersRouter($event, $passAlong, $sightingsUuidsToPush);
}
// Performs all the actions required to publish an event

View File

@ -3,19 +3,19 @@ App::uses('AppModel', 'Model');
class Job extends AppModel
{
const STATUS_WAITING = 1;
const STATUS_RUNNING = 2;
const STATUS_FAILED = 3;
const STATUS_COMPLETED = 4;
const STATUS_WAITING = 1,
STATUS_RUNNING = 2,
STATUS_FAILED = 3,
STATUS_COMPLETED = 4;
public $belongsTo = array(
'Org' => array(
'className' => 'Organisation',
'foreignKey' => 'org_id',
'order' => array(),
'fields' => array('id', 'name', 'uuid')
),
);
'Org' => array(
'className' => 'Organisation',
'foreignKey' => 'org_id',
'order' => array(),
'fields' => array('id', 'name', 'uuid')
),
);
public function beforeValidate($options = array())
{

View File

@ -550,15 +550,19 @@ class Sighting extends AppModel
/**
* @param array $event
* @param array $user
* @param array $sightingsUuidsToPush
* @return Generator<array>
*/
public function fetchUuidsForEventToPush(array $event, array $user)
public function fetchUuidsForEventToPush(array $event, array $user, array $sightingsUuidsToPush = [])
{
$conditions = $this->createConditions($user, $event);
if ($conditions === false) {
return null;
}
$conditions['Sighting.event_id'] = $event['Event']['id'];
if (!empty($sightingsUuidsToPush)) {
$conditions['Sighting.uuid'] = $sightingsUuidsToPush;
}
while (true) {
$uuids = $this->find('column', [
@ -583,11 +587,11 @@ class Sighting extends AppModel
* @param array $event Just 'Event' object is enough
* @param array $user
* @param array|int|null $attribute Attribute model or attribute ID
* @param array|bool $extraQuery
* @param array|bool $extraConditions
* @param bool $forSync
* @return array|int
*/
public function attachToEvent(array $event, array $user, $attribute = null, $extraQuery = false, $forSync = false)
public function attachToEvent(array $event, array $user, $attribute = null, $extraConditions = false, $forSync = false)
{
$conditions = $this->createConditions($user, $event);
if ($conditions === false) {
@ -606,21 +610,13 @@ class Sighting extends AppModel
]);
}
$query = [
if ($extraConditions !== false) {
$conditions['AND'] = $extraConditions;
}
$sightings = $this->find('all', array(
'conditions' => $conditions,
'recursive' => -1,
];
if (isset($extraQuery['conditions'])) {
$query['conditions']['AND'] = $extraQuery['conditions'];
}
if (isset($extraQuery['limit'])) {
$query['limit'] = $extraQuery['limit'];
}
if (isset($extraQuery['order'])) {
$query['order'] = $extraQuery['order'];
}
$sightings = $this->find('all', $query);
));
if (empty($sightings)) {
return [];
}
@ -725,7 +721,7 @@ class Sighting extends AppModel
}
++$sightingsAdded;
if ($publish) {
$this->Event->publishRouter($sighting['event_id'], null, $user, 'sightings');
$this->Event->publishSightingsRouter($sighting['event_id'], $user);
}
}
return $sightingsAdded;
@ -1097,7 +1093,8 @@ class Sighting extends AppModel
}
if ($this->saveMany($toSave)) {
$this->Event->publishRouter($event['Event']['id'], $passAlong, $user, 'sightings');
$existingUuids = array_column($toSave, 'uuid');
$this->Event->publishSightingsRouter($event['Event']['id'], $user, $passAlong, $existingUuids);
return count($toSave);
} else {
return 0;