chg: [internal] Simplify fetching Kafka topic

pull/7824/head
Jakub Onderka 2021-10-10 18:41:44 +02:00
parent a33bf23342
commit d9e89955bc
9 changed files with 61 additions and 59 deletions

View File

@ -2577,9 +2577,10 @@ class AppModel extends Model
return true;
}
public function publishKafkaNotification($topicName, $data, $action = false) {
$kafkaTopic = Configure::read('Plugin.Kafka_' . $topicName . '_notifications_topic');
if (Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_' . $topicName . '_notifications_enable') && !empty($kafkaTopic)) {
public function publishKafkaNotification($topicName, $data, $action = false)
{
$kafkaTopic = $this->kafkaTopic($topicName);
if ($kafkaTopic) {
$this->getKafkaPubTool()->publishJson($kafkaTopic, $data, $action);
}
}
@ -3275,4 +3276,23 @@ class AppModel extends Model
}
return $this->Log;
}
/**
* @param string $name
* @return string|null Null when Kafka is not enabled, topic is not enabled or topic is not defined
*/
protected function kafkaTopic($name)
{
static $kafkaEnabled;
if ($kafkaEnabled === null) {
$kafkaEnabled = (bool)Configure::read('Plugin.Kafka_enable');
}
if ($kafkaEnabled) {
if (!Configure::read("Plugin.Kafka_{$name}_notifications_enable")) {
return null;
}
return Configure::read("Plugin.Kafka_{$name}_notifications_topic") ?: null;
}
return null;
}
}

View File

@ -454,9 +454,8 @@ class Attribute extends AppModel
$result = $this->saveAttachment($this->data['Attribute']);
}
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_attribute_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_attribute_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_attribute_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('attribute');
if ($pubToZmq || $kafkaTopic) {
$attribute = $this->fetchAttribute($this->id);
if (!empty($attribute)) {
$user = array(
@ -481,7 +480,7 @@ class Attribute extends AppModel
$pubSubTool->attribute_save($attribute, $action);
unset($attribute['Attribute']['data']);
}
if ($pubToKafka) {
if ($kafkaTopic) {
if (Configure::read('Plugin.Kafka_include_attachments') && $this->typeIsAttachment($attribute['Attribute']['type'])) {
$attribute['Attribute']['data'] = $this->base64EncodeAttachment($attribute['Attribute']);
}
@ -513,8 +512,8 @@ class Attribute extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->attribute_save($this->data, 'delete');
}
$kafkaTopic = Configure::read('Plugin.Kafka_attribute_notifications_topic');
if (Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_attribute_notifications_enable') && !empty($kafkaTopic)) {
$kafkaTopic = $this->kafkaTopic('attribute');
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $this->data, 'delete');
}

View File

@ -35,11 +35,9 @@ class AttributeTag extends AppModel
public function afterSave($created, $options = array())
{
parent::afterSave($created, $options);
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_tag_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_tag_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_tag_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('tag');
if ($pubToZmq || $kafkaTopic) {
$tag = $this->find('first', array(
'recursive' => -1,
'conditions' => array('AttributeTag.id' => $this->id),
@ -52,7 +50,7 @@ class AttributeTag extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->tag_save($tag, 'attached to attribute');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $tag, 'attached to attribute');
}
@ -62,9 +60,8 @@ class AttributeTag extends AppModel
public function beforeDelete($cascade = true)
{
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_tag_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_tag_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_tag_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('tag');
if ($pubToZmq || $kafkaTopic) {
if (!empty($this->id)) {
$tag = $this->find('first', array(
'recursive' => -1,
@ -78,7 +75,7 @@ class AttributeTag extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->tag_save($tag, 'detached from attribute');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $tag, 'detached from attribute');
}

View File

@ -29,9 +29,8 @@ class EventTag extends AppModel
{
parent::afterSave($created, $options);
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_tag_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_tag_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_tag_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('tag');
if ($pubToZmq || $kafkaTopic) {
$tag = $this->find('first', array(
'recursive' => -1,
'conditions' => array('EventTag.id' => $this->id),
@ -43,7 +42,7 @@ class EventTag extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->tag_save($tag, 'attached to event');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $tag, 'attached to event');
}
@ -53,9 +52,8 @@ class EventTag extends AppModel
public function beforeDelete($cascade = true)
{
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_tag_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_tag_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_tag_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('tag');
if ($pubToZmq || $kafkaTopic) {
if (!empty($this->id)) {
$tag = $this->find('first', array(
'recursive' => -1,
@ -68,7 +66,7 @@ class EventTag extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->tag_save($tag, 'detached from event');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $tag, 'detached from event');
}

View File

@ -300,11 +300,8 @@ class MispObject extends AppModel
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') &&
Configure::read('Plugin.ZeroMQ_object_notifications_enable') &&
empty($this->data['Object']['skip_zmq']);
$kafkaTopic = Configure::read('Plugin.Kafka_object_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') &&
Configure::read('Plugin.Kafka_object_notifications_enable') &&
!empty($kafkaTopic) &&
empty($this->data['Object']['skip_kafka']);
$kafkaTopic = $this->kafkaTopic('object');
$pubToKafka = $kafkaTopic && empty($this->data['Object']['skip_kafka']);
if ($pubToZmq || $pubToKafka) {
$object = $this->find('first', array(
'conditions' => array('Object.id' => $this->id),

View File

@ -45,8 +45,7 @@ class ObjectReference extends AppModel
$this->data['ObjectReference']['uuid'] = CakeText::uuid();
}
if (empty($this->data['ObjectReference']['timestamp'])) {
$date = new DateTime();
$this->data['ObjectReference']['timestamp'] = $date->getTimestamp();
$this->data['ObjectReference']['timestamp'] = time();
}
if (!isset($this->data['ObjectReference']['comment'])) {
$this->data['ObjectReference']['comment'] = '';
@ -57,9 +56,8 @@ class ObjectReference extends AppModel
public function afterSave($created, $options = array())
{
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_object_reference_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_object_reference_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_object_reference_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('object_reference');
if ($pubToZmq || $kafkaTopic) {
$object_reference = $this->find('first', array(
'conditions' => array('ObjectReference.id' => $this->id),
'recursive' => -1
@ -72,7 +70,7 @@ class ObjectReference extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->object_reference_save($object_reference, $action);
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $object_reference, $action);
}

View File

@ -77,11 +77,9 @@ class Sighting extends AppModel
public function afterSave($created, $options = array())
{
parent::afterSave($created, $options = array());
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_sighting_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_sighting_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_sighting_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('sighting');
if ($pubToZmq || $kafkaTopic) {
$user = array(
'org_id' => -1,
'Role' => array(
@ -93,7 +91,7 @@ class Sighting extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->sighting_save($sighting, 'add');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $sighting, 'add');
}
@ -105,9 +103,8 @@ class Sighting extends AppModel
{
parent::beforeDelete();
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_sighting_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_sighting_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_sighting_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('sighting');
if ($pubToZmq || $kafkaTopic) {
$user = array(
'org_id' => -1,
'Role' => array(
@ -119,7 +116,7 @@ class Sighting extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->sighting_save($sighting, 'delete');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $sighting, 'delete');
}

View File

@ -106,11 +106,9 @@ class Tag extends AppModel
public function afterSave($created, $options = array())
{
parent::afterSave($created, $options);
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_tag_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_tag_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_tag_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('tag');
if ($pubToZmq || $kafkaTopic) {
$tag = $this->find('first', array(
'recursive' => -1,
'conditions' => array('Tag.id' => $this->id)
@ -120,7 +118,7 @@ class Tag extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->tag_save($tag, $action);
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $tag, $action);
}
@ -130,9 +128,8 @@ class Tag extends AppModel
public function beforeDelete($cascade = true)
{
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_tag_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_tag_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_tag_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('tag');
if ($pubToZmq || $kafkaTopic) {
if (!empty($this->id)) {
$tag = $this->find('first', array(
'recursive' => -1,
@ -142,7 +139,7 @@ class Tag extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->tag_save($tag, 'delete');
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $tag, 'delete');
}

View File

@ -279,9 +279,8 @@ class User extends AppModel
public function afterSave($created, $options = array())
{
$pubToZmq = Configure::read('Plugin.ZeroMQ_enable') && Configure::read('Plugin.ZeroMQ_user_notifications_enable');
$kafkaTopic = Configure::read('Plugin.Kafka_user_notifications_topic');
$pubToKafka = Configure::read('Plugin.Kafka_enable') && Configure::read('Plugin.Kafka_user_notifications_enable') && !empty($kafkaTopic);
if ($pubToZmq || $pubToKafka) {
$kafkaTopic = $this->kafkaTopic('user');
if ($pubToZmq || $kafkaTopic) {
if (!empty($this->data)) {
$user = $this->data;
if (!isset($user['User'])) {
@ -311,7 +310,7 @@ class User extends AppModel
$pubSubTool = $this->getPubSubTool();
$pubSubTool->modified($user, 'user', $action);
}
if ($pubToKafka) {
if ($kafkaTopic) {
$kafkaPubTool = $this->getKafkaPubTool();
$kafkaPubTool->publishJson($kafkaTopic, $user, $action);
}