chg: Added more tests in diagnostic tool

diagnosticTool
mokaddem 2019-06-17 09:50:26 +02:00
parent 51095d685c
commit b4c2193b1a
2 changed files with 98 additions and 21 deletions

View File

@ -6,14 +6,21 @@ import time
import functools import functools
import configparser import configparser
import diagnostic_util import diagnostic_util
import redis
import zmq
import json
from halo import Halo
from pprint import pprint from pprint import pprint
try:
import redis
import zmq
import json
import flask
from halo import Halo
except ModuleNotFoundError as e:
print('Dependency not met. Either not in a virtualenv or dependency not installed.')
print(f'- Error: {e}')
sys.exit(1)
''' '''
Steps: Steps:
- check if dependencies exists
- check if virtualenv exists - check if virtualenv exists
- check if configuration is update-to-date - check if configuration is update-to-date
- check file permission - check file permission
@ -74,7 +81,10 @@ def add_spinner(_func=None, name='dots'):
elif isinstance(status, bool) and not status: elif isinstance(status, bool) and not status:
spinner.fail(text) spinner.fail(text)
else: else:
spinner.warn(text) if status == 'info':
spinner.info(text)
else:
spinner.warn(text)
return status return status
return wrapper_add_spinner return wrapper_add_spinner
@ -167,34 +177,100 @@ def check_buffer_queue(spinner):
host=configuration_file.get('RedisGlobal', 'host'), host=configuration_file.get('RedisGlobal', 'host'),
port=configuration_file.getint('RedisGlobal', 'port'), port=configuration_file.getint('RedisGlobal', 'port'),
db=configuration_file.getint('RedisLIST', 'db')) db=configuration_file.getint('RedisLIST', 'db'))
warning_threshold = 100
elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName')) elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName'))
if elements_in_list > 100: return_status = 'warning' if elements_in_list > warning_threshold else ('info' if elements_in_list > 0 else True)
return ('warning', f'Currently {elements_in_list} in the buffer') return_text = f'Currently {elements_in_list} items in the buffer'
else: return (return_status, return_text)
return (True, f'Currently {elements_in_list} in the buffer')
@add_spinner @add_spinner
def check_subscriber_dispatcher_status(spinner): def check_buffer_change_rate(spinner):
redis_server = redis.StrictRedis( redis_server = redis.StrictRedis(
host=configuration_file.get('RedisGlobal', 'host'), host=configuration_file.get('RedisGlobal', 'host'),
port=configuration_file.getint('RedisGlobal', 'port'), port=configuration_file.getint('RedisGlobal', 'port'),
db=configuration_file.getint('RedisLIST', 'db')) db=configuration_file.getint('RedisLIST', 'db'))
content = {'content': time.time()}
redis_server.rpush(configuration_file.get('RedisLIST', 'listName'), redis_server.rpush(configuration_file.get('RedisLIST', 'listName'),
json.dumps({'zmq_name': 'diagnostic_channel', 'content': time.time()}) json.dumps({'zmq_name': 'diagnostic_channel', 'content': 'diagnostic_channel ' + json.dumps(content)})
) )
counter = 0
time_slept = 0
sleep_duration = 0.001
sleep_max = 10.0
refresh_frequency = 1.0
next_refresh = 0
change_increase = 0
change_decrease = 0
elements_in_list_prev = 0
elements_in_list = int(redis_server.llen(configuration_file.get('RedisLIST', 'listName')))
elements_in_inlist_init = elements_in_list
consecutive_no_rate_change = 0
while True:
elements_in_list_prev = elements_in_list
elements_in_list = int(redis_server.llen(configuration_file.get('RedisLIST', 'listName')))
change_increase += elements_in_list - elements_in_list_prev if elements_in_list - elements_in_list_prev > 0 else 0
change_decrease += elements_in_list_prev - elements_in_list if elements_in_list_prev - elements_in_list > 0 else 0
if next_refresh < time_slept:
next_refresh = time_slept + refresh_frequency
change_rate_text = f'{change_increase}/sec\t{change_decrease}/sec'
spinner.text = f'Buffer: {elements_in_list}\t, {change_rate_text}'
if consecutive_no_rate_change == 3:
time_slept = sleep_max
if elements_in_list == 0:
consecutive_no_rate_change += 1
else:
consecutive_no_rate_change = 0
change_increase = 0
change_decrease = 0
if time_slept >= sleep_max:
return_flag = elements_in_inlist_init < elements_in_list or elements_in_list == 0
return_text = f'Buffer is consumed {"faster" if return_flag else "slower" } than being populated'
break
time.sleep(sleep_duration)
time_slept += sleep_duration
elements_in_inlist_final = int(redis_server.llen(configuration_file.get('RedisLIST', 'listName')))
return (return_flag, return_text)
@add_spinner
def check_dispatcher_status(spinner):
redis_server = redis.StrictRedis(
host=configuration_file.get('RedisGlobal', 'host'),
port=configuration_file.getint('RedisGlobal', 'port'),
db=configuration_file.getint('RedisLIST', 'db'))
content = {'content': time.time()}
redis_server.rpush(configuration_file.get('RedisLIST', 'listName'),
json.dumps({'zmq_name': 'diagnostic_channel', 'content': 'diagnostic_channel ' + json.dumps(content)})
)
return_flag = False
return_text = ''
time_slept = 0
sleep_duration = 0.2
sleep_max = 10.0
redis_server.delete('diagnostic_tool_response')
while True: while True:
reply = redis_server.get('diagnostic_tool_response') reply = redis_server.get('diagnostic_tool_response')
elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName')) elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName'))
if reply is None: if reply is None:
time.sleep(0.2) if time_slept >= sleep_max:
spinner.text = f'No response yet. Element in queue {elements_in_list}' return_flag = False
counter += 1 return_text = f'''zmq_dispatcher did not respond in the given time ({int(sleep_max)}sec)'''
break
time.sleep(sleep_duration)
spinner.text = f'No response yet'
time_slept += sleep_duration
else: else:
return (True, f'Took {reply} to complete.') return_flag = True
return_text = f'Took {float(reply):.2f}s to complete'
break
return (False, '') return (return_flag, return_text)
@add_spinner @add_spinner
@ -218,8 +294,9 @@ def start_diagnostic():
check_file_permission() check_file_permission()
check_redis() check_redis()
# check_zmq() # check_zmq()
check_buffer_queue() if check_buffer_queue() is not True:
check_subscriber_dispatcher_status() check_buffer_change_rate()
check_dispatcher_status()
check_server_listening() check_server_listening()
check_static_endpoint() check_static_endpoint()
check_dynamic_enpoint() check_dynamic_enpoint()

View File

@ -239,7 +239,7 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False, parent
live_helper.publish_log(zmq_name, attributeType, jsonobj) live_helper.publish_log(zmq_name, attributeType, jsonobj)
def handler_diagnostic_tool(zmq_name, jsonobj): def handler_diagnostic_tool(zmq_name, jsonobj):
res = time.time() - jsonobj['content'] res = time.time() - float(jsonobj['content'])
serv_list.set('diagnostic_tool_response', str(res)) serv_list.set('diagnostic_tool_response', str(res))
############### ###############
@ -260,7 +260,7 @@ def main(sleeptime):
while True: while True:
content = serv_list.rpop(LISTNAME) content = serv_list.rpop(LISTNAME)
if content is None: if content is None:
logger.debug('Processed {} message(s) since last sleep.'.format(numMsg)) logger.info('Processed {} message(s) since last sleep.'.format(numMsg))
numMsg = 0 numMsg = 0
time.sleep(sleeptime) time.sleep(sleeptime)
continue continue