From 787edbc3016081b1c2f896d48a60d215adff98d1 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 26 Feb 2019 15:45:38 +0100 Subject: [PATCH 1/6] put 0MQ subscribers into screens --- config/config.cfg.default | 19 +++++++--- start_all.sh | 18 ---------- start_zmq.sh | 42 ++++++++++++++++++++++ zmq_subscriber.py | 12 +++---- zmq_subscribers.py | 74 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 137 insertions(+), 28 deletions(-) create mode 100755 start_zmq.sh create mode 100755 zmq_subscribers.py diff --git a/config/config.cfg.default b/config/config.cfg.default index 1d18adb..192344c 100644 --- a/config/config.cfg.default +++ b/config/config.cfg.default @@ -38,10 +38,21 @@ filename=logs.log [RedisGlobal] host=localhost port=6250 -#misp_web_url = http://192.168.56.50 -misp_web_url = http://localhost -#zmq_url=tcp://192.168.56.50:50000 -zmq_url=tcp://localhost:50000 +misp_web_url = http://0.0.0.0 +misp_instances = [{ + "name": "misp1", + "url": "http://localhost", + "zmq": "tcp://localhost:50000"}] + +#misp_instances = [{ +# "name": "misp1", +# "url": "http://localhost", +# "zmq": "tcp://localhost:50000"}, +# { +# "name": "misp2", +# "url": "http://10.0.2.4", +# "zmq": "tcp://10.0.2.4:50000"} +# ] [RedisLIST] db=3 diff --git a/start_all.sh b/start_all.sh index ef00155..2379217 100755 --- a/start_all.sh +++ b/start_all.sh @@ -24,8 +24,6 @@ fi netstat -an |grep LISTEN |grep 6250 |grep -v tcp6 ; check_redis_port=$? netstat -an |grep LISTEN |grep 8001 |grep -v tcp6 ; check_dashboard_port=$? -ps auxw |grep zmq_subscriber.py |grep -v grep ; check_zmq_subscriber=$? -ps auxw |grep zmq_dispatcher.py |grep -v grep ; check_zmq_dispatcher=$? # Configure accordingly, remember: 0.0.0.0 exposes to every active IP interface, play safe and bind it to something you trust and know export FLASK_APP=server.py @@ -43,22 +41,6 @@ else echo -e $RED"\t* NOT starting Redis server, made a very unrealiable check on port 6250, and something seems to be there… please double check if this is good!"$DEFAULT fi -sleep 0.1 -if [ "${check_zmq_subscriber}" == "1" ]; then - echo -e $GREEN"\t* Launching zmq subscriber"$DEFAULT - ${ENV_PY} ./zmq_subscriber.py & -else - echo -e $RED"\t* NOT starting zmq subscriber, made a rather unrealiable ps -auxw | grep for zmq_subscriber.py, and something seems to be there… please double check if this is good!"$DEFAULT -fi - -sleep 0.1 -if [ "${check_zmq_dispatcher}" == "1" ]; then - echo -e $GREEN"\t* Launching zmq dispatcher"$DEFAULT - ${ENV_PY} ./zmq_dispatcher.py & -else - echo -e $RED"\t* NOT starting zmq dispatcher, made a rather unrealiable ps -auxw | grep for zmq_dispatcher.py, and something seems to be there… please double check if this is good!"$DEFAULT -fi - sleep 0.1 if [ "${check_dashboard_port}" == "1" ]; then echo -e $GREEN"\t* Launching flask server"$DEFAULT diff --git a/start_zmq.sh b/start_zmq.sh new file mode 100755 index 0000000..9ce96fc --- /dev/null +++ b/start_zmq.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash + +#set -x + +GREEN="\\033[1;32m" +DEFAULT="\\033[0;39m" +RED="\\033[1;31m" + +# Getting CWD where bash script resides +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +DASH_HOME="${DIR}" + +cd ${DASH_HOME} + +if [ -e "${DIR}/DASHENV/bin/python" ]; then + echo "dashboard virtualenv seems to exist, good" + ENV_PY="${DIR}/DASHENV/bin/python" +else + echo "Please make sure you have a dashboard environment, au revoir" + exit 1 +fi + +ps auxw |grep zmq_subscriber.py |grep -v grep ; check_zmq_subscriber=$? +ps auxw |grep zmq_dispatcher.py |grep -v grep ; check_zmq_dispatcher=$? + +screen -dmS "Misp_Dashboard" + +sleep 0.1 +if [ "${check_zmq_subscriber}" == "1" ]; then + echo -e $GREEN"\t* Launching zmq subscribers"$DEFAULT + screen -S "Misp_Dashboard" -X screen -t "zmq-subscribers" bash -c ${ENV_PY}' ./zmq_subscribers.py; read x' +else + echo -e $RED"\t* NOT starting zmq subscribers, made a rather unrealiable ps -auxw | grep for zmq_subscriber.py, and something seems to be there… please double check if this is good!"$DEFAULT +fi + +sleep 0.1 +if [ "${check_zmq_dispatcher}" == "1" ]; then + echo -e $GREEN"\t* Launching zmq dispatcher"$DEFAULT + screen -S "Misp_Dashboard" -X screen -t "zmq-dispacher" bash -c ${ENV_PY}' ./zmq_dispatcher.py; read x' +else + echo -e $RED"\t* NOT starting zmq dispatcher, made a rather unrealiable ps -auxw | grep for zmq_dispatcher.py, and something seems to be there… please double check if this is good!"$DEFAULT +fi diff --git a/zmq_subscriber.py b/zmq_subscriber.py index b642333..f6bfe29 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -21,7 +21,6 @@ if not os.path.exists(logDir): logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO) logger = logging.getLogger('zmq_subscriber') -ZMQ_URL = cfg.get('RedisGlobal', 'zmq_url') CHANNEL = cfg.get('RedisLog', 'channel') LISTNAME = cfg.get('RedisLIST', 'listName') @@ -41,25 +40,26 @@ def put_in_redis_list(zmq_name, content): serv_list.lpush(LISTNAME, json.dumps(to_add)) logger.debug('Pushed: {}'.format(json.dumps(to_add))) -def main(zmqName): +def main(zmqName, zmqurl): context = zmq.Context() socket = context.socket(zmq.SUB) - socket.connect(ZMQ_URL) + socket.connect(zmqurl) socket.setsockopt_string(zmq.SUBSCRIBE, '') while True: try: content = socket.recv() put_in_redis_list(zmqName, content) + print(zmqName, content) except KeyboardInterrupt: return if __name__ == "__main__": - parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribes to a ZNQ then redispatch it to the misp-dashboard') + parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribes to a ZMQ then redispatch it to the misp-dashboard') parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") - parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) + parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default="tcp://localhost:50000") args = parser.parse_args() - main(args.zmqname) + main(args.zmqname, args.zmqurl) diff --git a/zmq_subscribers.py b/zmq_subscribers.py new file mode 100755 index 0000000..43711cf --- /dev/null +++ b/zmq_subscribers.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 + +import time, datetime +import logging +import redis +import configparser +import argparse +import os +import subprocess +import sys +import json +import atexit +import signal +import shlex +import pty +import threading + +configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg') +cfg = configparser.ConfigParser() +cfg.read(configfile) +logDir = cfg.get('Log', 'directory') +logfilename = cfg.get('Log', 'filename') +logPath = os.path.join(logDir, logfilename) +if not os.path.exists(logDir): + os.makedirs(logDir) +logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO) +logger = logging.getLogger('zmq_subscriber') + +CHANNEL = cfg.get('RedisLog', 'channel') +LISTNAME = cfg.get('RedisLIST', 'listName') + +serv_list = redis.StrictRedis( + host=cfg.get('RedisGlobal', 'host'), + port=cfg.getint('RedisGlobal', 'port'), + db=cfg.getint('RedisLIST', 'db')) + +children = [] + +def signal_handler(signal, frame): + for child in children: + # We don't resume as we are already attached + cmd = "screen -p"+child+" -X {arg}" + argsc = shlex.split(cmd.format(arg = "kill")) + print("\n\033[1;31m [-] Terminating {child}\033[0;39m".format(child=child)) + logger.info('Terminate: {child}'.format(child=child)) + subprocess.call(argsc) # kill window + sys.exit(0) + +############### +## MAIN LOOP ## +############### + +def main(): + print("\033[1;31m [+] I am the subscriber's master - kill me to kill'em'all \033[0;39m") + # screen needs a shell and I an no fan of shell=True + (master, slave) = pty.openpty() + try: + for item in json.loads(cfg.get('RedisGlobal', 'misp_instances')): + name = shlex.quote(item.get("name")) + zmq = shlex.quote(item.get("zmq")) + print("\033[1;32m [+] Subscribing to "+zmq+"\033[0;39m") + logger.info('Launching: {child}'.format(child=name)) + children.append(name) + subprocess.Popen(["screen", "-r", "Misp_Dashboard", "-X", "screen", "-t", name ,sys.executable, "./zmq_subscriber.py", "-n", name, "-u", zmq], close_fds=True, shell=False, stdin=slave, stdout=slave, stderr=slave) + except ValueError as error: + print("\033[1;31m [!] Fatal exception: {error} \033[0;39m".format(error=error)) + logger.error("JSON error: %s", error) + sys.exit(1) + signal.signal(signal.SIGINT, signal_handler) + forever = threading.Event() + forever.wait() + +if __name__ == "__main__": + main() From e2dea48294dce0b8ffe673272cb5b60bed17c704 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Thu, 14 Mar 2019 11:41:07 +0100 Subject: [PATCH 2/6] Update zmq_subscribers.py Added a test comment --- zmq_subscribers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmq_subscribers.py b/zmq_subscribers.py index 43711cf..0c0b81f 100755 --- a/zmq_subscribers.py +++ b/zmq_subscribers.py @@ -68,7 +68,7 @@ def main(): sys.exit(1) signal.signal(signal.SIGINT, signal_handler) forever = threading.Event() - forever.wait() + forever.wait() # Wait for SIGINT if __name__ == "__main__": main() From 1439804d46b75598e483042dd59708d2b1014e57 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Fri, 21 Jun 2019 12:35:53 +0200 Subject: [PATCH 3/6] chg: create zmqs user + sudoer right for www-data --- install_dependencies.sh | 9 +++++++++ start_all.sh | 3 +++ 2 files changed, 12 insertions(+) diff --git a/install_dependencies.sh b/install_dependencies.sh index e9beb8e..7352109 100755 --- a/install_dependencies.sh +++ b/install_dependencies.sh @@ -3,6 +3,15 @@ set -e #set -x +sudo chmod -R g+w . + +if ! id zmqs >/dev/null 2>&1; then + # Create zmq user + sudo useradd -U -G www-data -m -s /bin/bash zmqs + # Adds right to www-data to run ./start-zmq as zmq + sudo echo "www-data ALL=(zmqs) NOPASSWD:/bin/bash /var/www/misp-dashboard/start_zmq.sh" > /etc/sudoers.d/www-data +fi + sudo apt-get install python3-virtualenv virtualenv screen redis-server unzip -y if [ -z "$VIRTUAL_ENV" ]; then diff --git a/start_all.sh b/start_all.sh index 2379217..e2845ad 100755 --- a/start_all.sh +++ b/start_all.sh @@ -48,3 +48,6 @@ if [ "${check_dashboard_port}" == "1" ]; then else echo -e $RED"\t* NOT starting flask server, made a very unrealiable check on port 8001, and something seems to be there… please double check if this is good!"$DEFAULT fi + +sleep 0.1 +sudo -u zmqs /bin/bash /var/www/misp-dashboard/start_zmq.sh & From cbbdf7cbfceee42c7e4df1b2e69861d14b886857 Mon Sep 17 00:00:00 2001 From: mokaddem Date: Fri, 21 Jun 2019 15:32:59 +0200 Subject: [PATCH 4/6] fix: mergeconflict and log filename --- zmq_subscriber.py | 2 +- zmq_subscribers.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zmq_subscriber.py b/zmq_subscriber.py index 8252b4f..7a7035b 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -73,6 +73,6 @@ if __name__ == "__main__": args = parser.parse_args() try: - main(args.zmqname) + main(args.zmqname, args.zmqurl) except redis.exceptions.ResponseError as error: print(error) diff --git a/zmq_subscribers.py b/zmq_subscribers.py index 0c0b81f..a6b2737 100755 --- a/zmq_subscribers.py +++ b/zmq_subscribers.py @@ -19,7 +19,7 @@ configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/c cfg = configparser.ConfigParser() cfg.read(configfile) logDir = cfg.get('Log', 'directory') -logfilename = cfg.get('Log', 'filename') +logfilename = cfg.get('Log', 'subscriber_filename') logPath = os.path.join(logDir, logfilename) if not os.path.exists(logDir): os.makedirs(logDir) From a9f9a67184d8208a82f41ab550b475cb67f02e90 Mon Sep 17 00:00:00 2001 From: mokaddem Date: Fri, 21 Jun 2019 15:55:03 +0200 Subject: [PATCH 5/6] chg: [diagnostic] Added support of multiple subscribers - WiP --- diagnostic.py | 43 ++++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/diagnostic.py b/diagnostic.py index 8d0faeb..a51e8a5 100755 --- a/diagnostic.py +++ b/diagnostic.py @@ -172,22 +172,35 @@ def check_zmq(spinner): timeout = 15 context = zmq.Context() socket = context.socket(zmq.SUB) - socket.connect(configuration_file.get('RedisGlobal', 'zmq_url')) - socket.setsockopt_string(zmq.SUBSCRIBE, '') - poller = zmq.Poller() + misp_instances = json.loads(cfg.get('RedisGlobal', 'misp_instances')) + instances_status = {} + for misp_instance in misp_instances: + socket.connect(misp_instance.get('zmq')) + socket.setsockopt_string(zmq.SUBSCRIBE, '') + poller = zmq.Poller() - start_time = time.time() - poller.register(socket, zmq.POLLIN) - for t in range(1, timeout+1): - socks = dict(poller.poll(timeout=1*1000)) - if len(socks) > 0: - if socket in socks and socks[socket] == zmq.POLLIN: - rcv_string = socket.recv() - if rcv_string.startswith(b'misp_json'): - return (True, '') - else: - pass - spinner.text = f'checking zmq - elapsed time: {int(time.time() - start_time)}s' + start_time = time.time() + poller.register(socket, zmq.POLLIN) + for t in range(1, timeout+1): + socks = dict(poller.poll(timeout=1*1000)) + if len(socks) > 0: + if socket in socks and socks[socket] == zmq.POLLIN: + rcv_string = socket.recv() + if rcv_string.startswith(b'misp_json'): + instances_status[misp_instance.get('name')] = True + else: + pass + spinner.text = f'checking zmq of {misp_instance.get('name')} - elapsed time: {int(time.time() - start_time)}s' + instances_status[misp_instance.get('name')] = False + + results = [s for n, s in instances_status.items()] + if all(results): + return (True, '') + elif any(results): + return_text = 'Some connection to ZMQ streams failed.\n' + for name, status: + return_text += f'\t➥ {name}: {'success' if status else 'failed'}\n' + return (True, return_text) else: return (False, '''Can\'t connect to the ZMQ stream. \t➥ Make sure the MISP ZMQ is running: `/servers/serverSettings/diagnostics` From ddf7cbcfffe478f1438225f33b86acba231c6721 Mon Sep 17 00:00:00 2001 From: mokaddem Date: Fri, 21 Jun 2019 16:19:57 +0200 Subject: [PATCH 6/6] fix: [diagnostic] socket subscribing multiple time and improved status message --- diagnostic.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/diagnostic.py b/diagnostic.py index a51e8a5..edf10d6 100755 --- a/diagnostic.py +++ b/diagnostic.py @@ -171,14 +171,15 @@ def check_redis(spinner): def check_zmq(spinner): timeout = 15 context = zmq.Context() - socket = context.socket(zmq.SUB) - misp_instances = json.loads(cfg.get('RedisGlobal', 'misp_instances')) + misp_instances = json.loads(configuration_file.get('RedisGlobal', 'misp_instances')) instances_status = {} for misp_instance in misp_instances: + socket = context.socket(zmq.SUB) socket.connect(misp_instance.get('zmq')) socket.setsockopt_string(zmq.SUBSCRIBE, '') poller = zmq.Poller() + flag_skip = False start_time = time.time() poller.register(socket, zmq.POLLIN) for t in range(1, timeout+1): @@ -188,21 +189,23 @@ def check_zmq(spinner): rcv_string = socket.recv() if rcv_string.startswith(b'misp_json'): instances_status[misp_instance.get('name')] = True + flag_skip = True + break else: - pass - spinner.text = f'checking zmq of {misp_instance.get('name')} - elapsed time: {int(time.time() - start_time)}s' - instances_status[misp_instance.get('name')] = False + spinner.text = f'checking zmq of {misp_instance.get("name")} - elapsed time: {int(time.time() - start_time)}s' + if not flag_skip: + instances_status[misp_instance.get('name')] = False results = [s for n, s in instances_status.items()] if all(results): return (True, '') elif any(results): - return_text = 'Some connection to ZMQ streams failed.\n' - for name, status: - return_text += f'\t➥ {name}: {'success' if status else 'failed'}\n' + return_text = 'Connection to ZMQ stream(s) failed.\n' + for name, status in instances_status.items(): + return_text += f'\t➥ {name}: {"success" if status else "failed"}\n' return (True, return_text) else: - return (False, '''Can\'t connect to the ZMQ stream. + return (False, '''Can\'t connect to the ZMQ stream(s). \t➥ Make sure the MISP ZMQ is running: `/servers/serverSettings/diagnostics` \t➥ Make sure your network infrastucture allows you to connect to the ZMQ''') @@ -215,11 +218,8 @@ def check_processes_status(spinner): universal_newlines=True ) for line in response.splitlines(): - lines = line.split(' ') - if len(lines) == 2: - pid, p_name = lines - elif len(lines) ==3: - pid, _, p_name = lines + lines = line.split(' ', maxsplit=1) + pid, p_name = lines if 'zmq_subscriber.py' in p_name: pgrep_subscriber_output = line