2020-03-31 14:12:49 +02:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
2020-04-20 16:41:42 +02:00
|
|
|
from typing import Dict, Any, Optional
|
2020-03-31 14:12:49 +02:00
|
|
|
from datetime import date
|
|
|
|
import hashlib
|
|
|
|
import json
|
2020-04-20 16:41:42 +02:00
|
|
|
from pathlib import Path
|
|
|
|
import time
|
2020-03-31 14:12:49 +02:00
|
|
|
|
|
|
|
|
|
|
|
from .helpers import get_homedir
|
|
|
|
from .exceptions import ConfigError
|
|
|
|
|
|
|
|
import vt # type: ignore
|
|
|
|
|
|
|
|
|
|
|
|
class VirusTotal:
    '''Caching wrapper around the ``vt`` client for VirusTotal URL lookups.

    Each URL gets its own directory below ``<homedir>/vt_url`` and every
    successful fetch is stored there as a JSON file named after the current
    date (ISO format), so at most one report per URL and per day is kept.

    Attributes:
        available: False when the config has no ``apikey``; in that case no
            client is created and no storage directory is set up.
        autosubmit: True when the config enables ``autosubmit``; only then
            are URLs sent to VirusTotal for scanning.
    '''

    # How many times url_lookup tries to fetch the report, and how long
    # (in seconds) it waits between two attempts.
    _fetch_attempts = 3
    _retry_delay = 5

    def __init__(self, config: Dict[str, Any]):
        '''Set up the client and the cache directory from ``config``.

        ``config`` must contain ``apikey`` for the module to be usable;
        a truthy ``autosubmit`` entry allows submitting URLs for scanning.
        '''
        # Always defined, so the attribute can be read on unavailable instances.
        self.autosubmit = False
        self.available = 'apikey' in config
        if not self.available:
            return

        self.client = vt.Client(config['apikey'])
        self.autosubmit = bool(config.get('autosubmit'))
        self.storage_dir_vt = get_homedir() / 'vt_url'
        self.storage_dir_vt.mkdir(parents=True, exist_ok=True)

    def __del__(self) -> None:
        '''Close the API client, if one was created.'''
        # The client does not exist when no API key was configured.
        if hasattr(self, 'client'):
            self.client.close()

    def __get_cache_directory(self, url: str) -> Path:
        '''Return the directory holding the cached reports for ``url``.

        The directory name is the MD5 hex digest of the identifier returned
        by ``vt.url_id``; MD5 only serves as a naming scheme here.
        '''
        url_id = vt.url_id(url)
        m = hashlib.md5()
        m.update(url_id.encode())
        return self.storage_dir_vt / m.hexdigest()

    def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
        '''Return the most recent cached report for ``url``.

        Returns None when the module is not available, or when nothing has
        been cached for this URL yet. This method never queries VirusTotal.
        '''
        if not self.available:
            # No API key means no storage directory was configured either.
            return None

        url_storage_dir = self.__get_cache_directory(url)
        if not url_storage_dir.exists():
            return None

        # Entries are named after ISO dates, so the greatest path is the latest one.
        latest_entry = max(url_storage_dir.glob('*'), default=None)
        if latest_entry is None:
            return None

        with latest_entry.open() as f:
            return json.load(f)

    def url_lookup(self, url: str, force: bool=False) -> None:
        '''Lookup an URL on VT and cache the report on disk.

        Note: force means 2 things:
            * (re)scan of the URL
            * re fetch the object from VT even if we already did it today

        Note: the URL will only be sent for scan if autosubmit is set to true in the config

        Raises:
            ConfigError: the module is not available (no API key configured).
        '''
        if not self.available:
            raise ConfigError('VirusTotal not available, probably no API key')

        url_id = vt.url_id(url)
        url_storage_dir = self.__get_cache_directory(url)
        url_storage_dir.mkdir(parents=True, exist_ok=True)
        vt_file = url_storage_dir / date.today().isoformat()

        scan_requested = False
        if self.autosubmit and force:
            self.client.scan_url(url)
            scan_requested = True

        if not force and vt_file.exists():
            # Already fetched today, nothing to do.
            return

        for attempt in range(self._fetch_attempts):
            if attempt:
                # Only wait between two attempts, never after the last one.
                time.sleep(self._retry_delay)
            try:
                url_information = self.client.get_object(f"/urls/{url_id}")
                # Serialise before opening the file, so a failure cannot
                # leave a truncated cache entry behind.
                serialized = json.dumps(url_information.to_dict())
                with vt_file.open('w') as _f:
                    _f.write(serialized)
                break
            except vt.APIError as e:
                if not self.autosubmit:
                    # Not allowed to submit the URL: retrying is pointless.
                    break
                if not scan_requested and e.code == 'NotFoundError':
                    # Unknown URL: request a scan once, then retry the fetch.
                    self.client.scan_url(url)
                    scan_requested = True
|