Merge pull request #5876 from JakubOnderka/pubsub

chg: [pubsub] Refactored PubSub tool
pull/5906/head
Andras Iklody 2020-05-14 10:24:47 +02:00 committed by GitHub
commit d8a5ee76dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 327 additions and 179 deletions

View File

@ -1624,6 +1624,7 @@ class ServersController extends AppController
$result = $pubSubTool->statusCheck(); $result = $pubSubTool->statusCheck();
if (!empty($result)) { if (!empty($result)) {
$this->set('events', $result['publishCount']); $this->set('events', $result['publishCount']);
$this->set('messages', $result['messageCount']);
$this->set('time', date('Y/m/d H:i:s', $result['timestamp'])); $this->set('time', date('Y/m/d H:i:s', $result['timestamp']));
$this->set('time2', date('Y/m/d H:i:s', $result['timestampSettings'])); $this->set('time2', date('Y/m/d H:i:s', $result['timestampSettings']));
} }

View File

@ -1,66 +1,53 @@
<?php <?php
class PubSubTool class PubSubTool
{ {
private $__redis = false; const SCRIPTS_TMP = APP . 'files' . DS . 'scripts' . DS . 'tmp' . DS;
private $__settings = false; const OLD_PID_LOCATION = APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.pid';
private function __getSetSettings() /**
{ * @var Redis
$settings = array( */
'redis_host' => 'localhost', private $redis;
'redis_port' => '6379',
'redis_password' => '',
'redis_database' => '1',
'redis_namespace' => 'mispq',
'port' => '50000',
);
foreach ($settings as $key => $setting) { /**
$temp = Configure::read('Plugin.ZeroMQ_' . $key); * @var array
if ($temp) { */
$settings[$key] = $temp; private $settings;
}
}
$settingsFile = new File(APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'settings.json', true, 0644);
$settingsFile->write(json_encode($settings, true));
$settingsFile->close();
return $settings;
}
public function initTool() public function initTool()
{ {
if (!$this->__redis) { if (!$this->redis) {
$settings = $this->__setupPubServer(); $settings = $this->getSetSettings();
$redis = new Redis(); $this->setupPubServer($settings);
$redis->connect($settings['redis_host'], $settings['redis_port']); $this->redis = $this->createRedisConnection($settings);
$redis_pwd = $settings['redis_password']; $this->settings = $settings;
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$this->__redis = $redis;
$this->__settings = $settings;
} else {
$settings = $this->__settings;
} }
return $settings;
} }
// read the pid file, if it exists, check if the process is actually running /**
// if either the pid file doesn't exists or the process is not running return false * Read the pid file, if it exists, check if the process is actually running
// otherwise return the pid * if either the pid file doesn't exists or the process is not running return false
public function checkIfRunning() * otherwise return the pid.
*
* @param string|null $pidFilePath
* @return bool|int False when process is not running, PID otherwise.
* @throws Exception
*/
public function checkIfRunning($pidFilePath = null)
{ {
$pidFile = new File(APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.pid'); $pidFile = new File($pidFilePath ?: self::SCRIPTS_TMP . 'mispzmq.pid');
$pid = $pidFile->read(true, 'r'); if (!$pidFile->exists()) {
return false;
}
$pid = $pidFile->read();
if ($pid === false || $pid === '') { if ($pid === false || $pid === '') {
return false; return false;
} }
if (!is_numeric($pid)) { if (!is_numeric($pid)) {
throw new Exception('Internal error (invalid PID file for the MISP zmq script)'); throw new Exception('Internal error (invalid PID file for the MISP zmq script)');
} }
$result = trim(shell_exec('ps aux | awk \'{print $2}\' | grep "^' . $pid . '$"')); $result = file_exists("/proc/$pid");
if (empty($result)) { if ($result === false) {
return false; return false;
} }
return $pid; return $pid;
@ -68,18 +55,14 @@ class PubSubTool
public function statusCheck() public function statusCheck()
{ {
$redis = new Redis(); $settings = $this->getSetSettings();
$settings = $this->__getSetSettings(); $redis = $this->createRedisConnection($settings);
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$redis->rPush($settings['redis_namespace'] . ':command', 'status'); $redis->rPush($settings['redis_namespace'] . ':command', 'status');
sleep(1); $response = $redis->blPop($settings['redis_namespace'] . ':status', 5);
$response = trim($redis->lPop($settings['redis_namespace'] . ':status')); if ($response === null) {
return json_decode($response, true); throw new Exception("No response from status command returned after 5 seconds.");
}
return json_decode(trim($response[1]), true);
} }
public function checkIfPythonLibInstalled() public function checkIfPythonLibInstalled()
@ -92,23 +75,12 @@ class PubSubTool
return false; return false;
} }
private function __setupPubServer()
{
App::uses('File', 'Utility');
$my_server = ClassRegistry::init('Server');
$settings = $this->__getSetSettings();
if ($this->checkIfRunning() === false) {
shell_exec($my_server->getPythonVersion() . ' ' . APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.py > ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.log 2> ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.error.log &');
}
return $settings;
}
public function publishEvent($event) public function publishEvent($event)
{ {
App::uses('JSONConverterTool', 'Tools'); App::uses('JSONConverterTool', 'Tools');
$jsonTool = new JSONConverterTool(); $jsonTool = new JSONConverterTool();
$json = $jsonTool->convert($event); $json = $jsonTool->convert($event);
return $this->__pushToRedis(':data:misp_json', $json); return $this->pushToRedis(':data:misp_json', $json);
} }
public function event_save($event, $action) public function event_save($event, $action)
@ -116,7 +88,7 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$event['action'] = $action; $event['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_event', json_encode($event, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_event', json_encode($event, JSON_PRETTY_PRINT));
} }
public function object_save($object, $action) public function object_save($object, $action)
@ -124,7 +96,7 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$object['action'] = $action; $object['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_object', json_encode($object, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_object', json_encode($object, JSON_PRETTY_PRINT));
} }
public function object_reference_save($object_reference, $action) public function object_reference_save($object_reference, $action)
@ -132,18 +104,12 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$object_reference['action'] = $action; $object_reference['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_object_reference', json_encode($object_reference, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_object_reference', json_encode($object_reference, JSON_PRETTY_PRINT));
} }
public function publishConversation($message) public function publishConversation($message)
{ {
return $this->__pushToRedis(':data:misp_json_conversation', json_encode($message, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_conversation', json_encode($message, JSON_PRETTY_PRINT));
}
private function __pushToRedis($ns, $data)
{
$this->__redis->rPush($this->__settings['redis_namespace'] . $ns, $data);
return true;
} }
public function attribute_save($attribute, $action = false) public function attribute_save($attribute, $action = false)
@ -151,7 +117,7 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$attribute['action'] = $action; $attribute['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_attribute', json_encode($attribute, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_attribute', json_encode($attribute, JSON_PRETTY_PRINT));
} }
public function tag_save($tag, $action = false) public function tag_save($tag, $action = false)
@ -159,7 +125,7 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$tag['action'] = $action; $tag['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_tag', json_encode($tag, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_tag', json_encode($tag, JSON_PRETTY_PRINT));
} }
public function sighting_save($sighting, $action = false) public function sighting_save($sighting, $action = false)
@ -167,7 +133,7 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$sighting['action'] = $action; $sighting['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_sighting', json_encode($sighting, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_sighting', json_encode($sighting, JSON_PRETTY_PRINT));
} }
public function modified($data, $type, $action = false) public function modified($data, $type, $action = false)
@ -175,7 +141,7 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$data['action'] = $action; $data['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT));
} }
public function publish($data, $type, $action = false) public function publish($data, $type, $action = false)
@ -183,48 +149,39 @@ class PubSubTool
if (!empty($action)) { if (!empty($action)) {
$data['action'] = $action; $data['action'] = $action;
} }
return $this->__pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT)); return $this->pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT));
} }
public function killService($settings = false) public function killService()
{ {
$redis = new Redis();
if ($this->checkIfRunning()) { if ($this->checkIfRunning()) {
if ($settings == false) { $settings = $this->getSetSettings();
$settings = $this->__getSetSettings(); $redis = $this->createRedisConnection($settings);
}
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$redis->rPush($settings['redis_namespace'] . ':command', 'kill'); $redis->rPush($settings['redis_namespace'] . ':command', 'kill');
sleep(1); sleep(1);
if ($this->checkIfRunning()) { if ($this->checkIfRunning()) {
// Still running.
return false; return false;
} }
} }
return true; return true;
} }
// reload the server if it is running, if not, start it /**
* Reload the server if it is running, if not, start it.
*
* @return bool|string
* @throws Exception
*/
public function reloadServer() public function reloadServer()
{ {
if (!$this->checkIfRunning()) { $settings = $this->getSetSettings();
$settings = $this->__setupPubServer(); $this->saveSettingToFile($settings);
} else {
$settings = $this->__getSetSettings(); if ($this->checkIfRunning()) {
$redis = new Redis(); $redis = $this->createRedisConnection($settings);
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$redis->rPush($settings['redis_namespace'] . ':command', 'reload'); $redis->rPush($settings['redis_namespace'] . ':command', 'reload');
} } else {
if (!$this->checkIfRunning()) {
return 'Setting saved, but something is wrong with the ZeroMQ server. Please check the diagnostics page for more information.'; return 'Setting saved, but something is wrong with the ZeroMQ server. Please check the diagnostics page for more information.';
} }
return true; return true;
@ -237,10 +194,96 @@ class PubSubTool
return 'Could not kill the previous instance of the ZeroMQ script.'; return 'Could not kill the previous instance of the ZeroMQ script.';
} }
} }
$this->__setupPubServer(); $settings = $this->getSetSettings();
if (!is_numeric($this->checkIfRunning())) { $this->setupPubServer($settings);
if ($this->checkIfRunning() === false) {
return 'Failed starting the ZeroMQ script.'; return 'Failed starting the ZeroMQ script.';
} }
return true; return true;
} }
/**
* @param array $settings
* @throws Exception
*/
private function setupPubServer(array $settings)
{
if ($this->checkIfRunning() === false) {
if ($this->checkIfRunning(self::OLD_PID_LOCATION)) {
// Old version is running, kill it and start again new one.
$redis = $this->createRedisConnection($settings);
$redis->rPush($settings['redis_namespace'] . ':command', 'kill');
sleep(1);
}
$this->saveSettingToFile($settings);
$server = ClassRegistry::init('Server');
shell_exec($server->getPythonVersion() . ' ' . APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.py >> ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.log 2>> ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.error.log &');
}
}
private function pushToRedis($ns, $data)
{
$this->redis->rPush($this->settings['redis_namespace'] . $ns, $data);
return true;
}
/**
* @param array $settings
* @return Redis
*/
private function createRedisConnection(array $settings)
{
$redis = new Redis();
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redisPassword = $settings['redis_password'];
if (!empty($redisPassword)) {
$redis->auth($redisPassword);
}
$redis->select($settings['redis_database']);
return $redis;
}
/**
* @param array $settings
* @throws Exception
*/
private function saveSettingToFile(array $settings)
{
$settingFilePath = self::SCRIPTS_TMP . 'mispzmq_settings.json';
$settingsFile = new File($settingFilePath, true, 0644);
if (!$settingsFile->exists()) {
throw new Exception("Could not create zmq config file '$settingFilePath'.");
}
// Because setting file contains secrets, it should be readable just by owner. But because in Travis test,
// config file is created under one user and then changed under other user, file must be readable and writable
// also by group.
chmod($settingsFile->pwd(), 0660);
if (!$settingsFile->write(json_encode($settings))) {
throw new Exception("Could not write zmq config file '$settingFilePath'.");
}
$settingsFile->close();
}
private function getSetSettings()
{
$settings = array(
'redis_host' => 'localhost',
'redis_port' => '6379',
'redis_password' => '',
'redis_database' => '1',
'redis_namespace' => 'mispq',
'port' => '50000',
'username' => null,
'password' => null,
);
foreach ($settings as $key => $setting) {
$temp = Configure::read('Plugin.ZeroMQ_' . $key);
if ($temp) {
$settings[$key] = $temp;
}
}
return $settings;
}
} }

View File

@ -28,7 +28,10 @@ class AppModel extends Model
{ {
public $name; public $name;
public $loadedPubSubTool = false; /**
* @var PubSubTool
*/
private $loadedPubSubTool;
public $loadedKafkaPubTool = false; public $loadedKafkaPubTool = false;
@ -2364,20 +2367,14 @@ class AppModel extends Model
public function getPubSubTool() public function getPubSubTool()
{ {
if (!$this->loadedPubSubTool) { if (!$this->loadedPubSubTool) {
$this->loadPubSubTool(); App::uses('PubSubTool', 'Tools');
$pubSubTool = new PubSubTool();
$pubSubTool->initTool();
$this->loadedPubSubTool = $pubSubTool;
} }
return $this->loadedPubSubTool; return $this->loadedPubSubTool;
} }
public function loadPubSubTool()
{
App::uses('PubSubTool', 'Tools');
$pubSubTool = new PubSubTool();
$pubSubTool->initTool();
$this->loadedPubSubTool = $pubSubTool;
return true;
}
public function getElasticSearchTool() public function getElasticSearchTool()
{ {
if (!$this->elasticSearchClient) { if (!$this->elasticSearchClient) {

View File

@ -1806,6 +1806,24 @@ class Server extends AppModel
'type' => 'numeric', 'type' => 'numeric',
'afterHook' => 'zmqAfterHook', 'afterHook' => 'zmqAfterHook',
), ),
'ZeroMQ_username' => array(
'level' => 2,
'description' => __('The username that client need to use to connect to ZeroMQ.'),
'value' => '',
'errorMessage' => '',
'test' => 'testForEmpty',
'type' => 'string',
'afterHook' => 'zmqAfterHook',
),
'ZeroMQ_password' => array(
'level' => 2,
'description' => __('The password that client need to use to connect to ZeroMQ.'),
'value' => '',
'errorMessage' => '',
'test' => 'testForEmpty',
'type' => 'string',
'afterHook' => 'zmqAfterHook',
),
'ZeroMQ_redis_host' => array( 'ZeroMQ_redis_host' => array(
'level' => 2, 'level' => 2,
'description' => __('Location of the Redis db used by MISP and the Python PUB script to queue data to be published.'), 'description' => __('Location of the Redis db used by MISP and the Python PUB script to queue data to be published.'),

View File

@ -2,9 +2,10 @@
<legend><?php echo __('ZeroMQ Server Status');?></legend> <legend><?php echo __('ZeroMQ Server Status');?></legend>
<div style="padding-left:5px;padding-right:5px;padding-bottom:5px;"> <div style="padding-left:5px;padding-right:5px;padding-bottom:5px;">
<?php if (isset($time)): ?> <?php if (isset($time)): ?>
<p><b><?php echo __('Start time');?></b>: <?php echo h($time); ?><br /> <p><b><?php echo __('Reply time');?></b>: <?php echo h($time); ?><br>
<b><?php echo __('Settings read at');?></b>: <?php echo h($time2); ?><br /> <b><?php echo __('Start time');?></b>: <?php echo h($time2); ?><br>
<b><?php echo __('Events processed');?></b>: <?php echo h($events); ?></p> <b><?php echo __('Events processed');?></b>: <?php echo h($events); ?><br>
<b><?php echo __('Messages processed');?></b>: <?php echo h($messages); ?></p>
<?php else: ?> <?php else: ?>
<p><?php echo __('The ZeroMQ server is unreachable.');?></p> <p><?php echo __('The ZeroMQ server is unreachable.');?></p>
<?php endif; ?> <?php endif; ?>

View File

@ -1,15 +1,20 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import zmq import zmq
from zmq.auth.thread import ThreadAuthenticator
from zmq.utils.monitor import recv_monitor_message
import sys import sys
import redis import redis
import json import json
import os import os
import time import time
import threading
import logging
from pathlib import Path from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s")
def check_pid(pid): def check_pid(pid):
""" Check For the existence of a unix pid. """ """ Check For the existence of a unix pid. """
if not pid: if not pid:
@ -24,102 +29,185 @@ def check_pid(pid):
return True return True
class MISPZMQ: EVENT_MAP = {}
for name in dir(zmq):
if name.startswith("EVENT_"):
value = getattr(zmq, name)
EVENT_MAP[value] = name
def event_monitor(monitor, logger):
while monitor.poll():
evt = recv_monitor_message(monitor)
if evt["event"] == zmq.EVENT_MONITOR_STOPPED:
break
evt.update({"description": EVENT_MAP[evt["event"]]})
logger.debug("ZMQ event: {}".format(evt))
monitor.close()
class MispZmq:
message_count = 0
publish_count = 0
monitor_thread = None
auth = None
socket = None
pidfile = None
def __init__(self): def __init__(self):
self.current_location = Path(__file__).parent self._logger = logging.getLogger()
self.pidfile = self.current_location / "mispzmq.pid"
self.publishCount = 0 self.tmp_location = Path(__file__).parent.parent / "tmp"
self.pidfile = self.tmp_location / "mispzmq.pid"
if self.pidfile.exists(): if self.pidfile.exists():
with open(self.pidfile.as_posix()) as f: with open(self.pidfile.as_posix()) as f:
pid = f.read() pid = f.read()
if check_pid(pid): if check_pid(pid):
raise Exception('mispzmq already running on PID {}'.format(pid)) raise Exception("mispzmq already running on PID {}".format(pid))
else: else:
# Cleanup # Cleanup
self.pidfile.unlink() self.pidfile.unlink()
if (self.current_location / 'settings.json').exists(): if (self.tmp_location / "mispzmq_settings.json").exists():
self.setup() self._setup()
else: else:
raise Exception("The settings file is missing.") raise Exception("The settings file is missing.")
def setup(self): def _setup(self):
with open((self.current_location / 'settings.json').as_posix()) as settings_file: with open((self.tmp_location / "mispzmq_settings.json").as_posix()) as settings_file:
self.settings = json.load(settings_file) self.settings = json.load(settings_file)
self.namespace = self.settings["redis_namespace"] self.namespace = self.settings["redis_namespace"]
self.r = redis.StrictRedis(host=self.settings["redis_host"], db=self.settings["redis_database"], self.r = redis.StrictRedis(host=self.settings["redis_host"], db=self.settings["redis_database"],
password=self.settings["redis_password"], port=self.settings["redis_port"], password=self.settings["redis_password"], port=self.settings["redis_port"],
decode_responses=True) decode_responses=True)
self.timestampSettings = time.time() self.timestamp_settings = time.time()
self._logger.debug("Connected to Redis {}:{}/{}".format(self.settings["redis_host"], self.settings["redis_port"],
self.settings["redis_database"]))
def handleCommand(self, command): def _setup_zmq(self):
context = zmq.Context()
if "username" in self.settings and self.settings["username"]:
if "password" not in self.settings or not self.settings["password"]:
raise Exception("When username is set, password cannot be empty.")
self.auth = ThreadAuthenticator(context)
self.auth.start()
self.auth.configure_plain(domain="*", passwords={self.settings["username"]: self.settings["password"]})
else:
if self.auth:
self.auth.stop()
self.auth = None
self.socket = context.socket(zmq.PUB)
if self.settings["username"]:
self.socket.plain_server = True # must come before bind
self.socket.bind("tcp://*:{}".format(self.settings["port"]))
self._logger.debug("ZMQ listening on tcp://*:{}".format(self.settings["port"]))
if self._logger.isEnabledFor(logging.DEBUG):
monitor = self.socket.get_monitor_socket()
self.monitor_thread = threading.Thread(target=event_monitor, args=(monitor, self._logger))
self.monitor_thread.start()
else:
if self.monitor_thread:
self.socket.disable_monitor()
self.monitor_thread = None
def _handle_command(self, command):
if command == "kill": if command == "kill":
print("Kill command received, shutting down.") self._logger.info("Kill command received, shutting down.")
self.pidfile.unlink() self.clean()
sys.exit() sys.exit()
elif command == "reload": elif command == "reload":
print("Reload command received, reloading settings from file.") self._logger.info("Reload command received, reloading settings from file.")
self.setup() self._setup()
self._setup_zmq()
elif command == "status": elif command == "status":
print("Status command received, responding with latest stats.") self._logger.info("Status command received, responding with latest stats.")
self.r.delete("{}:status".format(self.namespace)) self.r.delete("{}:status".format(self.namespace))
self.r.lpush("{}:status".format(self.namespace), self.r.lpush("{}:status".format(self.namespace),
json.dumps({"timestamp": time.time(), json.dumps({"timestamp": time.time(),
"timestampSettings": self.timestampSettings, "timestampSettings": self.timestamp_settings,
"publishCount": self.publishCount})) "publishCount": self.publish_count,
"messageCount": self.message_count}))
else:
self._logger.warning("Received invalid command '{}'.".format(command))
def createPidFile(self): def _create_pid_file(self):
with open(self.pidfile.as_posix(), 'w') as f: with open(self.pidfile.as_posix(), "w") as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
def pubMessage(self, topic, data, socket): def _pub_message(self, topic, data):
socket.send_string("{} {}".format(topic, data)) self.socket.send_string("{} {}".format(topic, data))
if topic is 'misp_json':
self.publishCount += 1 def clean(self):
if self.monitor_thread:
self.socket.disable_monitor()
if self.auth:
self.auth.stop()
if self.socket:
self.socket.close()
if self.pidfile:
self.pidfile.unlink()
def main(self): def main(self):
start_time = int(time.time()) self._create_pid_file()
self.createPidFile() self._setup_zmq()
time.sleep(1)
status_array = [ status_array = [
"And when you're dead I will be still alive.", "And when you're dead I will be still alive.",
"And believe me I am still alive.", "And believe me I am still alive.",
"I'm doing science and I'm still alive.", "I'm doing science and I'm still alive.",
"I feel FANTASTIC and I'm still alive.", "I feel FANTASTIC and I'm still alive.",
"While you're dying I'll be still alive." "While you're dying I'll be still alive.",
] ]
topics = ["misp_json", "misp_json_event", "misp_json_attribute", "misp_json_sighting", topics = ["misp_json", "misp_json_event", "misp_json_attribute", "misp_json_sighting",
"misp_json_organisation", "misp_json_user", "misp_json_conversation", "misp_json_organisation", "misp_json_user", "misp_json_conversation",
"misp_json_object", "misp_json_object_reference", "misp_json_audit", "misp_json_object", "misp_json_object_reference", "misp_json_audit",
"misp_json_tag" "misp_json_tag",
] ]
context = zmq.Context()
socket = context.socket(zmq.PUB) lists = ["{}:command".format(self.namespace)]
socket.bind("tcp://*:{}".format(self.settings["port"])) for topic in topics:
time.sleep(1) lists.append("{}:data:{}".format(self.namespace, topic))
while True: while True:
command = self.r.lpop("{}:command".format(self.namespace)) data = self.r.blpop(lists, timeout=10)
if command is not None:
self.handleCommand(command) if data is None:
message_received = False # redis timeout expired
for topic in topics: current_time = int(time.time())
data = self.r.lpop("{}:data:{}".format(self.namespace, topic)) time_delta = current_time - int(self.timestamp_settings)
if data is not None: status_entry = int(time_delta / 10 % 5)
self.pubMessage(topic, data, socket)
message_received = True
if not message_received:
time.sleep(0.1)
current_time = 10 * time.time()
temp_start_time = 10 * start_time
time_delta = int(current_time - temp_start_time)
if time_delta % 100 == 0:
status_entry = int(time_delta / 100 % 5)
status_message = { status_message = {
'status': status_array[status_entry], "status": status_array[status_entry],
'uptime': int(time.time()) - start_time "uptime": current_time - int(self.timestamp_settings)
} }
self.pubMessage('misp_json_self', json.dumps(status_message), socket) self._pub_message("misp_json_self", json.dumps(status_message))
self._logger.debug("No message received for 10 seconds, sending ZMQ status message.")
else:
key, value = data
key = key.replace("{}:".format(self.namespace), "")
if key == "command":
self._handle_command(value)
elif key.startswith("data:"):
topic = key.split(":")[1]
self._logger.debug("Received data for topic '{}', sending to ZMQ.".format(topic))
self._pub_message(topic, value)
self.message_count += 1
if topic == "misp_json":
self.publish_count += 1
else:
self._logger.warning("Received invalid message '{}'.".format(key))
if __name__ == "__main__": if __name__ == "__main__":
mzq = MISPZMQ() mzq = MispZmq()
mzq.main() try:
mzq.main()
except KeyboardInterrupt:
mzq.clean()