diff --git a/lookyloo/modules/vt.py b/lookyloo/modules/vt.py index 006edee5..0e6242f6 100644 --- a/lookyloo/modules/vt.py +++ b/lookyloo/modules/vt.py @@ -2,12 +2,14 @@ from __future__ import annotations +import asyncio import json import time from datetime import date from typing import Any, TYPE_CHECKING import vt # type: ignore[import-untyped] +from vt import ClientResponse from vt.error import APIError # type: ignore[import-untyped] from vt.object import WhistleBlowerDict # type: ignore[import-untyped] @@ -71,6 +73,15 @@ class VirusTotal(AbstractModule): self.url_lookup(cache.url, force) return {'success': 'Module triggered'} + async def get_object_vt(self, url: str) -> ClientResponse: + url_id = vt.url_id(url) + async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client: + return await client.get_object_async(f"/urls/{url_id}") + + async def scan_url(self, url: str) -> None: + async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client: + await client.scan_url_async(url) + def url_lookup(self, url: str, force: bool=False) -> None: '''Lookup an URL on VT Note: force means 2 things: @@ -89,7 +100,7 @@ class VirusTotal(AbstractModule): scan_requested = False if self.autosubmit and force: try: - self.client.scan_url(url) + asyncio.run(self.scan_url(url)) except APIError as e: if e.code == 'QuotaExceededError': self.logger.warning('VirusTotal quota exceeded, sry.') @@ -100,10 +111,9 @@ class VirusTotal(AbstractModule): if not force and vt_file.exists(): return - url_id = vt.url_id(url) for _ in range(3): try: - url_information = self.client.get_object(f"/urls/{url_id}") + url_information = asyncio.run(self.get_object_vt(url)) with vt_file.open('w') as _f: json.dump(url_information.to_dict(), _f, default=jsonify_vt) break @@ -112,7 +122,7 @@ class VirusTotal(AbstractModule): break if not scan_requested and e.code == 'NotFoundError': try: - self.client.scan_url(url) + asyncio.run(self.scan_url(url)) scan_requested = True except APIError as e: self.logger.warning(f'Unable to trigger VirusTotal on {url}: {e}')