From 9d4c86010b57abbd4e3b39b48f6d7eeaa4ed381a Mon Sep 17 00:00:00 2001 From: mokaddem Date: Mon, 17 Jun 2019 11:41:14 +0200 Subject: [PATCH] chg: Added more processes and subscriber tests --- diagnostic.py | 79 ++++++++++++++++++++++++++++++++++++++++++++-- diagnostic_util.py | 32 +++++++++++++++++++ zmq_subscriber.py | 2 ++ 3 files changed, 110 insertions(+), 3 deletions(-) diff --git a/diagnostic.py b/diagnostic.py index edb138e..78c7fad 100755 --- a/diagnostic.py +++ b/diagnostic.py @@ -3,10 +3,12 @@ import os import sys import stat import time +import signal import functools import configparser -import diagnostic_util from pprint import pprint +import subprocess +import diagnostic_util try: import redis import zmq @@ -36,6 +38,20 @@ Steps: ''' configuration_file = {} +pgrep_subscriber_output = '' +pgrep_dispatcher_output = '' + + +class TimeoutException(Exception): + pass + + +def timeout_handler(signum, frame): + raise TimeoutException + + +signal.signal(signal.SIGALRM, timeout_handler) + def humanize(name, isResult=False): @@ -171,6 +187,60 @@ def check_zmq(spinner): \t➥ Make sure your network infrastucture allows you to connect to the ZMQ''') +@add_spinner +def check_processes_status(spinner): + global pgrep_subscriber_output, pgrep_dispatcher_output + response = subprocess.check_output( + ["pgrep", "-laf", "zmq_"], + universal_newlines=True + ) + for line in response.splitlines(): + pid, _, p_name = line.split(' ') + if 'zmq_subscriber.py' in p_name: + pgrep_subscriber_output = line + elif 'zmq_dispatcher.py' in p_name: + pgrep_dispatcher_output = line + + if len(pgrep_subscriber_output) == 0: + return (False, 'zmq_subscriber is not running') + elif len(pgrep_dispatcher_output) == 0: + return (False, 'zmq_dispatcher is not running') + else: + return (True, 'Both processes are running') + + +@add_spinner +def check_subscriber_status(spinner): + global pgrep_subscriber_output + pool = redis.ConnectionPool( + host=configuration_file.get('RedisGlobal', 'host'), + port=configuration_file.getint('RedisGlobal', 'port'), + db=configuration_file.getint('RedisLIST', 'db'), + decode_responses=True) + monitor = diagnostic_util.Monitor(pool) + commands = monitor.monitor() + + signal.alarm(15) + try: + for i, c in enumerate(commands): + if i == 0: # Skip 'OK' + continue + split = c.split() + try: + action = split[3] + target = split[4] + except IndexError: + pass + if action == '"LPUSH"' and target == f'\"{configuration_file.get("RedisLIST", "listName")}\"': + signal.alarm(0) + break + except TimeoutException: + return_text = f'''zmq_subscriber seems not to be working. +\t➥ Consider restarting it: {pgrep_subscriber_output}''' + return (False, return_text) + return (True, 'subscriber is running and populating the buffer') + + @add_spinner def check_buffer_queue(spinner): redis_server = redis.StrictRedis( @@ -260,7 +330,8 @@ def check_dispatcher_status(spinner): if reply is None: if time_slept >= sleep_max: return_flag = False - return_text = f'''zmq_dispatcher did not respond in the given time ({int(sleep_max)}sec)''' + return_text = f'''zmq_dispatcher did not respond in the given time ({int(sleep_max)}sec) +\t➥ Consider restarting it: {pgrep_dispatcher_output}''' break time.sleep(sleep_duration) spinner.text = f'No response yet' @@ -293,7 +364,9 @@ def start_diagnostic(): return check_file_permission() check_redis() - # check_zmq() + check_zmq() + check_processes_status() + check_subscriber_status() if check_buffer_queue() is not True: check_buffer_change_rate() check_dispatcher_status() diff --git a/diagnostic_util.py b/diagnostic_util.py index d6838dd..daa98e1 100644 --- a/diagnostic_util.py +++ b/diagnostic_util.py @@ -18,3 +18,35 @@ def dict_compare(dict1, dict2): return (True, []) else: return (False, faulties) + + +# https://stackoverflow.com/a/10464730 +class Monitor(): + def __init__(self, connection_pool): + self.connection_pool = connection_pool + self.connection = None + + def __del__(self): + try: + self.reset() + except Exception: + pass + + def reset(self): + if self.connection: + self.connection_pool.release(self.connection) + self.connection = None + + def monitor(self): + if self.connection is None: + self.connection = self.connection_pool.get_connection( + 'monitor', None) + self.connection.send_command("monitor") + return self.listen() + + def parse_response(self): + return self.connection.read_response() + + def listen(self): + while True: + yield self.parse_response() diff --git a/zmq_subscriber.py b/zmq_subscriber.py index ad9e548..2948163 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -60,6 +60,8 @@ def main(zmqName): put_in_redis_list(zmqName, content) except KeyboardInterrupt: return + except Exception as e: + logger.warning('Error:' + str(e)) if __name__ == "__main__":