diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index f3b3a71f..5aa42bc9 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -120,6 +120,8 @@ function launching_logs { screen -S "Logging_AIL" -X screen -t "LogQueue" bash -c "cd ${AIL_BIN}; ${AIL_VENV}/bin/log_subscriber -p 6380 -c Queuing -l ../logs/; read x" sleep 0.1 screen -S "Logging_AIL" -X screen -t "LogScript" bash -c "cd ${AIL_BIN}; ${AIL_VENV}/bin/log_subscriber -p 6380 -c Script -l ../logs/; read x" + sleep 0.1 + screen -S "Logging_AIL" -X screen -t "LogScript" bash -c "cd ${AIL_BIN}; ${AIL_VENV}/bin/log_subscriber -p 6380 -c Sync -l ../logs/; read x" } function launching_queues { diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index 285c0162..13ea781d 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -33,7 +33,7 @@ config_loader = None #### LOGS #### # redis_logger = publisher # redis_logger.port = 6380 -# redis_logger.channel = 'AIL_SYNC' +# redis_logger.channel = 'Sync' ##-- LOGS --## def is_valid_uuid_v4(UUID): @@ -298,14 +298,14 @@ def get_ail_instance_by_key(key): # def check_acl_sync_queue_ail(ail_uuid, queue_uuid, key): # return is_ail_instance_queue(ail_uuid, queue_uuid) -def update_ail_instance_key(ail_uuid, new_key): - old_key = get_ail_instance_key(ail_uuid) - r_serv_sync.srem(f'ail:instance:key:all', old_key) - r_serv_sync.delete(f'ail:instance:key:{old_key}') - - r_serv_sync.sadd(f'ail:instance:key:all', new_key) - r_serv_sync.delete(f'ail:instance:key:{new_key}', ail_uuid) - r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', new_key) +# def update_ail_instance_key(ail_uuid, new_key): +# old_key = get_ail_instance_key(ail_uuid) +# r_serv_sync.srem(f'ail:instance:key:all', old_key) +# r_serv_sync.delete(f'ail:instance:key:{old_key}') +# +# r_serv_sync.sadd(f'ail:instance:key:all', new_key) +# r_serv_sync.delete(f'ail:instance:key:{new_key}', ail_uuid) +# r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', new_key) #- AIL KEYS -# @@ -348,20 +348,33 @@ def is_ail_instance_sync_enabled(ail_uuid, sync_mode=None): else: return False -def change_pull_push_state(ail_uuid, pull=False, push=False): - # sanityze pull/push - if pull: - pull = True - else: - pull = False - if push: - push = True - else: - push = False - r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'push', push) - r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'pull', pull) - set_last_updated_sync_config() - refresh_ail_instance_connection(ail_uuid) +def change_pull_push_state(ail_uuid, pull=None, push=None): + edited = False + curr_pull = is_ail_instance_pull_enabled(ail_uuid) + curr_push = is_ail_instance_push_enabled(ail_uuid) + if pull is not None: + # sanityze pull + if pull: + pull = True + else: + pull = False + if curr_pull != pull: + print('pull hset') + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'pull', pull) + edited = True + if push is not None: + # sanityze push + if push: + push = True + else: + push = False + if curr_push != push: + print('push hset') + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'push', push) + edited = True + if edited: + set_last_updated_sync_config() + refresh_ail_instance_connection(ail_uuid) def get_ail_server_version(ail_uuid): return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'version') @@ -380,7 +393,7 @@ def get_ail_instance_metadata(ail_uuid, sync_queues=False): dict_meta['url'] = get_ail_instance_url(ail_uuid) dict_meta['description'] = get_ail_instance_description(ail_uuid) dict_meta['pull'] = is_ail_instance_pull_enabled(ail_uuid) - dict_meta['push'] = is_ail_instance_pull_enabled(ail_uuid) + dict_meta['push'] = is_ail_instance_push_enabled(ail_uuid) dict_meta['ping'] = get_ail_server_ping(ail_uuid) dict_meta['version'] = get_ail_server_version(ail_uuid) dict_meta['error'] = get_ail_server_error(ail_uuid) @@ -408,6 +421,39 @@ def get_ail_instances_metadata(l_ail_servers): l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) return l_servers +def edit_ail_instance_key(ail_uuid, new_key): + key = get_ail_instance_key(ail_uuid) + if new_key and key != new_key: + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', new_key) + r_serv_sync.srem('ail:instance:key:all', key) + r_serv_sync.srem('ail:instance:key:all', new_key) + r_serv_sync.delete(f'ail:instance:key:{key}') + r_serv_sync.set(f'ail:instance:key:{new_key}', ail_uuid) + refresh_ail_instance_connection(ail_uuid) + +def edit_ail_instance_url(ail_uuid, new_url): + url = get_ail_instance_url(ail_uuid) + if new_url and new_url != url: + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'url', new_url) + refresh_ail_instance_connection(ail_uuid) + +def edit_ail_instance_pull_push(ail_uuid, new_pull, new_push): + pull = is_ail_instance_pull_enabled(ail_uuid) + push = is_ail_instance_push_enabled(ail_uuid) + if new_pull == pull: + new_pull = None + if new_push == push: + new_push = None + change_pull_push_state(ail_uuid, pull=new_pull, push=new_push) + +def edit_ail_instance_description(ail_uuid, new_description): + description = get_ail_instance_description(ail_uuid) + if new_description is not None and description != new_description: + if not new_description: + r_serv_sync.hdel(f'ail:instance:{ail_uuid}', 'description') + else: + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'description', new_description) + # # TODO: VALIDATE URL # API KEY def create_ail_instance(ail_uuid, url, api_key=None, description=None, pull=True, push=True): @@ -438,6 +484,7 @@ def delete_ail_instance(ail_uuid): r_serv_sync.srem('ail:instance:all', ail_uuid) set_last_updated_sync_config() refresh_ail_instance_connection(ail_uuid) + clear_save_ail_server_error(ail_uuid) return ail_uuid ## WEBSOCKET API - ERRORS ## @@ -512,8 +559,10 @@ def ping_remote_ail_server(ail_uuid): if response: response = response.get('message', False) pong = response == 'pong' - set_ail_server_ping(ail_uuid, pong) - return pong + else: + pong = False + set_ail_server_ping(ail_uuid, pong) + return pong ## API ## @@ -571,6 +620,47 @@ def api_create_ail_instance(json_dict): pull=pull, push=push) return res, 200 +def api_edit_ail_instance(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + pull = json_dict.get('pull') + push = json_dict.get('push') + if pull is not None: + if pull: + pull = True + else: + pull = False + if push is not None: + if push: + push = True + else: + push = False + edit_ail_instance_pull_push(ail_uuid, pull, push) + + description = json_dict.get('description') + edit_ail_instance_description(ail_uuid, description) + + ail_url = json_dict.get('url') + if ail_url: + ail_url = ail_url.replace(' ', '') + if not is_valid_websocket_url(ail_url): + return {"status": "error", "reason": "Invalid websocket url"}, 400 + edit_ail_instance_url(ail_uuid, ail_url) + + ail_key = json_dict.get('key') + if ail_key: + ail_key = ail_key.replace(' ', '') + if not is_valid_websocket_key(ail_key): + return {"status": "error", "reason": "Invalid websocket key"}, 400 + edit_ail_instance_key(ail_uuid, ail_key) + + return ail_uuid, 200 + def api_delete_ail_instance(json_dict): ail_uuid = json_dict.get('uuid', '').replace(' ', '') if not is_valid_uuid_v4(ail_uuid): @@ -680,6 +770,31 @@ def unregister_ail_to_sync_queue(ail_uuid, queue_uuid): def get_all_unregistred_queue_by_ail_instance(ail_uuid): return r_serv_sync.sdiff('ail2ail:sync_queue:all', f'ail:instance:sync_queue:{ail_uuid}') +def edit_sync_queue_name(queue_uuid, new_name): + name = get_sync_queue_name(queue_uuid) + if new_name and new_name != name: + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'name', new_name) + +def edit_sync_queue_description(queue_uuid, new_description): + description = get_sync_queue_description(queue_uuid) + if new_description is not None and new_description != description: + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'description', new_description) + +# # TODO: trigger update +def edit_sync_queue_max_size(queue_uuid, new_max_size): + max_size = get_sync_queue_max_size(queue_uuid) + if new_max_size > 0 and new_max_size != max_size: + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'max_size', new_max_size) + +def edit_sync_queue_filter_tags(queue_uuid, new_tags): + tags = set(get_sync_queue_filter(queue_uuid)) + new_tags = set(new_tags) + if new_tags and new_tags != tags: + r_serv_sync.delete(f'ail2ail:sync_queue:filter:tags:{queue_uuid}') + for tag in new_tags: + r_serv_sync.sadd(f'ail2ail:sync_queue:filter:tags:{queue_uuid}', tag) + set_last_updated_sync_config() + # # TODO: optionnal name ??? # # TODO: SANITYZE TAGS def create_sync_queue(name, tags=[], description=None, max_size=100): @@ -718,9 +833,9 @@ def api_create_sync_queue(json_dict): tags = json_dict.get('tags') if not tags: - {"status": "error", "reason": "no tags provided"}, 400 + return {"status": "error", "reason": "no tags provided"}, 400 if not Tag.are_enabled_tags(tags): - {"status": "error", "reason": "Invalid/Disabled tags"}, 400 + return {"status": "error", "reason": "Invalid/Disabled tags"}, 400 max_size = json_dict.get('max_size') if not max_size: @@ -728,7 +843,7 @@ def api_create_sync_queue(json_dict): try: max_size = int(max_size) except ValueError: - {"status": "error", "reason": "Invalid queue size value"}, 400 + return {"status": "error", "reason": "Invalid queue size value"}, 400 if not max_size > 0: return {"status": "error", "reason": "Invalid queue size value"}, 400 @@ -736,6 +851,41 @@ def api_create_sync_queue(json_dict): max_size=max_size) return queue_uuid, 200 +def api_edit_sync_queue(json_dict): + queue_uuid = json_dict.get('uuid', '').replace(' ', '').replace('-', '') + if not is_valid_uuid_v4(queue_uuid): + return {"status": "error", "reason": "Invalid Queue uuid"}, 400 + if not exists_sync_queue(queue_uuid): + return {"status": "error", "reason": "Queue Sync not found"}, 404 + + description = json_dict.get('description') + description = escape(description) + if description is not None: + edit_sync_queue_description(queue_uuid, description) + + queue_name = json_dict.get('name') + if queue_name: + queue_name = escape(queue_name) + edit_sync_queue_name(queue_uuid, queue_name) + + tags = json_dict.get('tags') + if tags: + if not Tag.are_enabled_tags(tags): + return {"status": "error", "reason": "Invalid/Disabled tags"}, 400 + edit_sync_queue_filter_tags(queue_uuid, tags) + + max_size = json_dict.get('max_size') + if max_size: + try: + max_size = int(max_size) + except ValueError: + return {"status": "error", "reason": "Invalid queue size value"}, 400 + if not max_size > 0: + return {"status": "error", "reason": "Invalid queue size value"}, 400 + edit_sync_queue_max_size(queue_uuid, max_size) + + return queue_uuid, 200 + def api_delete_sync_queue(json_dict): queue_uuid = json_dict.get('uuid', '').replace(' ', '').replace('-', '') if not is_valid_uuid_v4(queue_uuid): @@ -872,12 +1022,12 @@ if __name__ == '__main__': # res = get_all_unregistred_queue_by_ail_instance(ail_uuid) ail_uuid = 'c3c2f3ef-ca53-4ff6-8317-51169b73f731' - ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + ail_uuid = '2dfeff47-777d-4e70-8c30-07c059307e6a' # res = ping_remote_ail_server(ail_uuid) # print(res) # - res = get_remote_ail_server_version(ail_uuid) + res = ping_remote_ail_server(ail_uuid) #res = _get_remote_ail_server_response(ail_uuid, 'pin') print(res) diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py index 797c5872..6308cab4 100755 --- a/bin/core/ail_2_ail_client.py +++ b/bin/core/ail_2_ail_client.py @@ -6,6 +6,7 @@ import json import os import sys import time +import traceback from pubsublogger import publisher from urllib.parse import urljoin @@ -23,7 +24,7 @@ from core import ail_2_ail #### LOGS #### redis_logger = publisher redis_logger.port = 6380 -redis_logger.channel = 'AIL_SYNC_client' +redis_logger.channel = 'Sync' ##-- LOGS --## #################################################################### @@ -69,7 +70,6 @@ async def push(websocket, ail_uuid): else: await asyncio.sleep(10) - async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): if not ail_2_ail.exists_ail_instance(ail_uuid): print('AIL server not found') @@ -91,9 +91,10 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): ail_2_ail.clear_save_ail_server_error(ail_uuid) try: - async with websockets.connect( + async with websockets.client.connect( uri, ssl=ssl_context, + #open_timeout=10, websockers 10.0 /!\ python>=3.7 extra_headers={"Authorization": f"{ail_key}"} ) as websocket: @@ -123,24 +124,36 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): error_message = str(e) if error_message: sys.stderr.write(error_message) - redis_logger.warning(f'{error_message}: {ail_uuid}') + redis_logger.warning(f'{ail_uuid}: {error_message}') ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except websockets.exceptions.InvalidURI as e: error_message = f'Invalid AIL url: {e.uri}' sys.stderr.write(error_message) - redis_logger.warning(f'{error_message}: {ail_uuid}') + redis_logger.warning(f'{ail_uuid}: {error_message}') ail_2_ail.save_ail_server_error(ail_uuid, error_message) except ConnectionError as e: error_message = str(e) sys.stderr.write(error_message) - redis_logger.info(f'{error_message}: {ail_uuid}') + redis_logger.info(f'{ail_uuid}: {error_message}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + # OSError: Multiple exceptions + except OSError as e: # # TODO: check if we need to check if is connection error + error_message = str(e) + sys.stderr.write(error_message) + redis_logger.info(f'{ail_uuid}: {error_message}') ail_2_ail.save_ail_server_error(ail_uuid, error_message) except websockets.exceptions.ConnectionClosedOK as e: print('connection closed') - # except Exception as e: - # print(e) - - + except Exception as err: + trace = traceback.format_tb(err.__traceback__) + if len(trace) == 1: + trace = trace[0] + trace = str(trace) + error_message = f'{trace}\n{str(err)}' + sys.stderr.write(error_message) + redis_logger.critical(f'{ail_uuid}: {error_message}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) if __name__ == '__main__': diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index d4e98a1e..c0a0dd21 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -22,7 +22,7 @@ from core import ail_2_ail #### LOGS #### redis_logger = publisher redis_logger.port = 6380 -redis_logger.channel = 'AIL_SYNC_Server' +redis_logger.channel = 'Sync' ############################# diff --git a/bin/packages/Tag.py b/bin/packages/Tag.py index bc044593..cdb5d5a6 100755 --- a/bin/packages/Tag.py +++ b/bin/packages/Tag.py @@ -228,6 +228,7 @@ def is_enabled_taxonomie_tag(tag, enabled_taxonomies=None): return False if not is_taxonomie_tag_enabled(taxonomie, tag): return False + return True def is_enabled_galaxy_tag(tag, enabled_galaxies=None): if not enabled_galaxies: @@ -241,6 +242,16 @@ def is_enabled_galaxy_tag(tag, enabled_galaxies=None): return False return True +def sort_tags_taxonomies_galaxies(tags): + taxonomies_tags = [] + galaxies_tags = [] + for tag in tags: + if is_taxonomie_tag(tag): + taxonomies_tags.append(tag) + else: + galaxies_tags.append(tag) + return taxonomies_tags, galaxies_tags + #### #### def is_tag_in_all_tag(tag): diff --git a/var/www/blueprints/ail_2_ail_sync.py b/var/www/blueprints/ail_2_ail_sync.py index 37643346..cd285018 100644 --- a/var/www/blueprints/ail_2_ail_sync.py +++ b/var/www/blueprints/ail_2_ail_sync.py @@ -123,13 +123,11 @@ def ail_server_add(): if register_key: input_dict['key'] = request.form.get("ail_key") - print(input_dict) - res = ail_2_ail.api_create_ail_instance(input_dict) if res[1] != 200: return create_json_response(res[0], res[1]) - return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=res)) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=res[0])) else: return render_template("add_ail_server.html") @@ -138,7 +136,29 @@ def ail_server_add(): @login_required @login_admin def ail_server_edit(): - ail_uuid = request.args.get('ail_uuid') + ail_uuid = request.args.get('uuid') + if request.method == 'POST': + ail_uuid = request.form.get("ail_uuid") + ail_key = request.form.get("ail_key") + url = request.form.get("ail_url") + description = request.form.get("ail_description") + pull = request.form.get("ail_pull", False) + push = request.form.get("ail_push", False) + if pull: + pull = True + if push: + push = True + + input_dict = {"uuid": ail_uuid, "url": url, + "description": description, + "key": ail_key, + "pull": pull, "push": push} + res = ail_2_ail.api_edit_ail_instance(input_dict) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=res[0])) + else: + server_metadata = ail_2_ail.get_ail_instance_metadata(ail_uuid) + return render_template("edit_ail_server.html", server_metadata=server_metadata) + @ail_2_ail_sync.route('/settings/ail_2_ail/server/delete', methods=['GET']) @login_required @@ -193,8 +213,8 @@ def ail_server_sync_queues_unregister(): #### SYNC QUEUE #### @ail_2_ail_sync.route('/settings/ail_2_ail/sync_queues', methods=['GET']) -# @login_required -# @login_admin +@login_required +@login_admin def sync_queues(): ail_uuid = request.args.get('ail_uuid') l_queues = ail_2_ail.get_all_queues_metadata() @@ -202,8 +222,8 @@ def sync_queues(): ail_uuid=ail_uuid, l_queues=l_queues) @ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/view', methods=['GET']) -# @login_required -# @login_admin +@login_required +@login_admin def sync_queue_view(): queue_uuid = request.args.get('uuid') queue_metadata = ail_2_ail.get_sync_queue_metadata(queue_uuid) @@ -245,19 +265,61 @@ def sync_queue_add(): if res[1] != 200: return create_json_response(res[0], res[1]) - return redirect(url_for('ail_2_ail_sync.sync_queue_view', uuid=res)) + return redirect(url_for('ail_2_ail_sync.sync_queue_view', uuid=res[0])) else: return render_template("add_sync_queue.html", tags_selector_data=Tag.get_tags_selector_data()) @ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/edit', methods=['GET', 'POST']) -# @login_required -# @login_admin +@login_required +@login_admin def sync_queue_edit(): - return '' + if request.method == 'POST': + queue_uuid = request.form.get("queue_uuid") + queue_name = request.form.get("queue_name") + description = request.form.get("queue_description") + max_size = request.form.get("queue_max_size") + + taxonomies_tags = request.form.get('taxonomies_tags') + if taxonomies_tags: + try: + taxonomies_tags = json.loads(taxonomies_tags) + except Exception: + taxonomies_tags = [] + else: + taxonomies_tags = [] + galaxies_tags = request.form.get('galaxies_tags') + if galaxies_tags: + try: + galaxies_tags = json.loads(galaxies_tags) + except Exception: + galaxies_tags = [] + + tags = taxonomies_tags + galaxies_tags + input_dict = {"uuid": queue_uuid, + "name": queue_name, + "tags": tags, + "description": description, + "max_size": max_size} + + res = ail_2_ail.api_edit_sync_queue(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + + return redirect(url_for('ail_2_ail_sync.sync_queue_view', uuid=res[0])) + else: + queue_uuid = request.args.get('uuid') + queue_metadata = ail_2_ail.get_sync_queue_metadata(queue_uuid) + taxonomies_tags, galaxies_tags = Tag.sort_tags_taxonomies_galaxies(queue_metadata['tags']) + tags_selector_data = Tag.get_tags_selector_data() + tags_selector_data['taxonomies_tags'] = taxonomies_tags + tags_selector_data['galaxies_tags'] = galaxies_tags + return render_template("edit_sync_queue.html", queue_metadata=queue_metadata, + bootstrap_label=bootstrap_label, + tags_selector_data=tags_selector_data) @ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/delete', methods=['GET']) -# @login_required -# @login_admin +@login_required +@login_admin def sync_queue_delete(): queue_uuid = request.args.get('uuid') input_dict = {"uuid": queue_uuid} diff --git a/var/www/templates/ail_2_ail/add_sync_queue.html b/var/www/templates/ail_2_ail/add_sync_queue.html index 6d4507b4..7127f20c 100644 --- a/var/www/templates/ail_2_ail/add_sync_queue.html +++ b/var/www/templates/ail_2_ail/add_sync_queue.html @@ -103,13 +103,8 @@ + + + + + + +
+ + {% include 'nav_bar.html' %} + +