diff --git a/app/Console/Command/AdminShell.php b/app/Console/Command/AdminShell.php index fb0a4ee12..df86ac826 100644 --- a/app/Console/Command/AdminShell.php +++ b/app/Console/Command/AdminShell.php @@ -109,6 +109,9 @@ class AdminShell extends AppShell $parser->addSubcommand('configLint', [ 'help' => __('Check if settings has correct value.'), ]); + $parser->addSubcommand('createZmqConfig', [ + 'help' => __('Create config file for ZeroMQ server.'), + ]); $parser->addSubcommand('scanAttachment', [ 'help' => __('Scan attachments with AV.'), 'parser' => [ @@ -1251,4 +1254,10 @@ class AdminShell extends AppShell $this->Job->saveField('message', __('Database truncated: ' . $table)); } } + + public function createZmqConfig() + { + $this->Server->getPubSubTool()->createConfigFile(); + $this->err("Config file created in " . PubSubTool::SCRIPTS_TMP); + } } diff --git a/app/Lib/Tools/PubSubTool.php b/app/Lib/Tools/PubSubTool.php index 6a17a020a..b03cfb547 100644 --- a/app/Lib/Tools/PubSubTool.php +++ b/app/Lib/Tools/PubSubTool.php @@ -178,8 +178,12 @@ class PubSubTool public function killService() { + $settings = $this->getSetSettings(); + if ($settings['supervisor_managed']) { + throw new RuntimeException('ZeroMQ server is managed by supervisor, it is not possible to restart it.'); + } + if ($this->checkIfRunning()) { - $settings = $this->getSetSettings(); $redis = $this->createRedisConnection($settings); $redis->rPush('command', 'kill'); sleep(1); @@ -213,12 +217,16 @@ class PubSubTool public function restartServer() { + $settings = $this->getSetSettings(); + if ($settings['supervisor_managed']) { + throw new RuntimeException('ZeroMQ server is managed by supervisor, it is not possible to restart it.'); + } + if (!$this->checkIfRunning()) { if (!$this->killService()) { return 'Could not kill the previous instance of the ZeroMQ script.'; } } - $settings = $this->getSetSettings(); $this->setupPubServer($settings); if ($this->checkIfRunning() === false) { return 'Failed starting the ZeroMQ script.'; @@ -226,12 +234,22 @@ class PubSubTool return true; } + public function createConfigFile() + { + $settings = $this->getSetSettings(); + $this->saveSettingToFile($settings); + } + /** * @param array $settings * @throws Exception */ private function setupPubServer(array $settings) { + if ($settings['supervisor_managed']) { + return; // server is managed by supervisor, we don't need to check if is running or start it when not + } + if ($this->checkIfRunning() === false) { if ($this->checkIfRunning(self::OLD_PID_LOCATION)) { // Old version is running, kill it and start again new one. @@ -250,6 +268,7 @@ class PubSubTool * @param string|array $data * @return bool * @throws JsonException + * @throws RedisException */ private function pushToRedis($ns, $data) { @@ -295,9 +314,12 @@ class PubSubTool FileAccessTool::writeToFile($settingFilePath, JsonTool::encode($settings)); } + /** + * @return array + */ private function getSetSettings() { - $settings = array( + $settings = [ 'redis_host' => 'localhost', 'redis_port' => 6379, 'redis_password' => '', @@ -307,7 +329,8 @@ class PubSubTool 'port' => '50000', 'username' => null, 'password' => null, - ); + 'supervisor_managed' => false, + ]; $pluginConfig = Configure::read('Plugin'); foreach ($settings as $key => $setting) { diff --git a/app/files/scripts/mispzmq/mispzmq.py b/app/files/scripts/mispzmq/mispzmq.py index 4c67684ce..0bfe6a890 100644 --- a/app/files/scripts/mispzmq/mispzmq.py +++ b/app/files/scripts/mispzmq/mispzmq.py @@ -4,15 +4,18 @@ from zmq.auth.thread import ThreadAuthenticator from zmq.utils.monitor import recv_monitor_message import sys import redis -import json import os import time import threading import logging +import typing +import argparse from pathlib import Path - -logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s") +try: + import orjson as json +except ImportError: + import json def check_pid(pid): @@ -55,10 +58,11 @@ class MispZmq: socket = None pidfile = None - r: redis.StrictRedis + redis: redis.StrictRedis namespace: str - def __init__(self): + def __init__(self, debug=False): + logging.basicConfig(level=logging.DEBUG if debug else logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") self._logger = logging.getLogger() self.tmp_location = Path(__file__).parent.parent / "tmp" @@ -67,7 +71,7 @@ class MispZmq: with open(self.pidfile.as_posix()) as f: pid = f.read() if check_pid(pid): - raise Exception("mispzmq already running on PID {}".format(pid)) + raise Exception(f"mispzmq already running on PID {pid}") else: # Cleanup self.pidfile.unlink() @@ -77,17 +81,18 @@ class MispZmq: raise Exception("The settings file is missing.") def _setup(self): - with open((self.tmp_location / "mispzmq_settings.json").as_posix()) as settings_file: - self.settings = json.load(settings_file) + with open((self.tmp_location / "mispzmq_settings.json").as_posix(), 'rb') as settings_file: + self.settings = json.loads(settings_file.read()) + self.namespace = self.settings["redis_namespace"] # Check if TLS is being used with Redis host redis_host = self.settings["redis_host"] redis_ssl = redis_host.startswith("tls://") if redis_host.startswith("tls://"): redis_host = redis_host[6:] - self.r = redis.StrictRedis(host=redis_host, db=self.settings["redis_database"], + self.redis = redis.StrictRedis(host=redis_host, db=self.settings["redis_database"], password=self.settings["redis_password"], port=self.settings["redis_port"], - decode_responses=True, ssl=redis_ssl) + ssl=redis_ssl) self.timestamp_settings = time.time() self._logger.debug("Connected to Redis {}:{}/{}".format(self.settings["redis_host"], self.settings["redis_port"], self.settings["redis_database"])) @@ -122,34 +127,38 @@ class MispZmq: self.socket.disable_monitor() self.monitor_thread = None - def _handle_command(self, command): - if command == "kill": + def _handle_command(self, command: bytes): + if command == b"kill": self._logger.info("Kill command received, shutting down.") self.clean() sys.exit() - elif command == "reload": + elif command == b"reload": self._logger.info("Reload command received, reloading settings from file.") self._setup() self._setup_zmq() - elif command == "status": + elif command == b"status": self._logger.info("Status command received, responding with latest stats.") - self.r.delete("{}:status".format(self.namespace)) - self.r.lpush("{}:status".format(self.namespace), + self.redis.delete(f"{self.namespace}:status") + self.redis.lpush(f"{self.namespace}:status", json.dumps({"timestamp": time.time(), "timestampSettings": self.timestamp_settings, "publishCount": self.publish_count, "messageCount": self.message_count})) else: - self._logger.warning("Received invalid command '{}'.".format(command)) + self._logger.warning(f"Received invalid command '{command}'.") def _create_pid_file(self): with open(self.pidfile.as_posix(), "w") as f: f.write(str(os.getpid())) - def _pub_message(self, topic, data): - self.socket.send_string("{} {}".format(topic, data)) + def _pub_message(self, topic: bytes, data: typing.Union[str, bytes]): + data_to_send = bytearray() + data_to_send.extend(topic) + data_to_send.extend(b" ") + data_to_send.extend(data.encode("utf-8") if isinstance(data, str) else data) + self.socket.send(bytes(data_to_send)) def clean(self): if self.monitor_thread: @@ -179,12 +188,14 @@ class MispZmq: "misp_json_tag", "misp_json_warninglist", "misp_json_workflow" ] - lists = ["{}:command".format(self.namespace)] + lists = [f"{self.namespace}:command"] for topic in topics: - lists.append("{}:data:{}".format(self.namespace, topic)) + lists.append(f"{self.namespace}:data:{topic}") + + key_prefix = f"{self.namespace}:".encode("utf-8") while True: - data = self.r.blpop(lists, timeout=10) + data = self.redis.blpop(lists, timeout=10) if data is None: # redis timeout expired @@ -195,26 +206,30 @@ class MispZmq: "status": status_array[status_entry], "uptime": current_time - int(self.timestamp_settings) } - self._pub_message("misp_json_self", json.dumps(status_message)) - self._logger.debug("No message received for 10 seconds, sending ZMQ status message.") + self._pub_message(b"misp_json_self", json.dumps(status_message)) + self._logger.debug("No message received from Redis for 10 seconds, sending ZMQ status message.") else: key, value = data - key = key.replace("{}:".format(self.namespace), "") - if key == "command": + key = key.replace(key_prefix, b"") + if key == b"command": self._handle_command(value) - elif key.startswith("data:"): - topic = key.split(":")[1] - self._logger.debug("Received data for topic '{}', sending to ZMQ.".format(topic)) + elif key.startswith(b"data:"): + topic = key.split(b":", 1)[1] + self._logger.debug("Received data for topic %s, sending to ZMQ.", topic) self._pub_message(topic, value) self.message_count += 1 - if topic == "misp_json": + if topic == b"misp_json": self.publish_count += 1 else: - self._logger.warning("Received invalid message '{}'.".format(key)) + self._logger.warning("Received invalid message type %s.", key) if __name__ == "__main__": - mzq = MispZmq() + arg_parser = argparse.ArgumentParser(description="MISP ZeroMQ PUB server") + arg_parser.add_argument("--debug", action="store_true", help="Enable debugging messages") + parsed = arg_parser.parse_args() + + mzq = MispZmq(parsed.debug) try: mzq.main() except KeyboardInterrupt: diff --git a/app/files/scripts/mispzmq/mispzmqclient.py b/app/files/scripts/mispzmq/mispzmqclient.py new file mode 100644 index 000000000..2fd57d703 --- /dev/null +++ b/app/files/scripts/mispzmq/mispzmqclient.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +import sys +import zmq +import argparse + + +def main(port: int): + context = zmq.Context() + + print("Connecting to MISP ZeroMQ server…", file=sys.stderr) + socket = context.socket(zmq.SUB) + socket.connect(f"tcp://localhost:{port}") + socket.setsockopt(zmq.SUBSCRIBE, b"misp_") + print(f"Connected to tcp://localhost:{port}", file=sys.stderr) + + while True: + message = socket.recv() + print(message) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Example Python client for MISP ZMQ") + parser.add_argument("--port", default=50000, type=int) + parsed = parser.parse_args() + + try: + main(parsed.port) + except KeyboardInterrupt: + pass