Merge pull request #9491 from JakubOnderka/zmq-supervisor

Zmq supervisor
pull/9492/head
Jakub Onderka 2024-01-13 15:52:51 +01:00 committed by GitHub
commit 2b212125b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 112 additions and 36 deletions

View File

@ -109,6 +109,9 @@ class AdminShell extends AppShell
$parser->addSubcommand('configLint', [ $parser->addSubcommand('configLint', [
'help' => __('Check if settings has correct value.'), 'help' => __('Check if settings has correct value.'),
]); ]);
$parser->addSubcommand('createZmqConfig', [
'help' => __('Create config file for ZeroMQ server.'),
]);
$parser->addSubcommand('scanAttachment', [ $parser->addSubcommand('scanAttachment', [
'help' => __('Scan attachments with AV.'), 'help' => __('Scan attachments with AV.'),
'parser' => [ 'parser' => [
@ -1251,4 +1254,10 @@ class AdminShell extends AppShell
$this->Job->saveField('message', __('Database truncated: ' . $table)); $this->Job->saveField('message', __('Database truncated: ' . $table));
} }
} }
public function createZmqConfig()
{
$this->Server->getPubSubTool()->createConfigFile();
$this->err("Config file created in " . PubSubTool::SCRIPTS_TMP);
}
} }

View File

@ -178,8 +178,12 @@ class PubSubTool
public function killService() public function killService()
{ {
$settings = $this->getSetSettings();
if ($settings['supervisor_managed']) {
throw new RuntimeException('ZeroMQ server is managed by supervisor, it is not possible to restart it.');
}
if ($this->checkIfRunning()) { if ($this->checkIfRunning()) {
$settings = $this->getSetSettings();
$redis = $this->createRedisConnection($settings); $redis = $this->createRedisConnection($settings);
$redis->rPush('command', 'kill'); $redis->rPush('command', 'kill');
sleep(1); sleep(1);
@ -213,12 +217,16 @@ class PubSubTool
public function restartServer() public function restartServer()
{ {
$settings = $this->getSetSettings();
if ($settings['supervisor_managed']) {
throw new RuntimeException('ZeroMQ server is managed by supervisor, it is not possible to restart it.');
}
if (!$this->checkIfRunning()) { if (!$this->checkIfRunning()) {
if (!$this->killService()) { if (!$this->killService()) {
return 'Could not kill the previous instance of the ZeroMQ script.'; return 'Could not kill the previous instance of the ZeroMQ script.';
} }
} }
$settings = $this->getSetSettings();
$this->setupPubServer($settings); $this->setupPubServer($settings);
if ($this->checkIfRunning() === false) { if ($this->checkIfRunning() === false) {
return 'Failed starting the ZeroMQ script.'; return 'Failed starting the ZeroMQ script.';
@ -226,12 +234,22 @@ class PubSubTool
return true; return true;
} }
public function createConfigFile()
{
$settings = $this->getSetSettings();
$this->saveSettingToFile($settings);
}
/** /**
* @param array $settings * @param array $settings
* @throws Exception * @throws Exception
*/ */
private function setupPubServer(array $settings) private function setupPubServer(array $settings)
{ {
if ($settings['supervisor_managed']) {
return; // server is managed by supervisor, we don't need to check if is running or start it when not
}
if ($this->checkIfRunning() === false) { if ($this->checkIfRunning() === false) {
if ($this->checkIfRunning(self::OLD_PID_LOCATION)) { if ($this->checkIfRunning(self::OLD_PID_LOCATION)) {
// Old version is running, kill it and start again new one. // Old version is running, kill it and start again new one.
@ -250,6 +268,7 @@ class PubSubTool
* @param string|array $data * @param string|array $data
* @return bool * @return bool
* @throws JsonException * @throws JsonException
* @throws RedisException
*/ */
private function pushToRedis($ns, $data) private function pushToRedis($ns, $data)
{ {
@ -295,9 +314,12 @@ class PubSubTool
FileAccessTool::writeToFile($settingFilePath, JsonTool::encode($settings)); FileAccessTool::writeToFile($settingFilePath, JsonTool::encode($settings));
} }
/**
* @return array
*/
private function getSetSettings() private function getSetSettings()
{ {
$settings = array( $settings = [
'redis_host' => 'localhost', 'redis_host' => 'localhost',
'redis_port' => 6379, 'redis_port' => 6379,
'redis_password' => '', 'redis_password' => '',
@ -307,7 +329,8 @@ class PubSubTool
'port' => '50000', 'port' => '50000',
'username' => null, 'username' => null,
'password' => null, 'password' => null,
); 'supervisor_managed' => false,
];
$pluginConfig = Configure::read('Plugin'); $pluginConfig = Configure::read('Plugin');
foreach ($settings as $key => $setting) { foreach ($settings as $key => $setting) {

View File

@ -4,15 +4,18 @@ from zmq.auth.thread import ThreadAuthenticator
from zmq.utils.monitor import recv_monitor_message from zmq.utils.monitor import recv_monitor_message
import sys import sys
import redis import redis
import json
import os import os
import time import time
import threading import threading
import logging import logging
import typing
import argparse
from pathlib import Path from pathlib import Path
try:
logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s") import orjson as json
except ImportError:
import json
def check_pid(pid): def check_pid(pid):
@ -55,10 +58,11 @@ class MispZmq:
socket = None socket = None
pidfile = None pidfile = None
r: redis.StrictRedis redis: redis.StrictRedis
namespace: str namespace: str
def __init__(self): def __init__(self, debug=False):
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
self._logger = logging.getLogger() self._logger = logging.getLogger()
self.tmp_location = Path(__file__).parent.parent / "tmp" self.tmp_location = Path(__file__).parent.parent / "tmp"
@ -67,7 +71,7 @@ class MispZmq:
with open(self.pidfile.as_posix()) as f: with open(self.pidfile.as_posix()) as f:
pid = f.read() pid = f.read()
if check_pid(pid): if check_pid(pid):
raise Exception("mispzmq already running on PID {}".format(pid)) raise Exception(f"mispzmq already running on PID {pid}")
else: else:
# Cleanup # Cleanup
self.pidfile.unlink() self.pidfile.unlink()
@ -77,17 +81,18 @@ class MispZmq:
raise Exception("The settings file is missing.") raise Exception("The settings file is missing.")
def _setup(self): def _setup(self):
with open((self.tmp_location / "mispzmq_settings.json").as_posix()) as settings_file: with open((self.tmp_location / "mispzmq_settings.json").as_posix(), 'rb') as settings_file:
self.settings = json.load(settings_file) self.settings = json.loads(settings_file.read())
self.namespace = self.settings["redis_namespace"] self.namespace = self.settings["redis_namespace"]
# Check if TLS is being used with Redis host # Check if TLS is being used with Redis host
redis_host = self.settings["redis_host"] redis_host = self.settings["redis_host"]
redis_ssl = redis_host.startswith("tls://") redis_ssl = redis_host.startswith("tls://")
if redis_host.startswith("tls://"): if redis_host.startswith("tls://"):
redis_host = redis_host[6:] redis_host = redis_host[6:]
self.r = redis.StrictRedis(host=redis_host, db=self.settings["redis_database"], self.redis = redis.StrictRedis(host=redis_host, db=self.settings["redis_database"],
password=self.settings["redis_password"], port=self.settings["redis_port"], password=self.settings["redis_password"], port=self.settings["redis_port"],
decode_responses=True, ssl=redis_ssl) ssl=redis_ssl)
self.timestamp_settings = time.time() self.timestamp_settings = time.time()
self._logger.debug("Connected to Redis {}:{}/{}".format(self.settings["redis_host"], self.settings["redis_port"], self._logger.debug("Connected to Redis {}:{}/{}".format(self.settings["redis_host"], self.settings["redis_port"],
self.settings["redis_database"])) self.settings["redis_database"]))
@ -122,34 +127,38 @@ class MispZmq:
self.socket.disable_monitor() self.socket.disable_monitor()
self.monitor_thread = None self.monitor_thread = None
def _handle_command(self, command): def _handle_command(self, command: bytes):
if command == "kill": if command == b"kill":
self._logger.info("Kill command received, shutting down.") self._logger.info("Kill command received, shutting down.")
self.clean() self.clean()
sys.exit() sys.exit()
elif command == "reload": elif command == b"reload":
self._logger.info("Reload command received, reloading settings from file.") self._logger.info("Reload command received, reloading settings from file.")
self._setup() self._setup()
self._setup_zmq() self._setup_zmq()
elif command == "status": elif command == b"status":
self._logger.info("Status command received, responding with latest stats.") self._logger.info("Status command received, responding with latest stats.")
self.r.delete("{}:status".format(self.namespace)) self.redis.delete(f"{self.namespace}:status")
self.r.lpush("{}:status".format(self.namespace), self.redis.lpush(f"{self.namespace}:status",
json.dumps({"timestamp": time.time(), json.dumps({"timestamp": time.time(),
"timestampSettings": self.timestamp_settings, "timestampSettings": self.timestamp_settings,
"publishCount": self.publish_count, "publishCount": self.publish_count,
"messageCount": self.message_count})) "messageCount": self.message_count}))
else: else:
self._logger.warning("Received invalid command '{}'.".format(command)) self._logger.warning(f"Received invalid command '{command}'.")
def _create_pid_file(self): def _create_pid_file(self):
with open(self.pidfile.as_posix(), "w") as f: with open(self.pidfile.as_posix(), "w") as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
def _pub_message(self, topic, data): def _pub_message(self, topic: bytes, data: typing.Union[str, bytes]):
self.socket.send_string("{} {}".format(topic, data)) data_to_send = bytearray()
data_to_send.extend(topic)
data_to_send.extend(b" ")
data_to_send.extend(data.encode("utf-8") if isinstance(data, str) else data)
self.socket.send(bytes(data_to_send))
def clean(self): def clean(self):
if self.monitor_thread: if self.monitor_thread:
@ -179,12 +188,14 @@ class MispZmq:
"misp_json_tag", "misp_json_warninglist", "misp_json_workflow" "misp_json_tag", "misp_json_warninglist", "misp_json_workflow"
] ]
lists = ["{}:command".format(self.namespace)] lists = [f"{self.namespace}:command"]
for topic in topics: for topic in topics:
lists.append("{}:data:{}".format(self.namespace, topic)) lists.append(f"{self.namespace}:data:{topic}")
key_prefix = f"{self.namespace}:".encode("utf-8")
while True: while True:
data = self.r.blpop(lists, timeout=10) data = self.redis.blpop(lists, timeout=10)
if data is None: if data is None:
# redis timeout expired # redis timeout expired
@ -195,26 +206,30 @@ class MispZmq:
"status": status_array[status_entry], "status": status_array[status_entry],
"uptime": current_time - int(self.timestamp_settings) "uptime": current_time - int(self.timestamp_settings)
} }
self._pub_message("misp_json_self", json.dumps(status_message)) self._pub_message(b"misp_json_self", json.dumps(status_message))
self._logger.debug("No message received for 10 seconds, sending ZMQ status message.") self._logger.debug("No message received from Redis for 10 seconds, sending ZMQ status message.")
else: else:
key, value = data key, value = data
key = key.replace("{}:".format(self.namespace), "") key = key.replace(key_prefix, b"")
if key == "command": if key == b"command":
self._handle_command(value) self._handle_command(value)
elif key.startswith("data:"): elif key.startswith(b"data:"):
topic = key.split(":")[1] topic = key.split(b":", 1)[1]
self._logger.debug("Received data for topic '{}', sending to ZMQ.".format(topic)) self._logger.debug("Received data for topic %s, sending to ZMQ.", topic)
self._pub_message(topic, value) self._pub_message(topic, value)
self.message_count += 1 self.message_count += 1
if topic == "misp_json": if topic == b"misp_json":
self.publish_count += 1 self.publish_count += 1
else: else:
self._logger.warning("Received invalid message '{}'.".format(key)) self._logger.warning("Received invalid message type %s.", key)
if __name__ == "__main__": if __name__ == "__main__":
mzq = MispZmq() arg_parser = argparse.ArgumentParser(description="MISP ZeroMQ PUB server")
arg_parser.add_argument("--debug", action="store_true", help="Enable debugging messages")
parsed = arg_parser.parse_args()
mzq = MispZmq(parsed.debug)
try: try:
mzq.main() mzq.main()
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -0,0 +1,29 @@
#!/usr/bin/env python3
import sys
import zmq
import argparse
def main(port: int):
context = zmq.Context()
print("Connecting to MISP ZeroMQ server…", file=sys.stderr)
socket = context.socket(zmq.SUB)
socket.connect(f"tcp://localhost:{port}")
socket.setsockopt(zmq.SUBSCRIBE, b"misp_")
print(f"Connected to tcp://localhost:{port}", file=sys.stderr)
while True:
message = socket.recv()
print(message)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Example Python client for MISP ZMQ")
parser.add_argument("--port", default=50000, type=int)
parsed = parser.parse_args()
try:
main(parsed.port)
except KeyboardInterrupt:
pass