From f597e1aaa2b2cff80e20061e58babf0d9deb5790 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Tue, 30 Nov 2021 15:07:41 +0100 Subject: [PATCH] fix: [ail sync] server + client: resend object in queue on ConnectionClosedError --- bin/core/ail_2_ail.py | 45 +++++++++++++- bin/core/ail_2_ail_client.py | 60 +++++++++++++------ bin/core/ail_2_ail_server.py | 34 ++++++----- .../templates/ail_2_ail/view_ail_server.html | 12 ++-- 4 files changed, 109 insertions(+), 42 deletions(-) diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index 607bf9dd..cbd457cd 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -30,10 +30,36 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB") r_serv_sync = config_loader.get_redis_conn("ARDB_DB") config_loader = None +WEBSOCKETS_CLOSE_CODES = { + 1000: 'Normal Closure', + 1001: 'Going Away', + 1002: 'Protocol Error', + 1003: 'Unsupported Data', + 1005: 'No Status Received', + 1006: 'Abnormal Closure', + 1007: 'Invalid frame payload data', + 1008: 'Policy Violation', + 1009: 'Message too big', + 1010: 'Missing Extension', + 1011: 'Internal Error', + 1012: 'Service Restart', + 1013: 'Try Again Later', + 1014: 'Bad Gateway', + 1015: 'TLS Handshake', + } + #### LOGS #### # redis_logger = publisher # redis_logger.port = 6380 # redis_logger.channel = 'Sync' + +def get_websockets_close_message(code): + if code in WEBSOCKETS_CLOSE_CODES: + msg = f'{code} {WEBSOCKETS_CLOSE_CODES[code]}' + else: + msg = f'{code} Unknow websockets code' + return msg + ##-- LOGS --## def is_valid_uuid_v4(UUID): @@ -1095,12 +1121,16 @@ def api_unregister_ail_to_sync_queue(json_dict): # # #### SYNC REDIS QUEUE ####### -def get_sync_queue_object(ail_uuid, push=True): +def get_sync_queue_object_and_queue_uuid(ail_uuid, push=True): for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid): obj_dict = get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=push) if obj_dict: - return obj_dict - return None + return obj_dict, queue_uuid + return None, None + +def get_sync_queue_object(ail_uuid, push=True): + obj_dict, queue_uuid = get_sync_queue_object_and_queue_uuid(ail_uuid, push=push)[0] + return obj_dict def get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=True): if push: @@ -1125,6 +1155,15 @@ def add_object_to_sync_queue(queue_uuid, ail_uuid, obj_dict, push=True, pull=Tru r_serv_sync.lpush(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', obj) r_serv_sync.ltrim(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', 0, 200) +def resend_object_to_sync_queue(ail_uuid, queue_uuid, Obj, push=True): + if queue_uuid is not None and Obj is not None: + obj_dict = Obj.get_default_meta() + if push: + pull = False + else: + pull = True + add_object_to_sync_queue(queue_uuid, ail_uuid, obj_dict, push=push, pull=pull) + # # TODO: # REVIEW: USE CACHE ????? USE QUEUE FACTORY ????? def get_sync_importer_ail_stream(): return r_serv_sync.spop('sync:queue:importer') diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py index 3e7e5267..a1e7d300 100755 --- a/bin/core/ail_2_ail_client.py +++ b/bin/core/ail_2_ail_client.py @@ -51,24 +51,42 @@ async def api_request(websocket, ail_uuid): async def pull(websocket, ail_uuid): while True: obj = await websocket.recv() - sys.stdout.write(res) + sys.stdout.write(obj) async def push(websocket, ail_uuid): + ## DEBUG: + # try: + # while True: + # await websocket.send('test') + # await asyncio.sleep(10) + # except websockets.exceptions.ConnectionClosedError as err: + # print(err.code) + # raise err - while True: - # get elem to send - Obj = ail_2_ail.get_sync_queue_object(ail_uuid) - if Obj: - obj_ail_stream = ail_2_ail.create_ail_stream(Obj) - obj_ail_stream = json.dumps(obj_ail_stream) - print(obj_ail_stream) + try: + while True: + # get elem to send + Obj, queue_uuid = ail_2_ail.get_sync_queue_object_and_queue_uuid(ail_uuid) + if Obj: + obj_ail_stream = ail_2_ail.create_ail_stream(Obj) + obj_ail_stream = json.dumps(obj_ail_stream) - # send objects - await websocket.send(obj_ail_stream) - # DEBUG: - await asyncio.sleep(0.1) - else: - await asyncio.sleep(10) + sys.stdout.write(obj_ail_stream) + + # send objects + await websocket.send(obj_ail_stream) + await asyncio.sleep(0.1) + else: + await asyncio.sleep(10) + # check if connection open + if not websocket.open: + # raise websocket internal exceptions + await websocket.send('') + + except websockets.exceptions.ConnectionClosedError as err: + # resend object in queue on Connection Error + ail_2_ail.resend_object_to_sync_queue(ail_uuid, queue_uuid, Obj, push=True) + raise err async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=None): if not ail_2_ail.exists_ail_instance(ail_uuid): @@ -118,17 +136,22 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=No if status_code == 1000: print('connection closed') elif status_code == 400: - error_message = 'BAD_REQUEST: Invalid path' + error_message = '400 BAD_REQUEST: Invalid path' elif status_code == 401: - error_message = 'UNAUTHORIZED: Invalid Key' + error_message = '401 UNAUTHORIZED: Invalid Key' elif status_code == 403: - error_message = 'FORBIDDEN: SYNC mode disabled' + error_message = '403 FORBIDDEN: SYNC mode disabled' else: error_message = str(e) if error_message: sys.stderr.write(error_message) redis_logger.warning(f'{ail_uuid}: {error_message}') ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except websockets.exceptions.ConnectionClosedError as e: + error_message = ail_2_ail.get_websockets_close_message(e.code) + sys.stderr.write(error_message) + redis_logger.info(f'{ail_uuid}: {error_message}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) except websockets.exceptions.InvalidURI as e: error_message = f'Invalid AIL url: {e.uri}' @@ -150,8 +173,7 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=No print('connection closed') except Exception as err: trace = traceback.format_tb(err.__traceback__) - if len(trace) == 1: - trace = trace[0] + trace = ''.join(trace) trace = str(trace) error_message = f'{trace}\n{str(err)}' sys.stderr.write(error_message) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index 2ac8e546..41760071 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -120,21 +120,26 @@ async def unregister(websocket): # PULL: Send data to client # # TODO: ADD TIMEOUT ??? async def pull(websocket, ail_uuid): + try: + for queue_uuid in ail_2_ail.get_ail_instance_all_sync_queue(ail_uuid): + while True: + # get elem to send + Obj = ail_2_ail.get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=False) + if Obj: + obj_ail_stream = ail_2_ail.create_ail_stream(Obj) + Obj = json.dumps(obj_ail_stream) + #print(Obj) - for queue_uuid in ail_2_ail.get_ail_instance_all_sync_queue(ail_uuid): - while True: - # get elem to send - Obj = ail_2_ail.get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=False) - if Obj: - obj_ail_stream = ail_2_ail.create_ail_stream(Obj) - Obj = json.dumps(obj_ail_stream) - #print(Obj) - - # send objects - await websocket.send(Obj) - # END PULL - else: - break + # send objects + await websocket.send(Obj) + await asyncio.sleep(0.1) + # END PULL + else: + break + except websockets.exceptions.ConnectionClosedError as err: + # resend object in queue on Connection Error + ail_2_ail.resend_object_to_sync_queue(ail_uuid, queue_uuid, Obj, push=False) + raise err # END PULL return None @@ -151,6 +156,7 @@ async def push(websocket, ail_uuid): ail_stream = json.loads(ail_stream) #print(ail_stream) + # # TODO: Close connection on junk ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) # API: server API diff --git a/var/www/templates/ail_2_ail/view_ail_server.html b/var/www/templates/ail_2_ail/view_ail_server.html index 31461acf..3426f8a5 100644 --- a/var/www/templates/ail_2_ail/view_ail_server.html +++ b/var/www/templates/ail_2_ail/view_ail_server.html @@ -184,12 +184,12 @@ {% if server_metadata['error']%} -
-								----------------------------
-								-          ERROR           -
-								----------------------------
-								{{server_metadata['error']}}
-							
+
+					----------------------------
+					-          ERROR           -
+					----------------------------
+{{server_metadata['error']}}
+
{% endif %}