From 354841b005442c4f9d86c6466cee8750d803657c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 23 Sep 2022 21:33:38 +0200 Subject: [PATCH] chg: Improve status reporting when a capture is ongoing --- bin/async_capture.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/bin/async_capture.py b/bin/async_capture.py index 5b375698..52431902 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -60,6 +60,8 @@ class AsyncCapture(AbstractManager): async def process_capture_queue(self) -> None: '''Process a query from the capture queue''' + self.set_running() + uuid: Optional[str] if isinstance(self.lacus, LacusCore): uuid = await self.lacus.consume_queue() if not uuid: @@ -70,13 +72,14 @@ class AsyncCapture(AbstractManager): if entries['status'] != CaptureStatusCore.DONE: self.logger.warning(f'The capture {uuid} is reported as not done ({entries["status"]}) when it should.') self.redis.zrem('to_capture', uuid) + self.redis.delete(uuid) return else: # Find a capture that is done for uuid_b in self.redis.zrevrangebyscore('to_capture', 'Inf', '-Inf'): uuid = uuid_b.decode() if not uuid: - return + break entries = self.lacus.get_capture(uuid) if entries['status'] == CaptureStatusPy.DONE: break @@ -154,16 +157,17 @@ class AsyncCapture(AbstractManager): with (dirpath / '0.cookies.json').open('w') as _cookies: json.dump(entries['cookies'], _cookies) - with self.redis.pipeline() as lazy_cleanup: - lazy_cleanup.hset('lookup_dirs', uuid, str(dirpath)) - if queue and self.redis.zscore('queues', queue): - lazy_cleanup.zincrby('queues', -1, queue) - lazy_cleanup.zrem('to_capture', uuid) - lazy_cleanup.srem('ongoing', uuid) - lazy_cleanup.delete(uuid) - # make sure to expire the key if nothing was processed for a while (= queues empty) - lazy_cleanup.expire('queues', 600) - lazy_cleanup.execute() + lazy_cleanup = self.redis.pipeline() + lazy_cleanup.hset('lookup_dirs', uuid, str(dirpath)) + if queue and self.redis.zscore('queues', queue): + lazy_cleanup.zincrby('queues', -1, queue) + lazy_cleanup.zrem('to_capture', uuid) + lazy_cleanup.srem('ongoing', uuid) + lazy_cleanup.delete(uuid) + # make sure to expire the key if nothing was processed for a while (= queues empty) + lazy_cleanup.expire('queues', 600) + lazy_cleanup.execute() + self.unset_running() async def _to_run_forever_async(self): capture = asyncio.create_task(self.process_capture_queue())