chg: [pubsub] Refactored PubSub tool

pull/5876/head
Jakub Onderka 2020-05-07 15:50:54 +02:00
parent 0c1dba99d2
commit 1e07dfc9ae
4 changed files with 322 additions and 176 deletions

View File

@ -1,66 +1,53 @@
<?php
class PubSubTool
{
private $__redis = false;
private $__settings = false;
const SCRIPTS_TMP = APP . 'files' . DS . 'scripts' . DS . 'tmp' . DS;
const OLD_PID_LOCATION = APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.pid';
private function __getSetSettings()
{
$settings = array(
'redis_host' => 'localhost',
'redis_port' => '6379',
'redis_password' => '',
'redis_database' => '1',
'redis_namespace' => 'mispq',
'port' => '50000',
);
/**
* @var Redis
*/
private $redis;
foreach ($settings as $key => $setting) {
$temp = Configure::read('Plugin.ZeroMQ_' . $key);
if ($temp) {
$settings[$key] = $temp;
}
}
$settingsFile = new File(APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'settings.json', true, 0644);
$settingsFile->write(json_encode($settings, true));
$settingsFile->close();
return $settings;
}
/**
* @var array
*/
private $settings;
public function initTool()
{
if (!$this->__redis) {
$settings = $this->__setupPubServer();
$redis = new Redis();
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$this->__redis = $redis;
$this->__settings = $settings;
} else {
$settings = $this->__settings;
if (!$this->redis) {
$settings = $this->getSetSettings();
$this->setupPubServer($settings);
$this->redis = $this->createRedisConnection($settings);
$this->settings = $settings;
}
return $settings;
}
// read the pid file, if it exists, check if the process is actually running
// if either the pid file doesn't exists or the process is not running return false
// otherwise return the pid
public function checkIfRunning()
/**
* Read the pid file, if it exists, check if the process is actually running
* if either the pid file doesn't exists or the process is not running return false
* otherwise return the pid.
*
* @param string|null $pidFilePath
* @return bool|int False when process is not running, PID otherwise.
* @throws Exception
*/
public function checkIfRunning($pidFilePath = null)
{
$pidFile = new File(APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.pid');
$pid = $pidFile->read(true, 'r');
$pidFile = new File($pidFilePath ?: self::SCRIPTS_TMP . 'mispzmq.pid');
if (!$pidFile->exists()) {
return false;
}
$pid = $pidFile->read();
if ($pid === false || $pid === '') {
return false;
}
if (!is_numeric($pid)) {
throw new Exception('Internal error (invalid PID file for the MISP zmq script)');
}
$result = trim(shell_exec('ps aux | awk \'{print $2}\' | grep "^' . $pid . '$"'));
if (empty($result)) {
$result = file_exists("/proc/$pid");
if ($result === false) {
return false;
}
return $pid;
@ -68,18 +55,14 @@ class PubSubTool
public function statusCheck()
{
$redis = new Redis();
$settings = $this->__getSetSettings();
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$settings = $this->getSetSettings();
$redis = $this->createRedisConnection($settings);
$redis->rPush($settings['redis_namespace'] . ':command', 'status');
sleep(1);
$response = trim($redis->lPop($settings['redis_namespace'] . ':status'));
return json_decode($response, true);
$response = $redis->blPop($settings['redis_namespace'] . ':status', 5);
if ($response === null) {
throw new Exception("No response from status command returned after 5 seconds.");
}
return json_decode(trim($response[1]), true);
}
public function checkIfPythonLibInstalled()
@ -92,23 +75,12 @@ class PubSubTool
return false;
}
private function __setupPubServer()
{
App::uses('File', 'Utility');
$my_server = ClassRegistry::init('Server');
$settings = $this->__getSetSettings();
if ($this->checkIfRunning() === false) {
shell_exec($my_server->getPythonVersion() . ' ' . APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.py > ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.log 2> ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.error.log &');
}
return $settings;
}
public function publishEvent($event)
{
App::uses('JSONConverterTool', 'Tools');
$jsonTool = new JSONConverterTool();
$json = $jsonTool->convert($event);
return $this->__pushToRedis(':data:misp_json', $json);
return $this->pushToRedis(':data:misp_json', $json);
}
public function event_save($event, $action)
@ -116,7 +88,7 @@ class PubSubTool
if (!empty($action)) {
$event['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_event', json_encode($event, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_event', json_encode($event, JSON_PRETTY_PRINT));
}
public function object_save($object, $action)
@ -124,7 +96,7 @@ class PubSubTool
if (!empty($action)) {
$object['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_object', json_encode($object, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_object', json_encode($object, JSON_PRETTY_PRINT));
}
public function object_reference_save($object_reference, $action)
@ -132,18 +104,12 @@ class PubSubTool
if (!empty($action)) {
$object_reference['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_object_reference', json_encode($object_reference, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_object_reference', json_encode($object_reference, JSON_PRETTY_PRINT));
}
public function publishConversation($message)
{
return $this->__pushToRedis(':data:misp_json_conversation', json_encode($message, JSON_PRETTY_PRINT));
}
private function __pushToRedis($ns, $data)
{
$this->__redis->rPush($this->__settings['redis_namespace'] . $ns, $data);
return true;
return $this->pushToRedis(':data:misp_json_conversation', json_encode($message, JSON_PRETTY_PRINT));
}
public function attribute_save($attribute, $action = false)
@ -151,7 +117,7 @@ class PubSubTool
if (!empty($action)) {
$attribute['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_attribute', json_encode($attribute, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_attribute', json_encode($attribute, JSON_PRETTY_PRINT));
}
public function tag_save($tag, $action = false)
@ -159,7 +125,7 @@ class PubSubTool
if (!empty($action)) {
$tag['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_tag', json_encode($tag, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_tag', json_encode($tag, JSON_PRETTY_PRINT));
}
public function sighting_save($sighting, $action = false)
@ -167,7 +133,7 @@ class PubSubTool
if (!empty($action)) {
$sighting['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_sighting', json_encode($sighting, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_sighting', json_encode($sighting, JSON_PRETTY_PRINT));
}
public function modified($data, $type, $action = false)
@ -175,7 +141,7 @@ class PubSubTool
if (!empty($action)) {
$data['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT));
}
public function publish($data, $type, $action = false)
@ -183,48 +149,39 @@ class PubSubTool
if (!empty($action)) {
$data['action'] = $action;
}
return $this->__pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT));
return $this->pushToRedis(':data:misp_json_' . $type, json_encode($data, JSON_PRETTY_PRINT));
}
public function killService($settings = false)
public function killService()
{
$redis = new Redis();
if ($this->checkIfRunning()) {
if ($settings == false) {
$settings = $this->__getSetSettings();
}
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$settings = $this->getSetSettings();
$redis = $this->createRedisConnection($settings);
$redis->rPush($settings['redis_namespace'] . ':command', 'kill');
sleep(1);
if ($this->checkIfRunning()) {
// Still running.
return false;
}
}
return true;
}
// reload the server if it is running, if not, start it
/**
* Reload the server if it is running, if not, start it.
*
* @return bool|string
* @throws Exception
*/
public function reloadServer()
{
if (!$this->checkIfRunning()) {
$settings = $this->__setupPubServer();
} else {
$settings = $this->__getSetSettings();
$redis = new Redis();
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redis_pwd = $settings['redis_password'];
if (!empty($redis_pwd)) {
$redis->auth($redis_pwd);
}
$redis->select($settings['redis_database']);
$settings = $this->getSetSettings();
$this->saveSettingToFile($settings);
if ($this->checkIfRunning()) {
$redis = $this->createRedisConnection($settings);
$redis->rPush($settings['redis_namespace'] . ':command', 'reload');
}
if (!$this->checkIfRunning()) {
} else {
return 'Setting saved, but something is wrong with the ZeroMQ server. Please check the diagnostics page for more information.';
}
return true;
@ -237,10 +194,96 @@ class PubSubTool
return 'Could not kill the previous instance of the ZeroMQ script.';
}
}
$this->__setupPubServer();
if (!is_numeric($this->checkIfRunning())) {
$settings = $this->getSetSettings();
$this->setupPubServer($settings);
if ($this->checkIfRunning() === false) {
return 'Failed starting the ZeroMQ script.';
}
return true;
}
/**
* @param array $settings
* @throws Exception
*/
private function setupPubServer(array $settings)
{
if ($this->checkIfRunning() === false) {
if ($this->checkIfRunning(self::OLD_PID_LOCATION)) {
// Old version is running, kill it and start again new one.
$redis = $this->createRedisConnection($settings);
$redis->rPush($settings['redis_namespace'] . ':command', 'kill');
sleep(1);
}
$this->saveSettingToFile($settings);
$server = ClassRegistry::init('Server');
shell_exec($server->getPythonVersion() . ' ' . APP . 'files' . DS . 'scripts' . DS . 'mispzmq' . DS . 'mispzmq.py >> ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.log 2>> ' . APP . 'tmp' . DS . 'logs' . DS . 'mispzmq.error.log &');
}
}
private function pushToRedis($ns, $data)
{
$this->redis->rPush($this->settings['redis_namespace'] . $ns, $data);
return true;
}
/**
* @param array $settings
* @return Redis
*/
private function createRedisConnection(array $settings)
{
$redis = new Redis();
$redis->connect($settings['redis_host'], $settings['redis_port']);
$redisPassword = $settings['redis_password'];
if (!empty($redisPassword)) {
$redis->auth($redisPassword);
}
$redis->select($settings['redis_database']);
return $redis;
}
/**
* @param array $settings
* @throws Exception
*/
private function saveSettingToFile(array $settings)
{
$settingFilePath = self::SCRIPTS_TMP . 'mispzmq_settings.json';
$settingsFile = new File($settingFilePath, true, 0644);
if (!$settingsFile->exists()) {
throw new Exception("Could not create zmq config file '$settingFilePath'.");
}
// Because setting file contains secrets, it should be readable just by owner. But because in Travis test,
// config file is created under one user and then changed under other user, file must be readable and writable
// also by group.
chmod($settingsFile->pwd(), 0660);
if (!$settingsFile->write(json_encode($settings))) {
throw new Exception("Could not write zmq config file '$settingFilePath'.");
}
$settingsFile->close();
}
private function getSetSettings()
{
$settings = array(
'redis_host' => 'localhost',
'redis_port' => '6379',
'redis_password' => '',
'redis_database' => '1',
'redis_namespace' => 'mispq',
'port' => '50000',
'username' => null,
'password' => null,
);
foreach ($settings as $key => $setting) {
$temp = Configure::read('Plugin.ZeroMQ_' . $key);
if ($temp) {
$settings[$key] = $temp;
}
}
return $settings;
}
}

View File

@ -28,7 +28,10 @@ class AppModel extends Model
{
public $name;
public $loadedPubSubTool = false;
/**
* @var PubSubTool
*/
private $loadedPubSubTool;
public $loadedKafkaPubTool = false;
@ -2364,20 +2367,14 @@ class AppModel extends Model
public function getPubSubTool()
{
if (!$this->loadedPubSubTool) {
$this->loadPubSubTool();
App::uses('PubSubTool', 'Tools');
$pubSubTool = new PubSubTool();
$pubSubTool->initTool();
$this->loadedPubSubTool = $pubSubTool;
}
return $this->loadedPubSubTool;
}
public function loadPubSubTool()
{
App::uses('PubSubTool', 'Tools');
$pubSubTool = new PubSubTool();
$pubSubTool->initTool();
$this->loadedPubSubTool = $pubSubTool;
return true;
}
public function getElasticSearchTool()
{
if (!$this->elasticSearchClient) {

View File

@ -1806,6 +1806,24 @@ class Server extends AppModel
'type' => 'numeric',
'afterHook' => 'zmqAfterHook',
),
'ZeroMQ_username' => array(
'level' => 2,
'description' => __('The username that client need to use to connect to ZeroMQ.'),
'value' => '',
'errorMessage' => '',
'test' => 'testForEmpty',
'type' => 'string',
'afterHook' => 'zmqAfterHook',
),
'ZeroMQ_password' => array(
'level' => 2,
'description' => __('The password that client need to use to connect to ZeroMQ.'),
'value' => '',
'errorMessage' => '',
'test' => 'testForEmpty',
'type' => 'string',
'afterHook' => 'zmqAfterHook',
),
'ZeroMQ_redis_host' => array(
'level' => 2,
'description' => __('Location of the Redis db used by MISP and the Python PUB script to queue data to be published.'),

View File

@ -1,15 +1,20 @@
#!/usr/bin/env python3
import zmq
from zmq.auth.thread import ThreadAuthenticator
from zmq.utils.monitor import recv_monitor_message
import sys
import redis
import json
import os
import time
import threading
import logging
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s")
def check_pid(pid):
""" Check For the existence of a unix pid. """
if not pid:
@ -24,102 +29,185 @@ def check_pid(pid):
return True
class MISPZMQ:
EVENT_MAP = {}
for name in dir(zmq):
if name.startswith("EVENT_"):
value = getattr(zmq, name)
EVENT_MAP[value] = name
def event_monitor(monitor, logger):
while monitor.poll():
evt = recv_monitor_message(monitor)
if evt["event"] == zmq.EVENT_MONITOR_STOPPED:
break
evt.update({"description": EVENT_MAP[evt["event"]]})
logger.debug("ZMQ event: {}".format(evt))
monitor.close()
class MispZmq:
message_count = 0
publish_count = 0
monitor_thread = None
auth = None
socket = None
pidfile = None
def __init__(self):
self.current_location = Path(__file__).parent
self.pidfile = self.current_location / "mispzmq.pid"
self.publishCount = 0
self._logger = logging.getLogger()
self.tmp_location = Path(__file__).parent.parent / "tmp"
self.pidfile = self.tmp_location / "mispzmq.pid"
if self.pidfile.exists():
with open(self.pidfile.as_posix()) as f:
pid = f.read()
if check_pid(pid):
raise Exception('mispzmq already running on PID {}'.format(pid))
raise Exception("mispzmq already running on PID {}".format(pid))
else:
# Cleanup
self.pidfile.unlink()
if (self.current_location / 'settings.json').exists():
self.setup()
if (self.tmp_location / "mispzmq_settings.json").exists():
self._setup()
else:
raise Exception("The settings file is missing.")
def setup(self):
with open((self.current_location / 'settings.json').as_posix()) as settings_file:
def _setup(self):
with open((self.tmp_location / "mispzmq_settings.json").as_posix()) as settings_file:
self.settings = json.load(settings_file)
self.namespace = self.settings["redis_namespace"]
self.r = redis.StrictRedis(host=self.settings["redis_host"], db=self.settings["redis_database"],
password=self.settings["redis_password"], port=self.settings["redis_port"],
decode_responses=True)
self.timestampSettings = time.time()
self.timestamp_settings = time.time()
self._logger.debug("Connected to Redis {}:{}/{}".format(self.settings["redis_host"], self.settings["redis_port"],
self.settings["redis_database"]))
def handleCommand(self, command):
def _setup_zmq(self):
context = zmq.Context()
if "username" in self.settings and self.settings["username"]:
if "password" not in self.settings or not self.settings["password"]:
raise Exception("When username is set, password cannot be empty.")
self.auth = ThreadAuthenticator(context)
self.auth.start()
self.auth.configure_plain(domain="*", passwords={self.settings["username"]: self.settings["password"]})
else:
if self.auth:
self.auth.stop()
self.auth = None
self.socket = context.socket(zmq.PUB)
if self.settings["username"]:
self.socket.plain_server = True # must come before bind
self.socket.bind("tcp://*:{}".format(self.settings["port"]))
self._logger.debug("ZMQ listening on tcp://*:{}".format(self.settings["port"]))
if self._logger.isEnabledFor(logging.DEBUG):
monitor = self.socket.get_monitor_socket()
self.monitor_thread = threading.Thread(target=event_monitor, args=(monitor, self._logger))
self.monitor_thread.start()
else:
if self.monitor_thread:
self.socket.disable_monitor()
self.monitor_thread = None
def _handle_command(self, command):
if command == "kill":
print("Kill command received, shutting down.")
self.pidfile.unlink()
self._logger.info("Kill command received, shutting down.")
self.clean()
sys.exit()
elif command == "reload":
print("Reload command received, reloading settings from file.")
self.setup()
self._logger.info("Reload command received, reloading settings from file.")
self._setup()
self._setup_zmq()
elif command == "status":
print("Status command received, responding with latest stats.")
self._logger.info("Status command received, responding with latest stats.")
self.r.delete("{}:status".format(self.namespace))
self.r.lpush("{}:status".format(self.namespace),
json.dumps({"timestamp": time.time(),
"timestampSettings": self.timestampSettings,
"publishCount": self.publishCount}))
"timestampSettings": self.timestamp_settings,
"publishCount": self.publish_count,
"messageCount": self.message_count}))
else:
self._logger.warning("Received invalid command '{}'.".format(command))
def createPidFile(self):
with open(self.pidfile.as_posix(), 'w') as f:
def _create_pid_file(self):
with open(self.pidfile.as_posix(), "w") as f:
f.write(str(os.getpid()))
def pubMessage(self, topic, data, socket):
socket.send_string("{} {}".format(topic, data))
if topic is 'misp_json':
self.publishCount += 1
def _pub_message(self, topic, data):
self.socket.send_string("{} {}".format(topic, data))
def clean(self):
if self.monitor_thread:
self.socket.disable_monitor()
if self.auth:
self.auth.stop()
if self.socket:
self.socket.close()
if self.pidfile:
self.pidfile.unlink()
def main(self):
start_time = int(time.time())
self.createPidFile()
self._create_pid_file()
self._setup_zmq()
time.sleep(1)
status_array = [
"And when you're dead I will be still alive.",
"And believe me I am still alive.",
"I'm doing science and I'm still alive.",
"I feel FANTASTIC and I'm still alive.",
"While you're dying I'll be still alive."
"While you're dying I'll be still alive.",
]
topics = ["misp_json", "misp_json_event", "misp_json_attribute", "misp_json_sighting",
"misp_json_organisation", "misp_json_user", "misp_json_conversation",
"misp_json_object", "misp_json_object_reference", "misp_json_audit",
"misp_json_tag"
"misp_json_tag",
]
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:{}".format(self.settings["port"]))
time.sleep(1)
lists = ["{}:command".format(self.namespace)]
for topic in topics:
lists.append("{}:data:{}".format(self.namespace, topic))
while True:
command = self.r.lpop("{}:command".format(self.namespace))
if command is not None:
self.handleCommand(command)
message_received = False
for topic in topics:
data = self.r.lpop("{}:data:{}".format(self.namespace, topic))
if data is not None:
self.pubMessage(topic, data, socket)
message_received = True
if not message_received:
time.sleep(0.1)
current_time = 10 * time.time()
temp_start_time = 10 * start_time
time_delta = int(current_time - temp_start_time)
if time_delta % 100 == 0:
status_entry = int(time_delta / 100 % 5)
data = self.r.blpop(lists, timeout=10)
if data is None:
# redis timeout expired
current_time = int(time.time())
time_delta = current_time - int(self.timestamp_settings)
status_entry = int(time_delta / 10 % 5)
status_message = {
'status': status_array[status_entry],
'uptime': int(time.time()) - start_time
"status": status_array[status_entry],
"uptime": current_time - int(self.timestamp_settings)
}
self.pubMessage('misp_json_self', json.dumps(status_message), socket)
self._pub_message("misp_json_self", json.dumps(status_message))
self._logger.debug("No message received for 10 seconds, sending ZMQ status message.")
else:
key, value = data
key = key.replace("{}:".format(self.namespace), "")
if key == "command":
self._handle_command(value)
elif key.startswith("data:"):
topic = key.split(":")[1]
self._logger.debug("Received data for topic '{}', sending to ZMQ.".format(topic))
self._pub_message(topic, value)
self.message_count += 1
if topic == "misp_json":
self.publish_count += 1
else:
self._logger.warning("Received invalid message '{}'.".format(key))
if __name__ == "__main__":
mzq = MISPZMQ()
mzq.main()
mzq = MispZmq()
try:
mzq.main()
except KeyboardInterrupt:
mzq.clean()