#!/usr/bin/env python3
import os
import sys
import stat
import time
import signal
import functools
import configparser
from pprint import pprint
import subprocess
import diagnostic_util

# Third-party packages are imported inside a guard so that a missing
# dependency produces an actionable message instead of a raw traceback.
try:
    import redis
    import zmq
    import json
    import flask
    import requests
    from halo import Halo
except ModuleNotFoundError as e:
    print('Dependency not met. Either not in a virtualenv or dependency not installed.')
    print('- Error: {}'.format(e))
    # Nothing below can work without these modules; stop here rather than
    # failing later with a NameError.
    sys.exit(1)
2019-06-14 16:15:07 +02:00
2019-06-17 09:50:26 +02:00
- check if dependencies exists
2019-06-14 16:15:07 +02:00
- check if virtualenv exists
- check if configuration is update-to-date
- check file permission
- check if redis is running and responding
- check if able to connect to zmq
- check zmq_dispatcher processing queue
- check queue status: being filled up / being filled down
- check if subscriber responding
- check if dispatcher responding
- check if server listening
- check log static endpoint
- check log dynamic endpoint
2019-06-17 14:42:41 +02:00
# Base URL (scheme + host) of the web server probed by the server checks.
# It is concatenated as '{HOST}:{PORT}/...' so it must carry a scheme;
# an empty value yields a URL that `requests` rejects.
# NOTE(review): loopback is assumed because the tool is run on the server
# host - confirm against the deployment.
HOST = 'http://127.0.0.1'
PORT = 8001  # overridden by configuration file

# Replaced by the parsed ConfigParser object in check_configuration().
configuration_file = {}

# Raw `pgrep` lines for the two worker processes, filled in by
# check_processes_status() and reused in later failure messages.
pgrep_subscriber_output = ''
pgrep_dispatcher_output = ''

# Route SIGALRM to the shared handler; the checks below catch
# diagnostic_util.TimeoutException, presumably raised by this handler
# (defined in diagnostic_util, not visible here).
signal.signal(signal.SIGALRM, diagnostic_util.timeout_handler)
def humanize(name, isResult=False):
    """Turn a snake_case function name into a human readable label.

    With ``isResult=False`` the first word is capitalised and suffixed with
    ``'ing'`` (``'check_redis'`` -> ``'Checking redis'``), which is used
    while the check is running.  With ``isResult=True`` the leading verb is
    dropped and the next word is capitalised (``'check_redis'`` ->
    ``'Redis'``), which is used to label the outcome.

    :param name: snake_case identifier, typically ``func.__name__``.
    :param isResult: select the "result" form instead of the "running" form.
    :return: the label as a space separated string; ``''`` when the result
        form has no word left after dropping the verb.
    """
    words = name.split('_')
    if isResult:
        words = words[1:]
        if not words:
            # Single-word name: nothing remains once the verb is removed.
            return ''
        # Slicing ([:1]) instead of indexing ([0]) tolerates empty words.
        words[0] = words[0][:1].upper() + words[0][1:]
    else:
        words[0] = words[0][:1].upper() + words[0][1:] + 'ing'
    return ' '.join(words)
def add_spinner(_func=None, name='dots'):
    """Decorator running a check inside a Halo spinner and reporting its result.

    Usable both bare (``@add_spinner``) and with arguments
    (``@add_spinner(name='line')``).  The wrapped function receives the live
    spinner as its first positional argument, so callers invoke the decorated
    function *without* it.

    The wrapped function may return:
      * a ``(status, output)`` tuple or list,
      * a bare ``bool`` (no output text).
    ``status`` is ``True`` (success), ``False`` (failure), ``'info'`` or any
    other value (reported as a warning).  Anything else is reported as a
    failure with an "unexpected result" message.

    :param _func: the function when used as a bare decorator, else ``None``.
    :param name: Halo spinner style.
    :return: the decorated function, which returns ``status``.
    """
    def decorator_add_spinner(func):
        @functools.wraps(func)
        def wrapper_add_spinner(*args, **kwargs):
            human_func_name = humanize(func.__name__)
            human_func_result = humanize(func.__name__, isResult=True)

            with Halo(text=human_func_name, spinner=name) as spinner:
                result = func(spinner, *args, **kwargs)

                if isinstance(result, (tuple, list)) and len(result) >= 2:
                    status, output = result[0], result[1]
                elif isinstance(result, bool):
                    status, output = result, None
                else:
                    # Malformed return value: report it instead of crashing
                    # on unpacking.
                    spinner.fail('{} - Function return unexpected result: {}'.format(human_func_name, str(result)))
                    return False

                text = human_func_result
                if output:
                    text += ': {}'.format(output)

                # `is True` / `is False` keep the string statuses ('info',
                # 'warning') out of the boolean branches.
                if status is True:
                    spinner.succeed(text)
                elif status is False:
                    spinner.fail(text)
                elif status == 'info':
                    spinner.info(text)
                else:
                    spinner.warn(text)
            return status
        return wrapper_add_spinner

    if _func is None:
        return decorator_add_spinner
    return decorator_add_spinner(_func)
@add_spinner
def check_virtual_environment_and_packages(spinner):
    """Check that a virtualenv is active and the redis client is recent enough.

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(True, '')`` on success, otherwise ``(False, reason)``.
    """
    if os.environ.get('VIRTUAL_ENV') is None:
        return (False, 'This diagnostic tool should be started inside a virtual environment.')

    # The 2.x client is rejected; the message asks for 3.x.
    if redis.__version__.startswith('2'):
        return (False, '''Redis python client have version {}. Version 3.x required.
\t➥ [inside virtualenv] pip3 install -U redis'''.format(redis.__version__))

    return (True, '')
@add_spinner
def check_configuration(spinner):
    """Load ``config/config.cfg`` and compare it with ``config.cfg.default``.

    On success the parsed configuration is published in the module-level
    ``configuration_file`` and the server port from the ``[Server]`` section
    is stored in both ``port`` (historical name) and ``PORT`` (the name the
    server checks actually read).

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(True, '')`` when ``diagnostic_util.dict_compare`` reports no
        faulty field, otherwise ``(False, text)`` listing the faulty fields.
    """
    global configuration_file, port, PORT
    base_dir = os.path.dirname(os.path.realpath(__file__))

    configfile = os.path.join(base_dir, 'config/config.cfg')
    cfg = configparser.ConfigParser()
    # Without read() the parser stays empty and every later lookup fails.
    cfg.read(configfile)
    configuration_file = cfg
    cfg = {s: dict(cfg.items(s)) for s in cfg.sections()}

    configfile_default = os.path.join(base_dir, 'config/config.cfg.default')
    cfg_default = configparser.ConfigParser()
    cfg_default.read(configfile_default)
    cfg_default = {s: dict(cfg_default.items(s)) for s in cfg_default.sections()}

    # Check if all fields from config.default exists in config
    # NOTE(review): dict_compare is defined in diagnostic_util; it is assumed
    # to return (all_present, iterable_of_missing_field_names) - confirm.
    result, faulties = diagnostic_util.dict_compare(cfg_default, cfg)
    if result:
        port = configuration_file.get("Server", "port")
        # The URL builders read PORT, so the override must land there too.
        PORT = port
        return (True, '')

    return_text = '''Configuration incomplete.
\tUpdate your configuration file `config.cfg`.\n\t➥ Faulty fields:\n'''
    return_text += ''.join('\t\t- {}\n'.format(field_name) for field_name in faulties)
    return (False, return_text)
@add_spinner
def check_file_permission(spinner):
    """Check that the MaxMind GeoIP database is world-readable.

    The path is read from ``[RedisMap] pathmaxminddb`` in the loaded
    configuration.

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(True, '')`` when the "other" read bit is set, otherwise
        ``(False, reason)``.
    """
    max_mind_database_path = configuration_file.get('RedisMap', 'pathmaxminddb')
    try:
        st = os.stat(max_mind_database_path)
    except OSError as e:
        # A missing or unreachable file is a diagnostic finding, not a crash.
        return (False, 'Maxmind GeoDB could not be accessed: {}'.format(e))

    all_read_perm = bool(st.st_mode & stat.S_IROTH)  # FIXME: permission may be changed
    if all_read_perm:
        return (True, '')
    return (False, 'Maxmin GeoDB might have incorrect read file permission')
@add_spinner
def check_redis(spinner):
    """Check that the Redis server from the configuration answers a PING.

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(True, '')`` when the server replies, otherwise
        ``(False, reason)``.
    """
    redis_server = redis.StrictRedis(
        host=configuration_file.get('RedisGlobal', 'host'),
        port=configuration_file.getint('RedisGlobal', 'port'),
        db=configuration_file.getint('RedisLog', 'db'))
    try:
        reachable = redis_server.ping()
    except redis.exceptions.RedisError:
        # An unreachable server raises instead of returning False; map it
        # onto the same failure report.
        reachable = False

    if reachable:
        return (True, '')
    return (False, '''Can\'t reach Redis server.
\t➥ Make sure it is running and adapt your configuration accordingly''')
@add_spinner
def check_zmq(spinner):
    """Check that every configured MISP ZMQ stream delivers a message.

    For each entry of ``[RedisGlobal] misp_instances`` (a JSON list) a SUB
    socket is opened and polled once per second for up to ``timeout``
    seconds, waiting for a frame starting with ``b'misp_json'``.

    :param spinner: live spinner; its text shows the elapsed time.
    :return: ``(True, '')`` if all instances answered, ``(True, details)`` if
        only some did, ``(False, help_text)`` if none did.
    """
    timeout = 15
    context = zmq.Context()
    misp_instances = json.loads(configuration_file.get('RedisGlobal', 'misp_instances'))
    instances_status = {}
    try:
        for misp_instance in misp_instances:
            instance_name = misp_instance.get('name')
            socket = context.socket(zmq.SUB)
            try:
                # NOTE(review): the endpoint is assumed to be stored under the
                # 'zmq' key of each instance - confirm against
                # config.cfg.default. Without connect() nothing is ever received.
                socket.connect(misp_instance.get('zmq'))
                socket.setsockopt_string(zmq.SUBSCRIBE, '')
                poller = zmq.Poller()
                poller.register(socket, zmq.POLLIN)

                received = False
                start_time = time.time()
                for _ in range(timeout):
                    socks = dict(poller.poll(timeout=1*1000))
                    if socks.get(socket) == zmq.POLLIN:
                        rcv_string = socket.recv()
                        if rcv_string.startswith(b'misp_json'):
                            received = True
                            break
                    spinner.text = 'checking zmq of {} - elapsed time: {}s'.format(misp_instance.get("name"), int(time.time() - start_time))
                instances_status[instance_name] = received
            finally:
                # linger=0 so closing never blocks on undelivered messages.
                socket.close(linger=0)
    finally:
        context.term()

    results = list(instances_status.values())
    if all(results):
        return (True, '')
    if any(results):
        return_text = 'Connection to ZMQ stream(s) failed.\n'
        for name, status in instances_status.items():
            return_text += '\t➥ {}: {}\n'.format(name, "success" if status else "failed")
        return (True, return_text)

    return (False, '''Can\'t connect to the ZMQ stream(s).
\t➥ Make sure the MISP ZMQ is running: `/servers/serverSettings/diagnostics`
\t➥ Make sure your network infrastucture allows you to connect to the ZMQ''')
@add_spinner
def check_processes_status(spinner):
    """Check that ``zmq_subscriber.py`` and ``zmq_dispatcher.py`` are running.

    Uses ``pgrep -laf zmq_`` and stores the matching lines in the
    module-level ``pgrep_subscriber_output`` / ``pgrep_dispatcher_output`` so
    later checks can quote them in their messages.

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(True, text)`` when both processes are found, otherwise
        ``(False, which_one_is_missing)``.
    """
    global pgrep_subscriber_output, pgrep_dispatcher_output
    try:
        response = subprocess.check_output(
            ["pgrep", "-laf", "zmq_"],
            # Text mode: the lines are compared with str literals below.
            universal_newlines=True)
    except subprocess.CalledProcessError:
        # pgrep exits non-zero when nothing matches; that simply means no
        # process is running.
        response = ''

    for line in response.splitlines():
        # "<pid> <command line>"; partition tolerates a line without space.
        _pid, _, p_name = line.partition(' ')
        if 'zmq_subscriber.py' in p_name:
            pgrep_subscriber_output = line
        elif 'zmq_dispatcher.py' in p_name:
            pgrep_dispatcher_output = line

    if len(pgrep_subscriber_output) == 0:
        return (False, 'zmq_subscriber is not running')
    if len(pgrep_dispatcher_output) == 0:
        return (False, 'zmq_dispatcher is not running')
    return (True, 'Both processes are running')
@add_spinner
def check_subscriber_status(spinner):
    """Check that the subscriber pushes items onto the Redis buffer list.

    Watches the command stream yielded by ``diagnostic_util.Monitor`` until an
    ``LPUSH`` on the configured ``[RedisLIST] listName`` is seen, or until the
    SIGALRM timeout fires.

    :param spinner: live spinner; its text shows the elapsed time.
    :return: ``(True, text)`` once a push is observed (or the stream ends),
        ``(False, text)`` on timeout.
    """
    timeout = 15
    pool = redis.ConnectionPool(
        host=configuration_file.get('RedisGlobal', 'host'),
        port=configuration_file.getint('RedisGlobal', 'port'),
        db=configuration_file.getint('RedisLIST', 'db'),
        # NOTE(review): str responses are assumed because the monitor lines
        # are compared with str literals below - confirm against Monitor.
        decode_responses=True)
    monitor = diagnostic_util.Monitor(pool)
    commands = monitor.monitor()
    expected_target = '"{}"'.format(configuration_file.get("RedisLIST", "listName"))

    start_time = time.time()
    # Arm the alarm: without it the TimeoutException below can never be
    # raised and a silent subscriber would block this check forever.
    signal.alarm(timeout)
    try:
        for i, c in enumerate(commands):
            if i == 0:  # Skip 'OK'
                continue
            split = c.split()
            if len(split) < 5:
                # Too short to carry an action and a target; skip it rather
                # than comparing stale or unbound values.
                continue
            action, target = split[3], split[4]
            if action == '"LPUSH"' and target == expected_target:
                break
            spinner.text = 'Checking subscriber status - elapsed time: {}s'.format(int(time.time() - start_time))
    except diagnostic_util.TimeoutException:
        return_text = '''zmq_subscriber seems not to be working.
\t➥ Consider restarting it: {}'''.format(pgrep_subscriber_output)
        return (False, return_text)
    finally:
        # Always disarm so a pending alarm cannot hit a later check.
        signal.alarm(0)
    return (True, 'subscriber is running and populating the buffer')
@add_spinner
def check_buffer_queue(spinner):
    """Report how many items are waiting in the Redis buffer list.

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(status, text)`` where ``status`` is ``True`` for an empty
        buffer, ``'info'`` for 1..``warning_threshold`` items and
        ``'warning'`` above the threshold.
    """
    redis_server = redis.StrictRedis(
        host=configuration_file.get('RedisGlobal', 'host'),
        port=configuration_file.getint('RedisGlobal', 'port'),
        db=configuration_file.getint('RedisLIST', 'db'))
    warning_threshold = 100
    elements_in_list = redis_server.llen(configuration_file.get('RedisLIST', 'listName'))

    if elements_in_list > warning_threshold:
        return_status = 'warning'
    elif elements_in_list > 0:
        return_status = 'info'
    else:
        return_status = True

    return_text = 'Currently {} items in the buffer'.format(elements_in_list)
    return (return_status, return_text)
@add_spinner
def check_buffer_change_rate(spinner):
    """Sample the buffer length for a while to see whether it is draining.

    The list length is polled every ``sleep_duration`` seconds for up to
    ``sleep_max`` seconds.  About once per ``refresh_frequency`` seconds the
    spinner shows the accumulated increase/decrease and the counters are
    reset.  The observation stops early after the buffer has been seen empty
    on three consecutive refreshes.

    :param spinner: live spinner; its text shows the current rates.
    :return: ``(flag, text)`` where ``flag`` is True when the buffer ended
        empty, shorter than at the start, or with fewer than 2 items.
    """
    redis_server = redis.StrictRedis(
        host=configuration_file.get('RedisGlobal', 'host'),
        port=configuration_file.getint('RedisGlobal', 'port'),
        db=configuration_file.getint('RedisLIST', 'db'))
    list_name = configuration_file.get('RedisLIST', 'listName')

    time_slept = 0
    sleep_duration = 0.001
    sleep_max = 10.0
    refresh_frequency = 1.0
    next_refresh = 0
    change_increase = 0
    change_decrease = 0
    elements_in_list = int(redis_server.llen(list_name))
    elements_in_inlist_init = elements_in_list
    consecutive_no_rate_change = 0

    while True:
        elements_in_list_prev = elements_in_list
        elements_in_list = int(redis_server.llen(list_name))
        delta = elements_in_list - elements_in_list_prev
        if delta > 0:
            change_increase += delta
        else:
            change_decrease -= delta

        if next_refresh < time_slept:
            next_refresh = time_slept + refresh_frequency
            change_rate_text = '↑ {}/sec\t↓ {}/sec'.format(change_increase, change_decrease)
            spinner.text = 'Buffer: {}\t{}'.format(elements_in_list, change_rate_text)

            if consecutive_no_rate_change == 3:
                # Buffer stayed empty long enough: force the exit below.
                time_slept = sleep_max
            if elements_in_list == 0:
                consecutive_no_rate_change += 1
            else:
                consecutive_no_rate_change = 0
            change_increase = 0
            change_decrease = 0

        if time_slept >= sleep_max:
            return_flag = elements_in_list == 0 or (elements_in_list < elements_in_inlist_init or elements_in_list < 2)
            return_text = 'Buffer is consumed {} than being populated'.format("faster" if return_flag else "slower")
            return (return_flag, return_text)

        # time_slept is bookkeeping for real elapsed time, so actually sleep.
        time.sleep(sleep_duration)
        time_slept += sleep_duration
@add_spinner
def check_dispatcher_status(spinner):
    """Push a probe message on the buffer and wait for the dispatcher's reply.

    A ``diagnostic_channel`` message carrying the current time is appended to
    the configured Redis list; the check then polls the
    ``diagnostic_tool_response`` key every ``sleep_duration`` seconds for up
    to ``sleep_max`` seconds.
    NOTE(review): the dispatcher is presumed to write that key with the
    processing time in seconds - confirm against zmq_dispatcher.

    :param spinner: live spinner; shows a waiting message.
    :return: ``(True, text)`` when a reply is read, ``(False, text)`` on
        timeout.
    """
    redis_server = redis.StrictRedis(
        host=configuration_file.get('RedisGlobal', 'host'),
        port=configuration_file.getint('RedisGlobal', 'port'),
        db=configuration_file.getint('RedisLIST', 'db'))
    list_name = configuration_file.get('RedisLIST', 'listName')

    # Drop any reply left over from a previous run *before* sending the
    # probe, so a stale value cannot be mistaken for a fresh answer.
    redis_server.delete('diagnostic_tool_response')

    content = {'content': time.time()}
    redis_server.rpush(
        list_name,
        json.dumps({'zmq_name': 'diagnostic_channel', 'content': 'diagnostic_channel ' + json.dumps(content)})
    )

    time_slept = 0
    sleep_duration = 0.2
    sleep_max = 10.0
    while True:
        reply = redis_server.get('diagnostic_tool_response')
        if reply is not None:
            return (True, 'Took {:.2f}s to complete'.format(float(reply)))

        if time_slept >= sleep_max:
            return_text = 'zmq_dispatcher did not respond in the given time ({}s)'.format(int(sleep_max))
            if len(pgrep_dispatcher_output) > 0:
                return_text += '\n\t➥ Consider restarting it: {}'.format(pgrep_dispatcher_output)
            else:
                return_text += '\n\t➥ Consider starting it'
            return (False, return_text)

        spinner.text = 'Dispatcher status: No response yet'
        time.sleep(sleep_duration)
        time_slept += sleep_duration
@add_spinner
def check_server_listening(spinner):
    """Check that the web server answers on its static log endpoint.

    :param spinner: live spinner; shows the URL being tried.
    :return: ``(False, text)`` when the request fails, otherwise
        ``(status_code == 200, text)``.
    """
    url = '{}:{}/_get_log_head'.format(HOST, PORT)
    spinner.text = 'Trying to connect to {}'.format(url)
    try:
        # A timeout keeps the diagnostic from hanging on a stuck server.
        r = requests.get(url, timeout=10)
    except requests.exceptions.RequestException:
        # RequestException also covers timeouts and malformed URLs, which
        # are failures to report rather than reasons to crash.
        # The message must be formatted *inside* the tuple.
        return (False, 'Can\'t connect to {}'.format(url))

    return (
        r.status_code == 200,
        '{} {}reached. Status code [{}]'.format(url, "not " if r.status_code != 200 else "", r.status_code)
    )
@add_spinner
def check_server_dynamic_enpoint(spinner):
    """Check that the server-sent-events endpoint ``/_logs`` streams JSON.

    ``curl`` is started against the endpoint and its output is read line by
    line until a ``data: `` line is found; that line's payload must be valid
    JSON.  A SIGALRM timeout of ``sleep_max`` seconds bounds the wait.

    :param spinner: live spinner injected by ``add_spinner`` (unused here).
    :return: ``(True, text)`` when a JSON ``data:`` line is received,
        otherwise ``(False, text)``.
    """
    sleep_max = 15
    start_time = time.time()
    url = '{}:{}/_logs'.format(HOST, PORT)
    p = subprocess.Popen(
        ['curl', '-sfN', '--header', 'Accept: text/event-stream', url],
        # The stream is consumed through p.stdout, so it must be piped.
        stdout=subprocess.PIPE)
    return_flag = False
    return_text = 'Dynamic endpoint returned data but not in the correct format.'
    # Arm the alarm: an idle stream would otherwise block readline forever.
    signal.alarm(sleep_max)
    try:
        for line in iter(p.stdout.readline, b''):
            if not line.startswith(b'data: '):
                continue
            data = line[6:]
            try:
                json.loads(data)
            except ValueError:
                # Covers JSONDecodeError and undecodable bytes.
                return_flag = False
                return_text = 'Something went wrong. Output {}'.format(line)
            else:
                return_flag = True
                return_text = 'Dynamic endpoint returned data (took {:.2f}s)'.format(time.time()-start_time)
            break
    except diagnostic_util.TimeoutException:
        return_text = 'Dynamic endpoint did not returned data in the given time ({}sec)'.format(int(time.time()-start_time))
    finally:
        signal.alarm(0)
        # The stream never ends on its own: stop curl and reap it.
        p.terminate()
        p.stdout.close()
        p.wait()
    return (return_flag, return_text)
def start_diagnostic():
    """Run every check in order, skipping those whose prerequisite failed.

    Nothing else runs unless the environment and configuration checks pass,
    because every later check reads the loaded configuration.
    """
    if not (check_virtual_environment_and_packages() and check_configuration()):
        return

    check_file_permission()
    check_redis()
    check_zmq()
    check_processes_status()
    check_subscriber_status()

    # The change rate is only meaningful when the buffer is not empty
    # (check_buffer_queue returns True only for an empty buffer).
    if check_buffer_queue() is not True:
        check_buffer_change_rate()

    dispatcher_running = check_dispatcher_status()
    # The dynamic endpoint is only probed when the server is up and the
    # dispatcher answered.
    if check_server_listening() and dispatcher_running:
        check_server_dynamic_enpoint()
def main():
    """Script entry point: run the full diagnostic."""
    start_diagnostic()


if __name__ == '__main__':
    main()