chg: [Analyzer Queues] Add queue by group of sensors (TODO: add sensor uuid in the UI)

gallypette-patch-1
Terrtia 2020-02-28 16:52:48 +01:00
parent aabf74f2f3
commit 4d55d601a1
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
9 changed files with 262 additions and 82 deletions
server
workers
workers_2/meta_types_modules
workers_3
workers_4
workers_8

View File

@ -6,6 +6,7 @@ save_directory = None
[D4_Server]
# registration or shared-secret
server_mode = registration
analyzer_queues_max_size = 10000
[Flask_Server]
# UI port number

230
server/lib/Analyzer_Queue.py Executable file
View File

@ -0,0 +1,230 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import os
import sys
import datetime
import time
import uuid
import redis
sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/'))
import ConfigLoader
### Config ###
# Load Redis connections and queue defaults from the D4 configuration.
config_loader = ConfigLoader.ConfigLoader()
r_serv_metadata = config_loader.get_redis_conn("Redis_METADATA")
r_serv_analyzer = config_loader.get_redis_conn("Redis_ANALYZER")
# Default max length of an analyzer queue when no per-queue limit is set
LIST_DEFAULT_SIZE = config_loader.get_config_int('D4_Server', 'analyzer_queues_max_size')
config_loader = None
### ###
def is_valid_uuid_v4(uuid_v4):
    '''
    Check that a string is a valid UUID version 4 (with or without dashes).

    :param uuid_v4: candidate uuid string (or None/empty)
    :return: True if uuid_v4 is a valid version-4 UUID, False otherwise
    :rtype: boolean
    '''
    if uuid_v4:
        uuid_v4 = uuid_v4.replace('-', '')
    else:
        return False
    try:
        uuid_test = uuid.UUID(hex=uuid_v4, version=4)
        # UUID() forces the version/variant bits: comparing the hex back
        # rejects uuids of another version (e.g. a valid v1 uuid)
        return uuid_test.hex == uuid_v4
    except ValueError:
        # invalid hex digits or wrong length
        return False
def sanitize_uuid(uuid_v4):
    '''Return uuid_v4 when it is a valid UUIDv4, otherwise a freshly generated uuid.'''
    if is_valid_uuid_v4(uuid_v4):
        return uuid_v4
    return str(uuid.uuid4())
def get_all_queues(r_list=None):
    '''Return every analyzer queue uuid (set by default, list when r_list is truthy).'''
    queues = r_serv_metadata.smembers('all_analyzer_queues')
    return list(queues) if r_list else queues
def get_all_queues_standard_format(r_list=None):
    # NOTE(review): currently identical to get_all_queues() — presumably a
    # placeholder meant to filter standard-format queues only; confirm intent.
    res = r_serv_metadata.smembers('all_analyzer_queues')
    if r_list:
        return list(res)
    return res
def get_all_queues_standard_extended_format(r_list=None):
    # NOTE(review): currently identical to get_all_queues() — presumably a
    # placeholder meant to filter extended-format queues only; confirm intent.
    res = r_serv_metadata.smembers('all_analyzer_queues')
    if r_list:
        return list(res)
    return res
def get_all_queues_by_type(format_type, r_list=None):
    '''
    Get all analyzer queues of a given data type.

    :param format_type: data type
    :type format_type: int
    :param r_list: return a list instead of a set
    :type r_list: boolean
    :return: list or set of queue uuids
    :rtype: list or set
    '''
    # 'all_analyzer_queues_by_type'
    queues = r_serv_metadata.smembers('analyzer:{}'.format(format_type))
    return list(queues) if r_list else queues
def get_all_queues_group_by_type(format_type, r_list=None):
    '''Return the sensor-group queues registered for the given data type.'''
    queues = r_serv_metadata.smembers('analyzer_uuid_group:{}'.format(format_type))
    return list(queues) if r_list else queues
def get_all_queues_by_sensor_group(format_type, sensor_uuid, r_list=None):
    '''
    Get all sensor-group queues registered for a given sensor and data type.

    :param format_type: data type
    :param sensor_uuid: sensor uuid (stored without dashes)
    :param r_list: return a list instead of a set
    :return: list or set of queue uuids
    '''
    # (removed leftover debug print of the redis key)
    res = r_serv_metadata.smembers('sensor:queues:{}:{}'.format(format_type, sensor_uuid))
    if r_list:
        return list(res)
    return res
def get_queue_group_all_sensors(queue_uuid, r_list=None):
    '''Return every sensor uuid attached to a sensor-group queue.'''
    sensors = r_serv_metadata.smembers('analyzer_sensor_group:{}'.format(queue_uuid))
    return list(sensors) if r_list else sensors
def get_queue_last_seen(queue_uuid, f_date='str_time'):
    '''
    Return the last time data was pushed on this queue.

    With f_date='str_date' the timestamp is rendered as '%Y-%m-%d %H:%M:%S'
    ('Never' when unset); any other value — including the default — returns
    the raw stored value.
    '''
    last_updated = r_serv_metadata.hget('analyzer:{}'.format(queue_uuid), 'last_updated')
    if f_date != 'str_date':
        return last_updated
    if last_updated is None:
        return 'Never'
    return datetime.datetime.fromtimestamp(float(last_updated)).strftime('%Y-%m-%d %H:%M:%S')
def get_queue_max_size(queue_uuid):
    '''
    Return the max size of a queue, falling back to the configured default.

    :param queue_uuid: queue uuid
    :return: maximum number of items kept in the queue
    :rtype: int
    '''
    max_size = r_serv_metadata.hget('analyzer:{}'.format(queue_uuid), 'max_size')
    if max_size is None:
        return LIST_DEFAULT_SIZE
    # redis returns the stored value as a string: normalize to int so all
    # callers (ltrim bound, UI display) get a consistent type
    return int(max_size)
def get_queue_metadata(queue_uuid, format_type=None, f_date='str_date'):
    '''
    Build the metadata dict of a queue: uuid, size_limit, last_updated,
    description, and (when format_type is given) the current queue length.
    '''
    description = r_serv_metadata.hget('analyzer:{}'.format(queue_uuid), 'description')
    meta = {
        'uuid': queue_uuid,
        'size_limit': get_queue_max_size(queue_uuid),
        'last_updated': get_queue_last_seen(queue_uuid, f_date=f_date),
        'description': description if description is not None else '',
    }
    if format_type:
        meta['length'] = r_serv_analyzer.llen('analyzer:{}:{}'.format(format_type, queue_uuid)) or 0
    return meta
def edit_queue_description(queue_uuid, description):
    '''Set the queue description (only when description is non-empty and the queue exists).'''
    queue_key = 'analyzer:{}'.format(queue_uuid)
    if description and r_serv_metadata.exists(queue_key):
        r_serv_metadata.hset(queue_key, 'description', description)
def edit_queue_max_size(queue_uuid, max_size):
    '''
    Update the max size of a queue.

    :param queue_uuid: queue uuid
    :param max_size: new maximum size (positive integer, or its str form)
    :return: an error message string on invalid input, None otherwise
    '''
    try:
        max_size = int(max_size)
    except (TypeError, ValueError):
        return 'analyzer max size, Invalid Integer'
    # reject non-positive sizes explicitly instead of failing silently
    if max_size <= 0:
        return 'analyzer max size, Invalid Integer'
    if r_serv_metadata.exists('analyzer:{}'.format(queue_uuid)):
        r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'max_size', max_size)
# create queue by type or by group of sensor uuids
# # TODO: add size limit
def create_queues(format_type, queue_uuid=None, l_uuid=None, queue_type='list', metatype_name=None, description=None):
    '''
    Create an analyzer queue, either by data type or for a group of sensors.

    :param format_type: data type (254 = extended type, keyed by metatype_name)
    :param queue_uuid: queue uuid (a new one is generated when invalid/missing)
    :param l_uuid: sensor uuids; when provided the queue is a sensor-group queue
    :param queue_type: queue backend type (only 'list' is used for now)
    :param metatype_name: extended type name (required when format_type == 254)
    :param description: optional queue description
    :return: the queue uuid
    '''
    # avoid the mutable-default-argument pitfall
    if l_uuid is None:
        l_uuid = []
    queue_uuid = sanitize_uuid(queue_uuid)
    r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'type', format_type)
    edit_queue_description(queue_uuid, description)

    # # TODO: check l_uuid is valid
    if l_uuid:
        analyzer_key_name = 'analyzer_uuid_group'
    else:
        analyzer_key_name = 'analyzer'

    if format_type == 254:
        # TODO: check metatype_name
        r_serv_metadata.sadd('{}:{}:{}'.format(analyzer_key_name, format_type, metatype_name), queue_uuid)
    else:
        r_serv_metadata.sadd('{}:{}'.format(analyzer_key_name, format_type), queue_uuid)

    # register each sensor of the group
    if l_uuid:
        # # TODO: check sensor_uuid is valid
        for sensor_uuid in l_uuid:
            sensor_uuid = sensor_uuid.replace('-', '')
            r_serv_metadata.sadd('analyzer_sensor_group:{}'.format(queue_uuid), sensor_uuid)
            r_serv_metadata.sadd('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid)

    # global queue index
    r_serv_metadata.sadd('all_analyzer_queues', queue_uuid)
    return queue_uuid
def _push_data_to_queue(queue_uuid, format_type, data):
    '''Push data on one queue, refresh its last_updated and trim it to its max size.'''
    queue_key = 'analyzer:{}:{}'.format(format_type, queue_uuid)
    r_serv_analyzer.lpush(queue_key, data)
    r_serv_metadata.hset('analyzer:{}'.format(queue_uuid), 'last_updated', time.time())
    analyzer_queue_max_size = get_queue_max_size(queue_uuid)
    r_serv_analyzer.ltrim(queue_key, 0, analyzer_queue_max_size)

# format_type int or str (extended type)
def add_data_to_queue(sensor_uuid, format_type, data):
    '''
    Dispatch data to every queue subscribed to this data type and to every
    sensor-group queue registered for this sensor.
    '''
    if not data:
        return
    # queues subscribed by data type
    for queue_uuid in get_all_queues_by_type(format_type):
        _push_data_to_queue(queue_uuid, format_type, data)
    # queues subscribed by group of sensors
    for queue_uuid in get_all_queues_by_sensor_group(format_type, sensor_uuid):
        _push_data_to_queue(queue_uuid, format_type, data)
def is_queue_group_of_sensors(queue_uuid):
    '''Return True when the queue targets a group of sensors.'''
    return bool(r_serv_metadata.exists('analyzer_sensor_group:{}'.format(queue_uuid)))
def flush_queue(queue_uuid, format_type):
    '''Delete all queued items of a queue (its metadata is kept).'''
    queue_key = 'analyzer:{}:{}'.format(format_type, queue_uuid)
    r_serv_analyzer.delete(queue_key)
def remove_queues(queue_uuid, format_type, metatype_name=None):
    '''
    Delete a queue: its metadata, its sensor-group membership and its content.

    :param queue_uuid: queue uuid
    :param format_type: data type (int, or an int passed as str by web callers)
    :param metatype_name: extended type name (used when format_type == 254)
    :return: error dict on invalid uuid, None otherwise
    '''
    if not is_valid_uuid_v4(queue_uuid):
        return {'error': 'Invalid uuid'}
    # web callers pass format_type straight from request.args as a string:
    # normalize numeric types so the extended-type check (== 254) works
    if isinstance(format_type, str) and format_type.isdigit():
        format_type = int(format_type)

    # delete metadata
    r_serv_metadata.delete('analyzer:{}'.format(queue_uuid))

    # delete queue group of sensors uuid
    l_sensors_uuid = get_queue_group_all_sensors(queue_uuid, r_list=None)
    if l_sensors_uuid:
        r_serv_metadata.delete('analyzer_sensor_group:{}'.format(queue_uuid))
        for sensor_uuid in l_sensors_uuid:
            r_serv_metadata.srem('sensor:queues:{}:{}'.format(format_type, sensor_uuid), queue_uuid)
        analyzer_key_name = 'analyzer_uuid_group'
    else:
        analyzer_key_name = 'analyzer'

    if format_type == 254:
        # TODO: check metatype_name
        r_serv_metadata.srem('{}:{}:{}'.format(analyzer_key_name, format_type, metatype_name), queue_uuid)
    else:
        r_serv_metadata.srem('{}:{}'.format(analyzer_key_name, format_type), queue_uuid)
    r_serv_metadata.srem('all_analyzer_queues', queue_uuid)

    # delete queue content
    r_serv_analyzer.delete('analyzer:{}:{}'.format(format_type, queue_uuid))
def get_sensor_queues(sensor_uuid):
    # # TODO: not implemented yet — should return the queues fed by this sensor
    pass
if __name__ == '__main__':
    # ad-hoc manual test: create / remove a queue against the live redis
    #create_queues(3, l_uuid=['03c00bcf-fe53-46a1-85bb-ee6084cb5bb2'])
    remove_queues('a2e6f95c-1efe-4d2b-a0f5-d8e205d85670', 3)

View File

@ -29,6 +29,7 @@ sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib'))
from User import User
import Sensor
import ConfigLoader
import Analyzer_Queue
# Import Blueprint
from blueprints.restApi import restApi
@ -532,6 +533,9 @@ def server_management():
len_queue = 0
list_analyzer_uuid.append({'uuid': analyzer_uuid, 'description': description_analyzer, 'size_limit': size_limit,'last_updated': last_updated, 'length': len_queue})
for analyzer_uuid in Analyzer_Queue.get_all_queues_group_by_type(type):
list_analyzer_uuid.append(Analyzer_Queue.get_queue_metadata(analyzer_uuid, type))
list_accepted_types.append({"id": int(type), "description": description, 'list_analyzer_uuid': list_analyzer_uuid})
list_accepted_extended_types = []
@ -789,25 +793,20 @@ def uuid_change_description():
@login_required
@login_user_basic
def add_new_analyzer():
type = request.args.get('type')
format_type = request.args.get('type')
user = request.args.get('redirect')
metatype_name = request.args.get('metatype_name')
analyzer_description = request.args.get('analyzer_description')
analyzer_uuid = request.args.get('analyzer_uuid')
if is_valid_uuid_v4(analyzer_uuid):
try:
type = int(type)
if type < 0:
format_type = int(format_type)
if format_type < 0:
return 'type, Invalid Integer'
except:
return 'type, Invalid Integer'
if type == 254:
# # TODO: check metatype_name
redis_server_metadata.sadd('analyzer:{}:{}'.format(type, metatype_name), analyzer_uuid)
else:
redis_server_metadata.sadd('analyzer:{}'.format(type), analyzer_uuid)
if redis_server_metadata.exists('analyzer:{}:{}'.format(type, metatype_name)) or redis_server_metadata.exists('analyzer:{}'.format(type)):
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'description', analyzer_description)
Analyzer_Queue.create_queues(format_type, queue_uuid=analyzer_uuid, l_uuid=[], queue_type='list', metatype_name=metatype_name, description=analyzer_description)
if user:
return redirect(url_for('server_management'))
else:
@ -818,20 +817,13 @@ def add_new_analyzer():
@login_user_basic
def empty_analyzer_queue():
analyzer_uuid = request.args.get('analyzer_uuid')
type = request.args.get('type')
format_type = request.args.get('type')
metatype_name = request.args.get('metatype_name')
user = request.args.get('redirect')
if is_valid_uuid_v4(analyzer_uuid):
try:
type = int(type)
if type < 0:
return 'type, Invalid Integer'
except:
return 'type, Invalid Integer'
if type == 254:
redis_server_analyzer.delete('analyzer:{}:{}'.format(metatype_name, analyzer_uuid))
else:
redis_server_analyzer.delete('analyzer:{}:{}'.format(type, analyzer_uuid))
if format_type == 254:
format_type = metatype_name
Analyzer_Queue.flush_queue(analyzer_uuid, format_type)
if user:
return redirect(url_for('server_management'))
else:
@ -842,23 +834,11 @@ def empty_analyzer_queue():
@login_user_basic
def remove_analyzer():
analyzer_uuid = request.args.get('analyzer_uuid')
type = request.args.get('type')
format_type = request.args.get('type')
metatype_name = request.args.get('metatype_name')
user = request.args.get('redirect')
if is_valid_uuid_v4(analyzer_uuid):
try:
type = int(type)
if type < 0:
return 'type, Invalid Integer'
except:
return 'type, Invalid Integer'
if type == 254:
redis_server_metadata.srem('analyzer:{}:{}'.format(type, metatype_name), analyzer_uuid)
redis_server_analyzer.delete('analyzer:{}:{}'.format(metatype_name, analyzer_uuid))
else:
redis_server_metadata.srem('analyzer:{}'.format(type), analyzer_uuid)
redis_server_analyzer.delete('analyzer:{}:{}'.format(type, analyzer_uuid))
redis_server_metadata.delete('analyzer:{}'.format(analyzer_uuid))
Analyzer_Queue.remove_queues(analyzer_uuid, format_type)
if user:
return redirect(url_for('server_management'))
else:
@ -872,13 +852,7 @@ def analyzer_change_max_size():
user = request.args.get('redirect')
max_size_analyzer = request.args.get('max_size_analyzer')
if is_valid_uuid_v4(analyzer_uuid):
try:
max_size_analyzer = int(max_size_analyzer)
if max_size_analyzer < 0:
return 'analyzer max size, Invalid Integer'
except:
return 'analyzer max size, Invalid Integer'
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'max_size', max_size_analyzer)
Analyzer_Queue.edit_queue_max_size(analyzer_uuid, max_size_analyzer)
if user:
return redirect(url_for('server_management'))
else:

View File

@ -84,13 +84,13 @@ if __name__ == "__main__":
new_date = datetime.datetime.now().strftime("%Y%m%d")
# get all directory files
all_files = os.listdir(worker_data_directory)
not_compressed_file = []
# filter: get all not compressed files
for file in all_files:
if file.endswith('.cap'):
not_compressed_file.append(os.path.join(worker_data_directory, file))
if os.path.isdir(worker_data_directory):
all_files = os.listdir(worker_data_directory)
for file in all_files:
if file.endswith('.cap'):
not_compressed_file.append(os.path.join(worker_data_directory, file))
if not_compressed_file:
### check time-change (minus one hour) ###

View File

@ -11,6 +11,7 @@ import subprocess
sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/'))
import ConfigLoader
import Analyzer_Queue
def data_incorrect_format(stream_name, session_uuid, uuid):
redis_server_stream.sadd('Error:IncorrectType', session_uuid)
@ -39,14 +40,7 @@ def compress_file(file_full_path, i=0):
shutil.copyfileobj(f_in, f_out)
os.remove(file_full_path)
# save full path in analyzer queue
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
analyzer_uuid = analyzer_uuid.decode()
redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
if analyser_queue_max_size is None:
analyser_queue_max_size = analyzer_list_max_default_size
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
Analyzer_Queue.add_data_to_queue(uuid, type, compressed_filename)
config_loader = ConfigLoader.ConfigLoader()
redis_server_stream = config_loader.get_redis_conn("Redis_STREAM", decode_responses=False)

View File

@ -11,6 +11,7 @@ import datetime
sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/'))
import ConfigLoader
import Analyzer_Queue
DEFAULT_FILE_EXTENSION = 'txt'
@ -172,15 +173,7 @@ class MetaTypesDefault:
os.remove(file_full_path)
def send_to_analyzers(self, data_to_send):
## save full path in analyzer queue
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}:{}'.format(TYPE, self.get_type_name())):
analyzer_uuid = analyzer_uuid.decode()
redis_server_analyzer.lpush('analyzer:{}:{}'.format(self.get_type_name(), analyzer_uuid), data_to_send)
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
if analyser_queue_max_size is None:
analyser_queue_max_size = analyzer_list_max_default_size
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(self.get_type_name(), analyzer_uuid), 0, analyser_queue_max_size)
Analyzer_Queue.add_data_to_queue(self.uuid, self.get_type_name(), data_to_send)
######## GET FUNCTIONS ########

View File

@ -11,6 +11,7 @@ import datetime
sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/'))
import ConfigLoader
import Analyzer_Queue
def data_incorrect_format(session_uuid):
print('Incorrect format')
@ -33,8 +34,6 @@ config_loader = None
type = 3
rotation_save_cycle = 300 #seconds
analyzer_list_max_default_size = 10000
max_buffer_length = 10000
save_to_file = True
@ -112,14 +111,8 @@ if __name__ == "__main__":
if b'\n' in data[b'message']:
all_line = data[b'message'].split(b'\n')
for line in all_line[:-1]:
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
analyzer_uuid = analyzer_uuid.decode()
redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), line)
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
if analyser_queue_max_size is None:
analyser_queue_max_size = analyzer_list_max_default_size
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
Analyzer_Queue.add_data_to_queue(uuid, type, line)
# analyzer_uuid = analyzer_uuid.decode()
# keep incomplete line
if all_line[-1] != b'':
buffer += all_line[-1]

View File

@ -9,6 +9,7 @@ import datetime
sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/'))
import ConfigLoader
import Analyzer_Queue
def data_incorrect_format(session_uuid):
print('Incorrect format')

View File

@ -11,6 +11,7 @@ import datetime
sys.path.append(os.path.join(os.environ['D4_HOME'], 'lib/'))
import ConfigLoader
import Analyzer_Queue
def data_incorrect_format(session_uuid):
print('Incorrect format')
@ -112,14 +113,7 @@ if __name__ == "__main__":
if b'\n' in data[b'message']:
all_line = data[b'message'].split(b'\n')
for line in all_line[:-1]:
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
analyzer_uuid = analyzer_uuid.decode()
redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), line)
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
if analyser_queue_max_size is None:
analyser_queue_max_size = analyzer_list_max_default_size
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
Analyzer_Queue.add_data_to_queue(uuid, type, line)
# keep incomplete line
if all_line[-1] != b'':
buffer += all_line[-1]