diff --git a/diagnostic.py b/diagnostic.py index 9374f9f..edb138e 100755 --- a/diagnostic.py +++ b/diagnostic.py @@ -6,14 +6,21 @@ import time import functools import configparser import diagnostic_util -import redis -import zmq -import json -from halo import Halo from pprint import pprint +try: + import redis + import zmq + import json + import flask + from halo import Halo +except ModuleNotFoundError as e: + print('Dependency not met. Either not in a virtualenv or dependency not installed.') + print(f'- Error: {e}') + sys.exit(1) ''' Steps: +- check if dependencies exists - check if virtualenv exists - check if configuration is update-to-date - check file permission @@ -74,7 +81,10 @@ def add_spinner(_func=None, name='dots'): elif isinstance(status, bool) and not status: spinner.fail(text) else: - spinner.warn(text) + if status == 'info': + spinner.info(text) + else: + spinner.warn(text) return status return wrapper_add_spinner @@ -167,34 +177,100 @@ def check_buffer_queue(spinner): host=configuration_file.get('RedisGlobal', 'host'), port=configuration_file.getint('RedisGlobal', 'port'), db=configuration_file.getint('RedisLIST', 'db')) + warning_threshold = 100 elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName')) - if elements_in_list > 100: - return ('warning', f'Currently {elements_in_list} in the buffer') - else: - return (True, f'Currently {elements_in_list} in the buffer') + return_status = 'warning' if elements_in_list > warning_threshold else ('info' if elements_in_list > 0 else True) + return_text = f'Currently {elements_in_list} items in the buffer' + return (return_status, return_text) @add_spinner -def check_subscriber_dispatcher_status(spinner): +def check_buffer_change_rate(spinner): redis_server = redis.StrictRedis( host=configuration_file.get('RedisGlobal', 'host'), port=configuration_file.getint('RedisGlobal', 'port'), db=configuration_file.getint('RedisLIST', 'db')) + content = {'content': time.time()} redis_server.rpush(configuration_file.get('RedisLIST', 'listName'), - json.dumps({'zmq_name': 'diagnostic_channel', 'content': time.time()}) + json.dumps({'zmq_name': 'diagnostic_channel', 'content': 'diagnostic_channel ' + json.dumps(content)}) ) - counter = 0 + + time_slept = 0 + sleep_duration = 0.001 + sleep_max = 10.0 + refresh_frequency = 1.0 + next_refresh = 0 + change_increase = 0 + change_decrease = 0 + elements_in_list_prev = 0 + elements_in_list = int(redis_server.llen(configuration_file.get('RedisLIST', 'listName'))) + elements_in_inlist_init = elements_in_list + consecutive_no_rate_change = 0 + while True: + elements_in_list_prev = elements_in_list + elements_in_list = int(redis_server.llen(configuration_file.get('RedisLIST', 'listName'))) + change_increase += elements_in_list - elements_in_list_prev if elements_in_list - elements_in_list_prev > 0 else 0 + change_decrease += elements_in_list_prev - elements_in_list if elements_in_list_prev - elements_in_list > 0 else 0 + + if next_refresh < time_slept: + next_refresh = time_slept + refresh_frequency + change_rate_text = f'↑ {change_increase}/sec\t↓ {change_decrease}/sec' + spinner.text = f'Buffer: {elements_in_list}\t, {change_rate_text}' + + if consecutive_no_rate_change == 3: + time_slept = sleep_max + if elements_in_list == 0: + consecutive_no_rate_change += 1 + else: + consecutive_no_rate_change = 0 + change_increase = 0 + change_decrease = 0 + + if time_slept >= sleep_max: + return_flag = elements_in_inlist_init < elements_in_list or elements_in_list == 0 + return_text = f'Buffer is consumed {"faster" if return_flag else "slower" } than being populated' + break + + time.sleep(sleep_duration) + time_slept += sleep_duration + elements_in_inlist_final = int(redis_server.llen(configuration_file.get('RedisLIST', 'listName'))) + return (return_flag, return_text) + + +@add_spinner +def check_dispatcher_status(spinner): + redis_server = redis.StrictRedis( + host=configuration_file.get('RedisGlobal', 'host'), + port=configuration_file.getint('RedisGlobal', 'port'), + db=configuration_file.getint('RedisLIST', 'db')) + content = {'content': time.time()} + redis_server.rpush(configuration_file.get('RedisLIST', 'listName'), + json.dumps({'zmq_name': 'diagnostic_channel', 'content': 'diagnostic_channel ' + json.dumps(content)}) + ) + + return_flag = False + return_text = '' + time_slept = 0 + sleep_duration = 0.2 + sleep_max = 10.0 + redis_server.delete('diagnostic_tool_response') while True: reply = redis_server.get('diagnostic_tool_response') elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName')) if reply is None: - time.sleep(0.2) - spinner.text = f'No response yet. Element in queue {elements_in_list}' - counter += 1 + if time_slept >= sleep_max: + return_flag = False + return_text = f'''zmq_dispatcher did not respond in the given time ({int(sleep_max)}sec)''' + break + time.sleep(sleep_duration) + spinner.text = f'No response yet' + time_slept += sleep_duration else: - return (True, f'Took {reply} to complete.') + return_flag = True + return_text = f'Took {float(reply):.2f}s to complete' + break - return (False, '') + return (return_flag, return_text) @add_spinner @@ -218,8 +294,9 @@ def start_diagnostic(): check_file_permission() check_redis() # check_zmq() - check_buffer_queue() - check_subscriber_dispatcher_status() + if check_buffer_queue() is not True: + check_buffer_change_rate() + check_dispatcher_status() check_server_listening() check_static_endpoint() check_dynamic_enpoint() diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py index 8e261a0..9f795fc 100755 --- a/zmq_dispatcher.py +++ b/zmq_dispatcher.py @@ -239,7 +239,7 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False, parent live_helper.publish_log(zmq_name, attributeType, jsonobj) def handler_diagnostic_tool(zmq_name, jsonobj): - res = time.time() - jsonobj['content'] + res = time.time() - float(jsonobj['content']) serv_list.set('diagnostic_tool_response', str(res)) ############### @@ -260,7 +260,7 @@ def main(sleeptime): while True: content = serv_list.rpop(LISTNAME) if content is None: - logger.debug('Processed {} message(s) since last sleep.'.format(numMsg)) + logger.info('Processed {} message(s) since last sleep.'.format(numMsg)) numMsg = 0 time.sleep(sleeptime) continue