mirror of https://github.com/CIRCL/lookyloo
fix: Calls with the VT lib fail when not in async mode.
parent
84edd62b28
commit
db5644d782
|
@ -2,12 +2,14 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from datetime import date
|
||||
from typing import Any, TYPE_CHECKING
|
||||
|
||||
import vt # type: ignore[import-untyped]
|
||||
from vt import ClientResponse
|
||||
from vt.error import APIError # type: ignore[import-untyped]
|
||||
from vt.object import WhistleBlowerDict # type: ignore[import-untyped]
|
||||
|
||||
|
@ -71,6 +73,15 @@ class VirusTotal(AbstractModule):
|
|||
self.url_lookup(cache.url, force)
|
||||
return {'success': 'Module triggered'}
|
||||
|
||||
async def get_object_vt(self, url: str) -> ClientResponse:
|
||||
url_id = vt.url_id(url)
|
||||
async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client:
|
||||
return await client.get_object_async(f"/urls/{url_id}")
|
||||
|
||||
async def scan_url(self, url: str) -> None:
|
||||
async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client:
|
||||
await client.scan_url_async(url)
|
||||
|
||||
def url_lookup(self, url: str, force: bool=False) -> None:
|
||||
'''Lookup an URL on VT
|
||||
Note: force means 2 things:
|
||||
|
@ -89,7 +100,7 @@ class VirusTotal(AbstractModule):
|
|||
scan_requested = False
|
||||
if self.autosubmit and force:
|
||||
try:
|
||||
self.client.scan_url(url)
|
||||
asyncio.run(self.scan_url(url))
|
||||
except APIError as e:
|
||||
if e.code == 'QuotaExceededError':
|
||||
self.logger.warning('VirusTotal quota exceeded, sry.')
|
||||
|
@ -100,10 +111,9 @@ class VirusTotal(AbstractModule):
|
|||
if not force and vt_file.exists():
|
||||
return
|
||||
|
||||
url_id = vt.url_id(url)
|
||||
for _ in range(3):
|
||||
try:
|
||||
url_information = self.client.get_object(f"/urls/{url_id}")
|
||||
url_information = asyncio.run(self.get_object_vt(url))
|
||||
with vt_file.open('w') as _f:
|
||||
json.dump(url_information.to_dict(), _f, default=jsonify_vt)
|
||||
break
|
||||
|
@ -112,7 +122,7 @@ class VirusTotal(AbstractModule):
|
|||
break
|
||||
if not scan_requested and e.code == 'NotFoundError':
|
||||
try:
|
||||
self.client.scan_url(url)
|
||||
asyncio.run(self.scan_url(url))
|
||||
scan_requested = True
|
||||
except APIError as e:
|
||||
self.logger.warning(f'Unable to trigger VirusTotal on {url}: {e}')
|
||||
|
|
Loading…
Reference in New Issue