chg: Improve status reporting when a capture is ongoing

pull/526/head
Raphaël Vinot 2022-09-23 21:33:38 +02:00
parent 862c9e0750
commit 354841b005
1 changed files with 15 additions and 11 deletions

View File

@ -60,6 +60,8 @@ class AsyncCapture(AbstractManager):
async def process_capture_queue(self) -> None:
'''Process a query from the capture queue'''
self.set_running()
uuid: Optional[str]
if isinstance(self.lacus, LacusCore):
uuid = await self.lacus.consume_queue()
if not uuid:
@ -70,13 +72,14 @@ class AsyncCapture(AbstractManager):
if entries['status'] != CaptureStatusCore.DONE:
self.logger.warning(f'The capture {uuid} is reported as not done ({entries["status"]}) when it should.')
self.redis.zrem('to_capture', uuid)
self.redis.delete(uuid)
return
else:
# Find a capture that is done
for uuid_b in self.redis.zrevrangebyscore('to_capture', 'Inf', '-Inf'):
uuid = uuid_b.decode()
if not uuid:
return
break
entries = self.lacus.get_capture(uuid)
if entries['status'] == CaptureStatusPy.DONE:
break
@ -154,16 +157,17 @@ class AsyncCapture(AbstractManager):
with (dirpath / '0.cookies.json').open('w') as _cookies:
json.dump(entries['cookies'], _cookies)
with self.redis.pipeline() as lazy_cleanup:
lazy_cleanup.hset('lookup_dirs', uuid, str(dirpath))
if queue and self.redis.zscore('queues', queue):
lazy_cleanup.zincrby('queues', -1, queue)
lazy_cleanup.zrem('to_capture', uuid)
lazy_cleanup.srem('ongoing', uuid)
lazy_cleanup.delete(uuid)
# make sure to expire the key if nothing was processed for a while (= queues empty)
lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
lazy_cleanup = self.redis.pipeline()
lazy_cleanup.hset('lookup_dirs', uuid, str(dirpath))
if queue and self.redis.zscore('queues', queue):
lazy_cleanup.zincrby('queues', -1, queue)
lazy_cleanup.zrem('to_capture', uuid)
lazy_cleanup.srem('ongoing', uuid)
lazy_cleanup.delete(uuid)
# make sure to expire the key if nothing was processed for a while (= queues empty)
lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
self.unset_running()
async def _to_run_forever_async(self):
capture = asyncio.create_task(self.process_capture_queue())