chg: Avoid stopping the captures before they're done

pull/660/head
Raphaël Vinot 2023-04-09 13:58:34 +02:00
parent 2ceda75eab
commit 4ceae60db7
4 changed files with 28 additions and 11 deletions

View File

@ -111,14 +111,21 @@ class AsyncCapture(AbstractManager):
if isinstance(self.lookyloo.lacus, LacusCore): if isinstance(self.lookyloo.lacus, LacusCore):
await self._trigger_captures() await self._trigger_captures()
# NOTE: +1 because running this method also counts for one and will
# be decremented when it finishes
self.set_running(len(self.captures) + 1)
self.process_capture_queue() self.process_capture_queue()
async def _wait_to_finish(self): async def _wait_to_finish_async(self):
if isinstance(self.lookyloo.lacus, LacusCore): if isinstance(self.lookyloo.lacus, LacusCore):
while self.captures: while self.captures:
self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...') self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
await asyncio.sleep(5) await asyncio.sleep(5)
# NOTE: +1 so we don't quit before the final process capture queue
self.set_running(len(self.captures) + 1)
self.process_capture_queue()
self.unset_running()
self.logger.info('No more captures') self.logger.info('No more captures')

View File

@ -67,8 +67,14 @@ class AbstractManager(ABC):
except RedisConnectionError: except RedisConnectionError:
print('Unable to connect to redis, the system is down.') print('Unable to connect to redis, the system is down.')
def set_running(self) -> None: def set_running(self, number: Optional[int]=None) -> None:
if number == 0:
self.__redis.zrem('running', self.script_name)
else:
if number is None:
self.__redis.zincrby('running', 1, self.script_name) self.__redis.zincrby('running', 1, self.script_name)
else:
self.__redis.zadd('running', {self.script_name: number})
self.__redis.sadd(f'service|{self.script_name}', os.getpid()) self.__redis.sadd(f'service|{self.script_name}', os.getpid())
def unset_running(self) -> None: def unset_running(self) -> None:
@ -148,6 +154,7 @@ class AbstractManager(ABC):
except KeyboardInterrupt: except KeyboardInterrupt:
self.logger.warning(f'{self.script_name} killed by user.') self.logger.warning(f'{self.script_name} killed by user.')
finally: finally:
self._wait_to_finish()
if self.process: if self.process:
self._kill_process() self._kill_process()
try: try:
@ -157,13 +164,16 @@ class AbstractManager(ABC):
pass pass
self.logger.info(f'Shutting down {self.__class__.__name__}') self.logger.info(f'Shutting down {self.__class__.__name__}')
def _wait_to_finish(self) -> None:
self.logger.info('Not implemented, nothing to wait for.')
async def stop(self): async def stop(self):
self.force_stop = True self.force_stop = True
async def _to_run_forever_async(self) -> None: async def _to_run_forever_async(self) -> None:
raise NotImplementedError('This method must be implemented by the child') raise NotImplementedError('This method must be implemented by the child')
async def _wait_to_finish(self) -> None: async def _wait_to_finish_async(self) -> None:
self.logger.info('Not implemented, nothing to wait for.') self.logger.info('Not implemented, nothing to wait for.')
async def stop_async(self): async def stop_async(self):
@ -200,7 +210,7 @@ class AbstractManager(ABC):
except Exception as e: # nosec B110 except Exception as e: # nosec B110
self.logger.exception(e) self.logger.exception(e)
finally: finally:
await self._wait_to_finish() await self._wait_to_finish_async()
if self.process: if self.process:
self._kill_process() self._kill_process()
try: try:

8
poetry.lock generated
View File

@ -1232,14 +1232,14 @@ format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-
[[package]] [[package]]
name = "lacuscore" name = "lacuscore"
version = "1.4.0" version = "1.4.2"
description = "Core of Lacus, usable as a module" description = "Core of Lacus, usable as a module"
category = "main" category = "main"
optional = false optional = false
python-versions = ">=3.8,<4.0" python-versions = ">=3.8,<4.0"
files = [ files = [
{file = "lacuscore-1.4.0-py3-none-any.whl", hash = "sha256:6c1f23d995b0b2082f5f8f6f91029916537336258a2dd4cdb64470de9c5eef6b"}, {file = "lacuscore-1.4.2-py3-none-any.whl", hash = "sha256:ed454ed5808bb0b00983622484e159cf06a51b9110094e01b26d630d56b9a801"},
{file = "lacuscore-1.4.0.tar.gz", hash = "sha256:f968d985069a02f18bc97b767286e6dc59f8d6207e9a2c858455ae93c19adace"}, {file = "lacuscore-1.4.2.tar.gz", hash = "sha256:47e2022cb3f99706af7c579239aa6a52da7576b46eeea042adc1910056c029d1"},
] ]
[package.dependencies] [package.dependencies]
@ -3135,4 +3135,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = ">=3.8,<3.12" python-versions = ">=3.8,<3.12"
content-hash = "6c10e9bb7722974a3e19d06e51e3231ead718aa933fdb322af1666af9ec7a58d" content-hash = "5917030e798c3068972436cbbcca0c0a6f1dd9c8e1bf789d4df1fa5498d2d684"

View File

@ -65,7 +65,7 @@ passivetotal = "^2.5.9"
werkzeug = "^2.2.3" werkzeug = "^2.2.3"
filetype = "^1.2.0" filetype = "^1.2.0"
pypandora = "^1.4.0" pypandora = "^1.4.0"
lacuscore = "^1.4.0" lacuscore = "^1.4.2"
pylacus = "^1.4.0" pylacus = "^1.4.0"
pyipasnhistory = "^2.1.2" pyipasnhistory = "^2.1.2"
publicsuffixlist = "^0.9.3" publicsuffixlist = "^0.9.3"