fix: [ail sync] server + client: resend object in queue on ConnectionClosedError

pull/586/head
Terrtia 2021-11-30 15:07:41 +01:00
parent d9e0d4acc5
commit f597e1aaa2
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
4 changed files with 109 additions and 42 deletions

View File

@ -30,10 +30,36 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB")
r_serv_sync = config_loader.get_redis_conn("ARDB_DB") r_serv_sync = config_loader.get_redis_conn("ARDB_DB")
config_loader = None config_loader = None
WEBSOCKETS_CLOSE_CODES = {
1000: 'Normal Closure',
1001: 'Going Away',
1002: 'Protocol Error',
1003: 'Unsupported Data',
1005: 'No Status Received',
1006: 'Abnormal Closure',
1007: 'Invalid frame payload data',
1008: 'Policy Violation',
1009: 'Message too big',
1010: 'Missing Extension',
1011: 'Internal Error',
1012: 'Service Restart',
1013: 'Try Again Later',
1014: 'Bad Gateway',
1015: 'TLS Handshake',
}
#### LOGS #### #### LOGS ####
# redis_logger = publisher # redis_logger = publisher
# redis_logger.port = 6380 # redis_logger.port = 6380
# redis_logger.channel = 'Sync' # redis_logger.channel = 'Sync'
def get_websockets_close_message(code):
if code in WEBSOCKETS_CLOSE_CODES:
msg = f'{code} {WEBSOCKETS_CLOSE_CODES[code]}'
else:
msg = f'{code} Unknow websockets code'
return msg
##-- LOGS --## ##-- LOGS --##
def is_valid_uuid_v4(UUID): def is_valid_uuid_v4(UUID):
@ -1095,12 +1121,16 @@ def api_unregister_ail_to_sync_queue(json_dict):
# # # #
#### SYNC REDIS QUEUE ####### #### SYNC REDIS QUEUE #######
def get_sync_queue_object(ail_uuid, push=True): def get_sync_queue_object_and_queue_uuid(ail_uuid, push=True):
for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid): for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid):
obj_dict = get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=push) obj_dict = get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=push)
if obj_dict: if obj_dict:
return obj_dict, queue_uuid
return None, None
def get_sync_queue_object(ail_uuid, push=True):
obj_dict, queue_uuid = get_sync_queue_object_and_queue_uuid(ail_uuid, push=push)[0]
return obj_dict return obj_dict
return None
def get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=True): def get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=True):
if push: if push:
@ -1125,6 +1155,15 @@ def add_object_to_sync_queue(queue_uuid, ail_uuid, obj_dict, push=True, pull=Tru
r_serv_sync.lpush(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', obj) r_serv_sync.lpush(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', obj)
r_serv_sync.ltrim(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', 0, 200) r_serv_sync.ltrim(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', 0, 200)
def resend_object_to_sync_queue(ail_uuid, queue_uuid, Obj, push=True):
if queue_uuid is not None and Obj is not None:
obj_dict = Obj.get_default_meta()
if push:
pull = False
else:
pull = True
add_object_to_sync_queue(queue_uuid, ail_uuid, obj_dict, push=push, pull=pull)
# # TODO: # REVIEW: USE CACHE ????? USE QUEUE FACTORY ????? # # TODO: # REVIEW: USE CACHE ????? USE QUEUE FACTORY ?????
def get_sync_importer_ail_stream(): def get_sync_importer_ail_stream():
return r_serv_sync.spop('sync:queue:importer') return r_serv_sync.spop('sync:queue:importer')

View File

@ -51,24 +51,42 @@ async def api_request(websocket, ail_uuid):
async def pull(websocket, ail_uuid): async def pull(websocket, ail_uuid):
while True: while True:
obj = await websocket.recv() obj = await websocket.recv()
sys.stdout.write(res) sys.stdout.write(obj)
async def push(websocket, ail_uuid): async def push(websocket, ail_uuid):
## DEBUG:
# try:
# while True:
# await websocket.send('test')
# await asyncio.sleep(10)
# except websockets.exceptions.ConnectionClosedError as err:
# print(err.code)
# raise err
try:
while True: while True:
# get elem to send # get elem to send
Obj = ail_2_ail.get_sync_queue_object(ail_uuid) Obj, queue_uuid = ail_2_ail.get_sync_queue_object_and_queue_uuid(ail_uuid)
if Obj: if Obj:
obj_ail_stream = ail_2_ail.create_ail_stream(Obj) obj_ail_stream = ail_2_ail.create_ail_stream(Obj)
obj_ail_stream = json.dumps(obj_ail_stream) obj_ail_stream = json.dumps(obj_ail_stream)
print(obj_ail_stream)
sys.stdout.write(obj_ail_stream)
# send objects # send objects
await websocket.send(obj_ail_stream) await websocket.send(obj_ail_stream)
# DEBUG:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
else: else:
await asyncio.sleep(10) await asyncio.sleep(10)
# check if connection open
if not websocket.open:
# raise websocket internal exceptions
await websocket.send('')
except websockets.exceptions.ConnectionClosedError as err:
# resend object in queue on Connection Error
ail_2_ail.resend_object_to_sync_queue(ail_uuid, queue_uuid, Obj, push=True)
raise err
async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=None): async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=None):
if not ail_2_ail.exists_ail_instance(ail_uuid): if not ail_2_ail.exists_ail_instance(ail_uuid):
@ -118,17 +136,22 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=No
if status_code == 1000: if status_code == 1000:
print('connection closed') print('connection closed')
elif status_code == 400: elif status_code == 400:
error_message = 'BAD_REQUEST: Invalid path' error_message = '400 BAD_REQUEST: Invalid path'
elif status_code == 401: elif status_code == 401:
error_message = 'UNAUTHORIZED: Invalid Key' error_message = '401 UNAUTHORIZED: Invalid Key'
elif status_code == 403: elif status_code == 403:
error_message = 'FORBIDDEN: SYNC mode disabled' error_message = '403 FORBIDDEN: SYNC mode disabled'
else: else:
error_message = str(e) error_message = str(e)
if error_message: if error_message:
sys.stderr.write(error_message) sys.stderr.write(error_message)
redis_logger.warning(f'{ail_uuid}: {error_message}') redis_logger.warning(f'{ail_uuid}: {error_message}')
ail_2_ail.save_ail_server_error(ail_uuid, error_message) ail_2_ail.save_ail_server_error(ail_uuid, error_message)
except websockets.exceptions.ConnectionClosedError as e:
error_message = ail_2_ail.get_websockets_close_message(e.code)
sys.stderr.write(error_message)
redis_logger.info(f'{ail_uuid}: {error_message}')
ail_2_ail.save_ail_server_error(ail_uuid, error_message)
except websockets.exceptions.InvalidURI as e: except websockets.exceptions.InvalidURI as e:
error_message = f'Invalid AIL url: {e.uri}' error_message = f'Invalid AIL url: {e.uri}'
@ -150,8 +173,7 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=No
print('connection closed') print('connection closed')
except Exception as err: except Exception as err:
trace = traceback.format_tb(err.__traceback__) trace = traceback.format_tb(err.__traceback__)
if len(trace) == 1: trace = ''.join(trace)
trace = trace[0]
trace = str(trace) trace = str(trace)
error_message = f'{trace}\n{str(err)}' error_message = f'{trace}\n{str(err)}'
sys.stderr.write(error_message) sys.stderr.write(error_message)

View File

@ -120,7 +120,7 @@ async def unregister(websocket):
# PULL: Send data to client # PULL: Send data to client
# # TODO: ADD TIMEOUT ??? # # TODO: ADD TIMEOUT ???
async def pull(websocket, ail_uuid): async def pull(websocket, ail_uuid):
try:
for queue_uuid in ail_2_ail.get_ail_instance_all_sync_queue(ail_uuid): for queue_uuid in ail_2_ail.get_ail_instance_all_sync_queue(ail_uuid):
while True: while True:
# get elem to send # get elem to send
@ -132,9 +132,14 @@ async def pull(websocket, ail_uuid):
# send objects # send objects
await websocket.send(Obj) await websocket.send(Obj)
await asyncio.sleep(0.1)
# END PULL # END PULL
else: else:
break break
except websockets.exceptions.ConnectionClosedError as err:
# resend object in queue on Connection Error
ail_2_ail.resend_object_to_sync_queue(ail_uuid, queue_uuid, Obj, push=False)
raise err
# END PULL # END PULL
return None return None
@ -151,6 +156,7 @@ async def push(websocket, ail_uuid):
ail_stream = json.loads(ail_stream) ail_stream = json.loads(ail_stream)
#print(ail_stream) #print(ail_stream)
# # TODO: Close connection on junk
ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) ail_2_ail.add_ail_stream_to_sync_importer(ail_stream)
# API: server API # API: server API