lookyloo/bin/async_capture.py

149 lines
6.0 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
2024-01-12 17:15:41 +01:00
from __future__ import annotations
import asyncio
import logging
2022-11-23 15:54:22 +01:00
import logging.config
import signal
from asyncio import Task
from pathlib import Path
from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
2024-01-16 00:27:43 +01:00
from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy
2024-01-13 01:24:32 +01:00
from lookyloo import Lookyloo, CaptureSettings
from lookyloo.exceptions import LacusUnreachable
from lookyloo.default import AbstractManager, get_config, LookylooException
2022-09-20 14:49:58 +02:00
from lookyloo.helpers import get_captures_dir
2022-05-02 13:04:55 +02:00
from lookyloo.modules import FOX
2022-11-23 15:54:22 +01:00
logging.config.dictConfig(get_config('logging'))
2020-11-05 14:14:33 +01:00
class AsyncCapture(AbstractManager):
2024-01-12 17:15:41 +01:00
def __init__(self, loglevel: int | None=None) -> None:
super().__init__(loglevel)
self.script_name = 'async_capture'
self.only_global_lookups: bool = get_config('generic', 'only_global_lookups')
self.capture_dir: Path = get_captures_dir()
2024-05-25 12:29:01 +02:00
self.lookyloo = Lookyloo(cache_max_size=1)
self.captures: set[asyncio.Task] = set() # type: ignore[type-arg]
self.fox = FOX(config_name='FOX')
2022-05-02 13:04:55 +02:00
if not self.fox.available:
self.logger.warning('Unable to setup the FOX module')
2024-01-12 17:15:41 +01:00
async def _trigger_captures(self) -> None:
# Only called if LacusCore is used
def clear_list_callback(task: Task) -> None: # type: ignore[type-arg]
self.captures.discard(task)
self.unset_running()
max_new_captures = get_config('generic', 'async_capture_processes') - len(self.captures)
self.logger.debug(f'{len(self.captures)} ongoing captures.')
if max_new_captures <= 0:
self.logger.info(f'Max amount of captures in parallel reached ({len(self.captures)})')
2024-01-12 17:15:41 +01:00
return None
for capture_task in self.lookyloo.lacus.consume_queue(max_new_captures): # type: ignore[union-attr]
self.captures.add(capture_task)
self.set_running()
capture_task.add_done_callback(clear_list_callback)
2024-01-12 17:15:41 +01:00
def uuids_ready(self) -> list[str]:
'''Get the list of captures ready to be processed'''
# Only check if the top 50 in the priority list are done, as they are the most likely ones to be
# and if the list it very very long, iterating over it takes a very long time.
return [uuid for uuid in self.lookyloo.redis.zrevrangebyscore('to_capture', 'Inf', '-Inf', start=0, num=500)
if uuid and self.lookyloo.lacus.get_capture_status(uuid) in [CaptureStatusPy.DONE, CaptureStatusCore.DONE]]
def process_capture_queue(self) -> None:
'''Process a query from the capture queue'''
2024-01-12 17:15:41 +01:00
entries: CaptureResponseCore | CaptureResponsePy
for uuid in self.uuids_ready():
if isinstance(self.lookyloo.lacus, LacusCore):
entries = self.lookyloo.lacus.get_capture(uuid, decode=True)
elif isinstance(self.lookyloo.lacus, PyLacus):
entries = self.lookyloo.lacus.get_capture(uuid)
else:
raise LookylooException(f'lacus must be LacusCore or PyLacus, not {type(self.lookyloo.lacus)}.')
log = f'Got the capture for {uuid} from Lacus'
if runtime := entries.get('runtime'):
log = f'{log} - Runtime: {runtime}'
self.logger.info(log)
self.lookyloo.redis.sadd('ongoing', uuid)
2024-01-12 17:15:41 +01:00
queue: str | None = self.lookyloo.redis.getdel(f'{uuid}_mgmt')
2024-07-19 16:12:24 +02:00
to_capture: CaptureSettings | None = self.lookyloo.get_capture_settings(uuid)
if not to_capture:
continue
self.lookyloo.store_capture(
2024-07-19 16:12:24 +02:00
uuid, to_capture.listing,
os=to_capture.os, browser=to_capture.browser,
parent=to_capture.parent,
downloaded_filename=entries.get('downloaded_filename'),
downloaded_file=entries.get('downloaded_file'),
error=entries.get('error'), har=entries.get('har'),
png=entries.get('png'), html=entries.get('html'),
last_redirected_url=entries.get('last_redirected_url'),
cookies=entries.get('cookies'),
capture_settings=to_capture,
potential_favicons=entries.get('potential_favicons'),
auto_report=to_capture.auto_report,
)
lazy_cleanup = self.lookyloo.redis.pipeline()
if queue and self.lookyloo.redis.zscore('queues', queue):
lazy_cleanup.zincrby('queues', -1, queue)
lazy_cleanup.zrem('to_capture', uuid)
lazy_cleanup.srem('ongoing', uuid)
lazy_cleanup.delete(uuid)
# make sure to expire the key if nothing was processed for a while (= queues empty)
lazy_cleanup.expire('queues', 600)
lazy_cleanup.execute()
self.logger.info(f'Done with {uuid}')
2024-01-12 17:15:41 +01:00
async def _to_run_forever_async(self) -> None:
if self.force_stop:
2024-01-12 17:15:41 +01:00
return None
try:
if isinstance(self.lookyloo.lacus, LacusCore):
await self._trigger_captures()
self.process_capture_queue()
except LacusUnreachable:
self.logger.error('Lacus is unreachable, retrying later.')
2024-01-12 17:15:41 +01:00
async def _wait_to_finish_async(self) -> None:
try:
if isinstance(self.lookyloo.lacus, LacusCore):
while self.captures:
self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
await asyncio.sleep(5)
self.process_capture_queue()
self.logger.info('No more captures')
except LacusUnreachable:
self.logger.error('Lacus is unreachable, nothing to wait for')
2024-01-12 17:15:41 +01:00
def main() -> None:
2020-11-05 14:14:33 +01:00
m = AsyncCapture()
loop = asyncio.new_event_loop()
loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(m.stop_async()))
try:
loop.run_until_complete(m.run_async(sleep_in_sec=1))
finally:
loop.close()
if __name__ == '__main__':
main()