new: SIGTERM handling (PyLacus and LacusCore)

pull/544/head
Raphaël Vinot 2022-10-28 12:40:28 +02:00
parent ff68e29136
commit a48c6e0bd6
5 changed files with 72 additions and 21 deletions

View File

@ -3,10 +3,12 @@
import asyncio import asyncio
import json import json
import logging import logging
import signal
import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Dict, Optional, Set, Union from typing import Dict, Optional, Union
from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy
@ -48,7 +50,7 @@ class AsyncCapture(AbstractManager):
self.lacus = LacusCore(self.redis, get_config('generic', 'tor_proxy'), self.lacus = LacusCore(self.redis, get_config('generic', 'tor_proxy'),
get_config('generic', 'only_global_lookups')) get_config('generic', 'only_global_lookups'))
self.captures: Set[asyncio.Task] = set() self.captures: Dict[asyncio.Task, float] = {}
self.fox = FOX(get_config('modules', 'FOX')) self.fox = FOX(get_config('modules', 'FOX'))
if not self.fox.available: if not self.fox.available:
@ -79,7 +81,10 @@ class AsyncCapture(AbstractManager):
break break
entries = self.lacus.get_capture(uuid) entries = self.lacus.get_capture(uuid)
if entries['status'] == CaptureStatusPy.DONE: if entries['status'] == CaptureStatusPy.DONE:
self.logger.info(f'Got the capture for {uuid} from Lacus') log = f'Got the capture for {uuid} from Lacus'
if runtime := entries.get('runtime'):
log = f'{log} - Runtime: {runtime}'
self.logger.info(log)
break break
else: else:
# No captures are ready # No captures are ready
@ -175,17 +180,44 @@ class AsyncCapture(AbstractManager):
self.unset_running() self.unset_running()
self.logger.info(f'Done with {uuid}') self.logger.info(f'Done with {uuid}')
async def cancel_old_captures(self):
    """Cancel every tracked capture task that has been running for too long.

    ``self.captures`` maps each capture task to the ``time.time()`` value
    stored alongside it. Every task whose age is at least the
    ``max_capture_time`` setting (in seconds) is cancelled and then awaited,
    so the cancellation has been processed when this coroutine returns.
    """
    if not self.captures:
        # Nothing tracked: no need to read the configuration at all.
        return
    # Both the limit and the current time are loop-invariant: read them once.
    max_capture_time = get_config('generic', 'max_capture_time')
    now = time.time()
    cancelled_tasks = []
    # Iterate over a snapshot so the loop never depends on the live mapping,
    # which the tasks' done-callbacks modify.
    for task, timestamp in list(self.captures.items()):
        if now - timestamp >= max_capture_time:
            task.cancel()
            cancelled_tasks.append(task)
            self.logger.warning('A capture has been going for too long, canceling it.')
    if cancelled_tasks:
        # return_exceptions=True collects each task's outcome as a result
        # instead of raising it in this coroutine.
        await asyncio.gather(*cancelled_tasks, return_exceptions=True)
async def _to_run_forever_async(self): async def _to_run_forever_async(self):
await self.cancel_old_captures()
if self.force_stop:
return
capture = asyncio.create_task(self.process_capture_queue()) capture = asyncio.create_task(self.process_capture_queue())
capture.add_done_callback(self.captures.discard) self.captures[capture] = time.time()
self.captures.add(capture) capture.add_done_callback(self.captures.pop)
while len(self.captures) >= get_config('generic', 'async_capture_processes'): while len(self.captures) >= get_config('generic', 'async_capture_processes'):
await self.cancel_old_captures()
await asyncio.sleep(1) await asyncio.sleep(1)
async def _wait_to_finish(self):
    """Wait until ``self.captures`` is empty, logging the remaining count every 5 seconds."""
    while True:
        if not self.captures:
            break
        self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
        await asyncio.sleep(5)
    self.logger.info('No more captures')
def main(): def main():
m = AsyncCapture() m = AsyncCapture()
asyncio.run(m.run_async(sleep_in_sec=1))
loop = asyncio.new_event_loop()
loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(m.stop_async()))
try:
loop.run_until_complete(m.run_async(sleep_in_sec=1))
finally:
loop.close()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -50,6 +50,7 @@
}, },
"hide_captures_with_error": false, "hide_captures_with_error": false,
"archive": 180, "archive": 180,
"max_capture_time": 3600,
"_notes": { "_notes": {
"loglevel": "(lookyloo) Can be one of the value listed here: https://docs.python.org/3/library/logging.html#levels", "loglevel": "(lookyloo) Can be one of the value listed here: https://docs.python.org/3/library/logging.html#levels",
"only_global_lookups": "Set it to True if your instance is publicly available so users aren't able to scan your internal network", "only_global_lookups": "Set it to True if your instance is publicly available so users aren't able to scan your internal network",
@ -75,6 +76,7 @@
"email": "Configuration for sending email notifications.", "email": "Configuration for sending email notifications.",
"priority": "Define the priority of a new capture. A capture from the web interface has priority over a capture from the API, same for authenticated user vs. anonymous.", "priority": "Define the priority of a new capture. A capture from the web interface has priority over a capture from the API, same for authenticated user vs. anonymous.",
"hide_captures_with_error": "Capturing an URL may result in an error (domain non-existent, HTTP error, ...). They may be useful to see, but if you have a public instance, they will clutter the index.", "hide_captures_with_error": "Capturing an URL may result in an error (domain non-existent, HTTP error, ...). They may be useful to see, but if you have a public instance, they will clutter the index.",
"archive": "The captures older than this value (in days) will be archived. They're not cached by default in the Lookyloo class." "archive": "The captures older than this value (in days) will be archived. They're not cached by default in the Lookyloo class.",
"max_capture_time": "The very maximal time we allow a capture to keep going. Should only be triggered by captures that cause playwright to never quit."
} }
} }

View File

@ -27,6 +27,8 @@ class AbstractManager(ABC):
self.process: Optional[Popen] = None self.process: Optional[Popen] = None
self.__redis = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) self.__redis = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True)
self.force_stop = False
@staticmethod @staticmethod
def is_running() -> List[Tuple[str, float]]: def is_running() -> List[Tuple[str, float]]:
try: try:
@ -81,7 +83,7 @@ class AbstractManager(ABC):
return True return True
def _to_run_forever(self) -> None: def _to_run_forever(self) -> None:
pass raise NotImplementedError('This method must be implemented by the child')
def _kill_process(self): def _kill_process(self):
if self.process is None: if self.process is None:
@ -103,7 +105,7 @@ class AbstractManager(ABC):
def run(self, sleep_in_sec: int) -> None: def run(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}') self.logger.info(f'Launching {self.__class__.__name__}')
try: try:
while True: while not self.force_stop:
if self.shutdown_requested(): if self.shutdown_requested():
break break
try: try:
@ -135,13 +137,25 @@ class AbstractManager(ABC):
pass pass
self.logger.info(f'Shutting down {self.__class__.__name__}') self.logger.info(f'Shutting down {self.__class__.__name__}')
async def stop(self):
    """Raise the ``force_stop`` flag so the loops that check it exit."""
    self.force_stop = True
async def _to_run_forever_async(self) -> None: async def _to_run_forever_async(self) -> None:
pass raise NotImplementedError('This method must be implemented by the child')
async def _wait_to_finish(self) -> None:
    """Default shutdown hook: there is nothing to wait for, so only log that fact.

    Children that keep background work running are expected to override it.
    """
    self.logger.info('Not implemented, nothing to wait for.')
async def stop_async(self):
    """Raise the ``force_stop`` flag; meant to be scheduled from a signal handler.

    Example:
        loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(p.stop_async()))
    """
    self.force_stop = True
async def run_async(self, sleep_in_sec: int) -> None: async def run_async(self, sleep_in_sec: int) -> None:
self.logger.info(f'Launching {self.__class__.__name__}') self.logger.info(f'Launching {self.__class__.__name__}')
try: try:
while True: while not self.force_stop:
if self.shutdown_requested(): if self.shutdown_requested():
break break
try: try:
@ -163,7 +177,10 @@ class AbstractManager(ABC):
break break
except KeyboardInterrupt: except KeyboardInterrupt:
self.logger.warning(f'{self.script_name} killed by user.') self.logger.warning(f'{self.script_name} killed by user.')
except Exception as e:
self.logger.exception(e)
finally: finally:
await self._wait_to_finish()
if self.process: if self.process:
self._kill_process() self._kill_process()
try: try:

16
poetry.lock generated
View File

@ -571,7 +571,7 @@ format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-
[[package]] [[package]]
name = "lacuscore" name = "lacuscore"
version = "1.1.1" version = "1.1.2"
description = "Core of Lacus, usable as a module" description = "Core of Lacus, usable as a module"
category = "main" category = "main"
optional = false optional = false
@ -956,7 +956,7 @@ virustotal = ["validators (>=0.20.0,<0.21.0)"]
[[package]] [[package]]
name = "pypandora" name = "pypandora"
version = "1.1.2" version = "1.2.0"
description = "Python CLI and module for pandora" description = "Python CLI and module for pandora"
category = "main" category = "main"
optional = false optional = false
@ -966,7 +966,7 @@ python-versions = ">=3.8,<4.0"
requests = ">=2.28.1,<3.0.0" requests = ">=2.28.1,<3.0.0"
[package.extras] [package.extras]
docs = ["Sphinx (>=5.1.1,<6.0.0)"] docs = ["Sphinx (>=5.3.0,<6.0.0)"]
[[package]] [[package]]
name = "pyparsing" name = "pyparsing"
@ -1476,7 +1476,7 @@ misp = ["python-magic", "pydeep2"]
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = ">=3.8,<3.11" python-versions = ">=3.8,<3.11"
content-hash = "7b37a5eaf62e9f1e0cc99e77699dbd402f38a7ccf1ede62318975307ae24b7c8" content-hash = "77650148d8eec3bbcb65ed2263508e350fa6c656382cd4e70046ce7f326b7dd7"
[metadata.files] [metadata.files]
aiohttp = [ aiohttp = [
@ -1931,8 +1931,8 @@ jsonschema = [
{file = "jsonschema-4.16.0.tar.gz", hash = "sha256:165059f076eff6971bae5b742fc029a7b4ef3f9bcf04c14e4776a7605de14b23"}, {file = "jsonschema-4.16.0.tar.gz", hash = "sha256:165059f076eff6971bae5b742fc029a7b4ef3f9bcf04c14e4776a7605de14b23"},
] ]
lacuscore = [ lacuscore = [
{file = "lacuscore-1.1.1-py3-none-any.whl", hash = "sha256:bef70fd371863fc4641360903d94c84608597d3155018ceae8b11e002b741af2"}, {file = "lacuscore-1.1.2-py3-none-any.whl", hash = "sha256:876d3ccb743bb4d43421d1670762af2f54c6b82dfdec9b5a4c37109dbabd02c6"},
{file = "lacuscore-1.1.1.tar.gz", hash = "sha256:a94d5f7876b6b01b5a3593a8bd4af35e8dcd644e64351f150b3a590df70e7895"}, {file = "lacuscore-1.1.2.tar.gz", hash = "sha256:ed83c8f4cb31e24ec0e39ce85fcd9dd675c0ae96bf9aaededf7c21469be6b1ad"},
] ]
lief = [ lief = [
{file = "lief-0.12.2-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:cdadaab4b9ec756e1d1f0324acd6e280ae849d251e66f836da455df592deaf9e"}, {file = "lief-0.12.2-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:cdadaab4b9ec756e1d1f0324acd6e280ae849d251e66f836da455df592deaf9e"},
@ -2369,8 +2369,8 @@ pymisp = [
{file = "pymisp-2.4.162.1.tar.gz", hash = "sha256:4e5721dfae5ed54ca7c967d913c1adbadf0e495f2db4b340a43523f06493601e"}, {file = "pymisp-2.4.162.1.tar.gz", hash = "sha256:4e5721dfae5ed54ca7c967d913c1adbadf0e495f2db4b340a43523f06493601e"},
] ]
pypandora = [ pypandora = [
{file = "pypandora-1.1.2-py3-none-any.whl", hash = "sha256:6523c0ba2bc10bcc26f379ecec65e39b57c26666be8ab399ad1d22594cedfaed"}, {file = "pypandora-1.2.0-py3-none-any.whl", hash = "sha256:f50286f71721def21210e14300aff7682079da7a1c6c125234242e06259c3c97"},
{file = "pypandora-1.1.2.tar.gz", hash = "sha256:49f911588f8c9be225d07727d0aa49905be83e73e811d5e16e9b0ce5d524915b"}, {file = "pypandora-1.2.0.tar.gz", hash = "sha256:28dc9baf65d7ae13a7a5bec9498dbba0d13b1addf348c285d51f79183c0968df"},
] ]
pyparsing = [ pyparsing = [
{file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"}, {file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"},

View File

@ -66,8 +66,8 @@ har2tree = "^1.16.0"
passivetotal = "^2.5.9" passivetotal = "^2.5.9"
werkzeug = "2.1.2" werkzeug = "2.1.2"
filetype = "^1.1.0" filetype = "^1.1.0"
pypandora = "^1.1.2" pypandora = "^1.2.0"
lacuscore = "^1.1.1" lacuscore = "^1.1.2"
pylacus = "^1.1.0" pylacus = "^1.1.0"
[tool.poetry.extras] [tool.poetry.extras]