chg: [Modules] use same directory structure for cache in all modules.

pull/348/head
Raphaël Vinot 2022-02-03 12:38:44 +01:00
parent d1314aea0c
commit afc77126d4
3 changed files with 13 additions and 34 deletions

View File

@@ -1,17 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import hashlib
import json import json
import time import time
from datetime import date from datetime import date
from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from har2tree import CrawledTree from har2tree import CrawledTree
from pyeupi import PyEUPI from pyeupi import PyEUPI
from ..default import ConfigError, get_homedir from ..default import ConfigError, get_homedir
from ..helpers import get_cache_directory
class PhishingInitiative(): class PhishingInitiative():
@@ -35,13 +34,8 @@ class PhishingInitiative():
self.storage_dir_eupi = get_homedir() / 'eupi' self.storage_dir_eupi = get_homedir() / 'eupi'
self.storage_dir_eupi.mkdir(parents=True, exist_ok=True) self.storage_dir_eupi.mkdir(parents=True, exist_ok=True)
def __get_cache_directory(self, url: str) -> Path:
m = hashlib.md5()
m.update(url.encode())
return self.storage_dir_eupi / m.hexdigest()
def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
url_storage_dir = self.__get_cache_directory(url) url_storage_dir = get_cache_directory(self.storage_dir_eupi, url)
if not url_storage_dir.exists(): if not url_storage_dir.exists():
return None return None
cached_entries = sorted(url_storage_dir.glob('*'), reverse=True) cached_entries = sorted(url_storage_dir.glob('*'), reverse=True)
@@ -76,7 +70,7 @@ class PhishingInitiative():
if not self.available: if not self.available:
raise ConfigError('PhishingInitiative not available, probably no API key') raise ConfigError('PhishingInitiative not available, probably no API key')
url_storage_dir = self.__get_cache_directory(url) url_storage_dir = get_cache_directory(self.storage_dir_eupi, url)
url_storage_dir.mkdir(parents=True, exist_ok=True) url_storage_dir.mkdir(parents=True, exist_ok=True)
pi_file = url_storage_dir / date.today().isoformat() pi_file = url_storage_dir / date.today().isoformat()

View File

@@ -1,17 +1,15 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import hashlib
import json import json
import logging import logging
from datetime import date from datetime import date
from pathlib import Path
from typing import Any, Dict from typing import Any, Dict
import requests import requests
from ..default import ConfigError, get_config, get_homedir from ..default import ConfigError, get_config, get_homedir
from ..helpers import get_useragent_for_requests from ..helpers import get_useragent_for_requests, get_cache_directory
class UrlScan(): class UrlScan():
@@ -53,16 +51,10 @@ class UrlScan():
self.storage_dir_urlscan = get_homedir() / 'urlscan' self.storage_dir_urlscan = get_homedir() / 'urlscan'
self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True) self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True)
def __get_cache_directory(self, url: str, useragent: str, referer: str) -> Path:
m = hashlib.md5()
to_hash = f'{url}{useragent}{referer}'
m.update(to_hash.encode())
return self.storage_dir_urlscan / m.hexdigest()
def get_url_submission(self, capture_info: Dict[str, Any]) -> Dict[str, Any]: def get_url_submission(self, capture_info: Dict[str, Any]) -> Dict[str, Any]:
url_storage_dir = self.__get_cache_directory(capture_info['url'], url_storage_dir = get_cache_directory(
capture_info['user_agent'], self.storage_dir_urlscan,
capture_info['referer']) / 'submit' f'{capture_info["url"]}{capture_info["user_agent"]}{capture_info["referer"]}') / 'submit'
if not url_storage_dir.exists(): if not url_storage_dir.exists():
return {} return {}
cached_entries = sorted(url_storage_dir.glob('*'), reverse=True) cached_entries = sorted(url_storage_dir.glob('*'), reverse=True)
@@ -123,9 +115,9 @@ class UrlScan():
if not self.available: if not self.available:
raise ConfigError('UrlScan not available, probably no API key') raise ConfigError('UrlScan not available, probably no API key')
url_storage_dir = self.__get_cache_directory(capture_info['url'], url_storage_dir = get_cache_directory(
capture_info['user_agent'], self.storage_dir_urlscan,
capture_info['referer']) / 'submit' f'{capture_info["url"]}{capture_info["user_agent"]}{capture_info["referer"]}') / 'submit'
url_storage_dir.mkdir(parents=True, exist_ok=True) url_storage_dir.mkdir(parents=True, exist_ok=True)
urlscan_file_submit = url_storage_dir / date.today().isoformat() urlscan_file_submit = url_storage_dir / date.today().isoformat()

View File

@@ -1,11 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import hashlib
import json import json
import time import time
from datetime import date from datetime import date
from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import vt # type: ignore import vt # type: ignore
@@ -13,6 +11,7 @@ from har2tree import CrawledTree
from vt.error import APIError # type: ignore from vt.error import APIError # type: ignore
from ..default import ConfigError, get_homedir from ..default import ConfigError, get_homedir
from ..helpers import get_cache_directory
class VirusTotal(): class VirusTotal():
@@ -36,14 +35,8 @@ class VirusTotal():
self.storage_dir_vt = get_homedir() / 'vt_url' self.storage_dir_vt = get_homedir() / 'vt_url'
self.storage_dir_vt.mkdir(parents=True, exist_ok=True) self.storage_dir_vt.mkdir(parents=True, exist_ok=True)
def __get_cache_directory(self, url: str) -> Path:
url_id = vt.url_id(url)
m = hashlib.md5()
m.update(url_id.encode())
return self.storage_dir_vt / m.hexdigest()
def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
url_storage_dir = self.__get_cache_directory(url) url_storage_dir = get_cache_directory(self.storage_dir_vt, vt.url_id(url))
if not url_storage_dir.exists(): if not url_storage_dir.exists():
return None return None
cached_entries = sorted(url_storage_dir.glob('*'), reverse=True) cached_entries = sorted(url_storage_dir.glob('*'), reverse=True)
@@ -78,7 +71,7 @@ class VirusTotal():
if not self.available: if not self.available:
raise ConfigError('VirusTotal not available, probably no API key') raise ConfigError('VirusTotal not available, probably no API key')
url_storage_dir = self.__get_cache_directory(url) url_storage_dir = get_cache_directory(self.storage_dir_vt, vt.url_id(url))
url_storage_dir.mkdir(parents=True, exist_ok=True) url_storage_dir.mkdir(parents=True, exist_ok=True)
vt_file = url_storage_dir / date.today().isoformat() vt_file = url_storage_dir / date.today().isoformat()