chg: Add more entries to the cache, use it in the modules.

pull/559/head
Raphaël Vinot 2022-12-07 13:03:15 +01:00
parent 52076925b8
commit 94b3b487f3
11 changed files with 147 additions and 109 deletions

lookyloo/capturecache.py

@@ -29,7 +29,8 @@ from .exceptions import MissingCaptureDirectory, NoValidHarFile, MissingUUID, Tr
 class CaptureCache():
     __slots__ = ('uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir',
-                 'error', 'incomplete_redirects', 'no_index', 'categories', 'parent')
+                 'error', 'incomplete_redirects', 'no_index', 'categories', 'parent',
+                 'user_agent', 'referer')

     def __init__(self, cache_entry: Dict[str, Any]):
         __default_cache_keys: Tuple[str, str, str, str, str, str] = ('uuid', 'title', 'timestamp',
@@ -61,6 +62,8 @@ class CaptureCache():
         self.no_index: bool = True if cache_entry.get('no_index') in [1, '1'] else False
         self.categories: List[str] = json.loads(cache_entry['categories']) if cache_entry.get('categories') else []
         self.parent: Optional[str] = cache_entry.get('parent')
+        self.user_agent: Optional[str] = cache_entry.get('user_agent')
+        self.referer: Optional[str] = cache_entry.get('referer')

     @property
     def tree(self) -> CrawledTree:
@@ -308,6 +311,11 @@ class CapturesIndex(Mapping):
             cache['url'] = har.root_url
             cache['redirects'] = json.dumps(tree.redirects)
             cache['incomplete_redirects'] = 0
+            if har.root_referrer:
+                cache['referer'] = har.root_referrer
+            if har.root_user_agent:
+                # NOTE: This should always be the case (?)
+                cache['user_agent'] = har.root_user_agent
         except Har2TreeError as e:
             cache['error'] = str(e)
         else:
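Note: both new cache keys are optional. Entries written before this change lack them, and `dict.get` returns None for missing keys, which is what the `Optional[str]` annotations on the new attributes expect. A minimal sketch (hypothetical entries, not from the repo):

    # Entries written before this commit have no 'user_agent'/'referer' key.
    old_entry = {'uuid': 'deadbeef', 'title': 'example'}
    new_entry = dict(old_entry, user_agent='Mozilla/5.0', referer='https://example.com/')

    # dict.get() yields None for the missing keys, matching the
    # Optional[str] annotations on CaptureCache.user_agent / .referer.
    assert old_entry.get('user_agent') is None
    assert new_entry.get('user_agent') == 'Mozilla/5.0'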

lookyloo/lookyloo.py

@@ -14,6 +14,7 @@ from io import BytesIO
 from pathlib import Path
 from typing import (Any, Dict, Iterable, List, MutableMapping, Optional, Set,
                     Tuple, Union)
+from urllib.parse import urlparse
 from uuid import uuid4
 from zipfile import ZipFile
@@ -190,10 +191,14 @@ class Lookyloo():
     def get_info(self, capture_uuid: str, /) -> Dict[str, Any]:
         '''Get basic information about the capture.'''
-        ct = self.get_crawled_tree(capture_uuid)
-        to_return = {'url': ct.root_url, 'title': ct.root_hartree.har.initial_title,
-                     'capture_time': ct.start_time.isoformat(), 'user_agent': ct.user_agent,
-                     'referer': ct.referer if ct.referer else ''}
+        cache = self.capture_cache(capture_uuid)
+        if not cache:
+            # NOTE: Return an exception?
+            return {}
+        to_return = {'url': cache.url, 'title': cache.title,
+                     'capture_time': cache.timestamp.isoformat(),
+                     'user_agent': cache.user_agent,
+                     'referer': cache.referer if cache.referer else ''}
         return to_return

     def get_meta(self, capture_uuid: str, /) -> Dict[str, str]:
@@ -280,65 +285,62 @@ class Lookyloo():
         to_return: Dict[str, Dict] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {},
                                       'URLhaus': {}}
-        capture_cache = self.capture_cache(capture_uuid)
-        to_return['PhishingInitiative'] = self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
-        to_return['VirusTotal'] = self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
-        to_return['UrlScan'] = self.urlscan.capture_default_trigger(
-            self.get_info(capture_uuid),
-            visibility='unlisted' if (capture_cache and capture_cache.no_index) else 'public',
-            force=force, auto_trigger=auto_trigger)
-        to_return['Phishtank'] = self.phishtank.capture_default_trigger(ct, auto_trigger=auto_trigger)
-        to_return['URLhaus'] = self.urlhaus.capture_default_trigger(ct, auto_trigger=auto_trigger)
+        if cache := self.capture_cache(capture_uuid):
+            to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
+            to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
+            to_return['UrlScan'] = self.urlscan.capture_default_trigger(
+                cache,
+                visibility='unlisted' if (cache and cache.no_index) else 'public',
+                force=force, auto_trigger=auto_trigger)
+            to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, auto_trigger=auto_trigger)
+            to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, auto_trigger=auto_trigger)
         return to_return
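The rewrite relies on an assignment expression (`:=`, available since Python 3.8): the cache is fetched and tested in one step, and the module triggers only run when the capture is cached. A minimal sketch of the pattern:

    from typing import Optional

    def fetch(key: str) -> Optional[str]:
        # Stand-in for capture_cache(): returns None when nothing is cached.
        return {'uuid': 'cached'}.get(key)

    # Assign and test in a single expression; the body is skipped
    # entirely when fetch() returns None.
    if cache := fetch('uuid'):
        print(cache)  # -> cached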
     def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
         '''Get the responses of the modules from the cached responses on the disk'''
-        try:
-            ct = self.get_crawled_tree(capture_uuid)
-        except LookylooException:
-            self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
+        cache = self.capture_cache(capture_uuid)
+        if not cache:
+            self.logger.warning(f'Unable to get the modules responses unless the capture {capture_uuid} is cached')
             return None
         to_return: Dict[str, Any] = {}
         if self.vt.available:
             to_return['vt'] = {}
-            if ct.redirects:
-                for redirect in ct.redirects:
+            if cache.redirects:
+                for redirect in cache.redirects:
                     to_return['vt'][redirect] = self.vt.get_url_lookup(redirect)
             else:
-                to_return['vt'][ct.root_hartree.har.root_url] = self.vt.get_url_lookup(ct.root_hartree.har.root_url)
+                to_return['vt'][cache.url] = self.vt.get_url_lookup(cache.url)
         if self.pi.available:
             to_return['pi'] = {}
-            if ct.redirects:
-                for redirect in ct.redirects:
+            if cache.redirects:
+                for redirect in cache.redirects:
                     to_return['pi'][redirect] = self.pi.get_url_lookup(redirect)
             else:
-                to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url)
+                to_return['pi'][cache.url] = self.pi.get_url_lookup(cache.url)
         if self.phishtank.available:
             to_return['phishtank'] = {'urls': {}, 'ips_hits': {}}
-            if ct.redirects:
-                for redirect in ct.redirects:
+            if cache.redirects:
+                for redirect in cache.redirects:
                     to_return['phishtank']['urls'][redirect] = self.phishtank.get_url_lookup(redirect)
             else:
-                to_return['phishtank']['urls'][ct.root_hartree.har.root_url] = self.phishtank.get_url_lookup(ct.root_hartree.har.root_url)
-            ips_hits = self.phishtank.lookup_ips_capture(ct)
+                to_return['phishtank']['urls'][cache.url] = self.phishtank.get_url_lookup(cache.url)
+            ips_hits = self.phishtank.lookup_ips_capture(cache)
             if ips_hits:
                 to_return['phishtank']['ips_hits'] = ips_hits
         if self.urlhaus.available:
             to_return['urlhaus'] = {'urls': {}}
-            if ct.redirects:
-                for redirect in ct.redirects:
+            if cache.redirects:
+                for redirect in cache.redirects:
                     to_return['urlhaus']['urls'][redirect] = self.urlhaus.get_url_lookup(redirect)
             else:
-                to_return['urlhaus']['urls'][ct.root_hartree.har.root_url] = self.urlhaus.get_url_lookup(ct.root_hartree.har.root_url)
+                to_return['urlhaus']['urls'][cache.url] = self.urlhaus.get_url_lookup(cache.url)
         if self.urlscan.available:
-            info = self.get_info(capture_uuid)
             to_return['urlscan'] = {'submission': {}, 'result': {}}
-            to_return['urlscan']['submission'] = self.urlscan.get_url_submission(info)
+            to_return['urlscan']['submission'] = self.urlscan.get_url_submission(cache)
             if to_return['urlscan']['submission'] and 'uuid' in to_return['urlscan']['submission']:
                 # The submission was done, try to get the results
-                result = self.urlscan.url_result(info)
+                result = self.urlscan.url_result(cache)
                 if 'error' not in result:
                     to_return['urlscan']['result'] = result
         return to_return
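Every module block repeats the same fallback: query each URL of the redirect chain if there is one, otherwise query the root URL. Since `redirects` is a list, the two branches are equivalent to a single iteration; a sketch of the underlying logic (not a refactor the patch performs):

    from typing import List

    def urls_to_check(redirects: List[str], root_url: str) -> List[str]:
        # An empty redirect chain falls back to the capture's root URL,
        # mirroring the if/else repeated for vt, pi, phishtank and urlhaus.
        return redirects or [root_url]

    assert urls_to_check([], 'http://example.com') == ['http://example.com']
    assert urls_to_check(['http://a.example', 'http://b.example'],
                         'http://a.example') == ['http://a.example', 'http://b.example']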
@@ -348,16 +350,20 @@ class Lookyloo():
         # by looking at Passive DNS systems, check if there are hits in the current capture
         # in another one and things like that. The trigger_modules method is for getting
         # information about the current status of the capture in other systems.
-        try:
-            ct = self.get_crawled_tree(capture_uuid)
-        except LookylooException:
-            self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
+        cache = self.capture_cache(capture_uuid)
+        if not cache:
+            self.logger.warning(f'Unable to get the modules responses unless the capture {capture_uuid} is cached')
             return {}
         to_return: Dict[str, Any] = {}
         if self.riskiq.available:
             try:
-                self.riskiq.capture_default_trigger(ct)
-                to_return['riskiq'] = self.riskiq.get_passivedns(ct.root_hartree.rendered_node.hostname)
+                self.riskiq.capture_default_trigger(cache)
+                if cache.redirects:
+                    hostname = urlparse(cache.redirects[-1]).hostname
+                else:
+                    hostname = urlparse(cache.url).hostname
+                if hostname:
+                    to_return['riskiq'] = self.riskiq.get_passivedns(hostname)
             except RiskIQError as e:
                 self.logger.warning(e.response.content)
         return to_return
@@ -843,12 +849,13 @@ class Lookyloo():
         return vt_obj

     def __misp_add_urlscan_to_event(self, capture_uuid: str, visibility: str) -> Optional[MISPAttribute]:
-        response = self.urlscan.url_submit(self.get_info(capture_uuid), visibility)
-        if 'result' in response:
-            attribute = MISPAttribute()
-            attribute.value = response['result']
-            attribute.type = 'link'
-            return attribute
+        if cache := self.capture_cache(capture_uuid):
+            response = self.urlscan.url_submit(cache, visibility)
+            if 'result' in response:
+                attribute = MISPAttribute()
+                attribute.value = response['result']
+                attribute.type = 'link'
+                return attribute
         return None

     def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> Union[List[MISPEvent], Dict[str, str]]:

lookyloo/modules/misp.py

@@ -12,6 +12,7 @@ from pymisp.tools import FileObject, URLObject
 from ..default import get_config, get_homedir
 from ..helpers import get_public_suffix_list

 if TYPE_CHECKING:
     from ..capturecache import CaptureCache
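Across the modules, this commit swaps the `har2tree.CrawledTree` parameter for `CaptureCache`, importing the latter only under `typing.TYPE_CHECKING`. The block is skipped at runtime, so the annotation costs nothing and cannot create a circular import between the modules and `capturecache`; the pattern looks like this:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Evaluated by type checkers (e.g. mypy) only, never at runtime.
        from ..capturecache import CaptureCache

    def capture_default_trigger(cache: 'CaptureCache') -> dict:
        # The quoted annotation is a forward reference, resolved lazily.
        return {'success': 'Module triggered'}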

lookyloo/modules/phishtank.py

@@ -2,14 +2,16 @@
 import json
 from datetime import date, datetime, timedelta, timezone
-from typing import Any, Dict, Optional, List
-
-from har2tree import CrawledTree
+from typing import Any, Dict, Optional, List, TYPE_CHECKING

 from pyphishtanklookup import PhishtankLookup

 from ..default import ConfigError, get_homedir
 from ..helpers import get_cache_directory

+if TYPE_CHECKING:
+    from ..capturecache import CaptureCache

 class Phishtank():
@@ -42,8 +44,8 @@ class Phishtank():
         with cached_entries[0].open() as f:
             return json.load(f)

-    def lookup_ips_capture(self, crawled_tree: CrawledTree) -> Dict[str, List[Dict[str, Any]]]:
-        with (crawled_tree.root_hartree.har.path.parent / 'ips.json').open() as f:
+    def lookup_ips_capture(self, cache: 'CaptureCache') -> Dict[str, List[Dict[str, Any]]]:
+        with (cache.capture_dir / 'ips.json').open() as f:
             ips_dump = json.load(f)
         to_return: Dict[str, List[Dict[str, Any]]] = {}
         for ip in {ip for ips_list in ips_dump.values() for ip in ips_list}:
@@ -68,7 +70,7 @@ class Phishtank():
         with cached_entries[0].open() as f:
             return json.load(f)

-    def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, cache: 'CaptureCache', /, *, auto_trigger: bool=False) -> Dict:
         '''Run the module on all the nodes up to the final redirect'''
         if not self.available:
             return {'error': 'Module not available'}
@@ -76,18 +78,18 @@ class Phishtank():
             return {'error': 'Auto trigger not allowed on module'}

         # Quit if the capture is more than 70h old, the data in phishtank expire around that time.
-        if crawled_tree.start_time <= datetime.now(timezone.utc) - timedelta(hours=70):
+        if cache.timestamp <= datetime.now(timezone.utc) - timedelta(hours=70):
             return {'error': 'Capture to old, the response will be irrelevant.'}

         # Check URLs up to the redirect
-        if crawled_tree.redirects:
-            for redirect in crawled_tree.redirects:
+        if cache.redirects:
+            for redirect in cache.redirects:
                 self.url_lookup(redirect)
         else:
-            self.url_lookup(crawled_tree.root_hartree.har.root_url)
+            self.url_lookup(cache.url)

         # Check all the IPs in the ips file of the capture
-        with (crawled_tree.root_hartree.har.path.parent / 'ips.json').open() as f:
+        with (cache.capture_dir / 'ips.json').open() as f:
             ips_dump = json.load(f)
         for ip in {ip for ips_list in ips_dump.values() for ip in ips_list}:
             self.ip_lookup(ip)
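`lookup_ips_capture` and `capture_default_trigger` now resolve `ips.json` from `cache.capture_dir` instead of walking up from the HAR file's path. With pathlib the two spellings point at the same file whenever the HAR lives at the top of the capture directory (a hypothetical layout for illustration):

    from pathlib import Path

    capture_dir = Path('/captures/5ee0fd8c')   # hypothetical capture directory
    har_path = capture_dir / '0.har'

    # Old: navigate from the HAR back up to its directory.
    # New: go straight from the cached capture_dir. Same file either way.
    assert har_path.parent / 'ips.json' == capture_dir / 'ips.json'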

lookyloo/modules/pi.py

@@ -2,15 +2,18 @@
 import json
 import time

-from datetime import date
-from typing import Any, Dict, Optional
-
-from har2tree import CrawledTree
+from datetime import date
+from typing import Any, Dict, Optional, TYPE_CHECKING

 from pyeupi import PyEUPI

 from ..default import ConfigError, get_homedir
 from ..helpers import get_cache_directory

+if TYPE_CHECKING:
+    from ..capturecache import CaptureCache

 class PhishingInitiative():
@@ -44,18 +47,18 @@ class PhishingInitiative():
         with cached_entries[0].open() as f:
             return json.load(f)

-    def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
         '''Run the module on all the nodes up to the final redirect'''
         if not self.available:
             return {'error': 'Module not available'}
         if auto_trigger and not self.allow_auto_trigger:
             return {'error': 'Auto trigger not allowed on module'}

-        if crawled_tree.redirects:
-            for redirect in crawled_tree.redirects:
+        if cache.redirects:
+            for redirect in cache.redirects:
                 self.url_lookup(redirect, force)
         else:
-            self.url_lookup(crawled_tree.root_hartree.har.root_url, force)
+            self.url_lookup(cache.url, force)
         return {'success': 'Module triggered'}

     def url_lookup(self, url: str, force: bool=False) -> None:

lookyloo/modules/riskiq.py

@@ -4,9 +4,9 @@ import json
 import logging

 from datetime import date, datetime, timedelta
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional, Union, TYPE_CHECKING
+from urllib.parse import urlparse

-from har2tree import CrawledTree
 from passivetotal import AccountClient, DnsRequest, WhoisRequest  # type: ignore
 from requests import Response
@@ -14,6 +14,9 @@ from ..default import ConfigError, get_homedir, get_config
 from ..exceptions import ModuleError
 from ..helpers import get_cache_directory

+if TYPE_CHECKING:
+    from ..capturecache import CaptureCache

 class RiskIQError(ModuleError):
@@ -74,14 +77,22 @@ class RiskIQ():
         with cached_entries[0].open() as f:
             return json.load(f)

-    def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
         '''Run the module on all the nodes up to the final redirect'''
         if not self.available:
             return {'error': 'Module not available'}
         if auto_trigger and not self.allow_auto_trigger:
             return {'error': 'Auto trigger not allowed on module'}

-        self.pdns_lookup(crawled_tree.root_hartree.rendered_node.hostname, force)
+        if cache.redirects:
+            hostname = urlparse(cache.redirects[-1]).hostname
+        else:
+            hostname = urlparse(cache.url).hostname
+        if not hostname:
+            return {'error': 'No hostname found.'}
+        self.pdns_lookup(hostname, force)
         return {'success': 'Module triggered'}

     def pdns_lookup(self, hostname: str, force: bool=False, first_seen: Optional[Union[date, datetime]]=None) -> None:

lookyloo/modules/urlhaus.py

@@ -2,15 +2,16 @@
 import json

 from datetime import date
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, TYPE_CHECKING

 import requests
-
-from har2tree import CrawledTree

 from ..default import ConfigError, get_homedir
 from ..helpers import get_cache_directory

+if TYPE_CHECKING:
+    from ..capturecache import CaptureCache

 class URLhaus():
@@ -45,7 +46,7 @@ class URLhaus():
         response.raise_for_status()
         return response.json()

-    def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, cache: 'CaptureCache', /, *, auto_trigger: bool=False) -> Dict:
         '''Run the module on all the nodes up to the final redirect'''
         if not self.available:
             return {'error': 'Module not available'}
@@ -53,11 +54,11 @@ class URLhaus():
             return {'error': 'Auto trigger not allowed on module'}

         # Check URLs up to the redirect
-        if crawled_tree.redirects:
-            for redirect in crawled_tree.redirects:
+        if cache.redirects:
+            for redirect in cache.redirects:
                 self.url_lookup(redirect)
         else:
-            self.url_lookup(crawled_tree.root_hartree.har.root_url)
+            self.url_lookup(cache.url)
         return {'success': 'Module triggered'}

lookyloo/modules/urlscan.py

@@ -3,13 +3,16 @@
 import json
 import logging
 from datetime import date
-from typing import Any, Dict
+from typing import Any, Dict, Optional, TYPE_CHECKING

 import requests

 from ..default import ConfigError, get_config, get_homedir
 from ..helpers import get_useragent_for_requests, get_cache_directory

+if TYPE_CHECKING:
+    from ..capturecache import CaptureCache

 class UrlScan():
@@ -50,10 +53,10 @@ class UrlScan():
         self.storage_dir_urlscan = get_homedir() / 'urlscan'
         self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True)

-    def get_url_submission(self, capture_info: Dict[str, Any]) -> Dict[str, Any]:
+    def get_url_submission(self, capture_info: 'CaptureCache') -> Dict[str, Any]:
         url_storage_dir = get_cache_directory(
             self.storage_dir_urlscan,
-            f'{capture_info["url"]}{capture_info["user_agent"]}{capture_info["referer"]}',
+            f'{capture_info.url}{capture_info.user_agent}{capture_info.referer}',
             'submit')
         if not url_storage_dir.exists():
             return {}
@@ -64,7 +67,7 @@ class UrlScan():
         with cached_entries[0].open() as f:
             return json.load(f)

-    def capture_default_trigger(self, capture_info: Dict[str, Any], /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, capture_info: 'CaptureCache', /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> Dict:
         '''Run the module on the initial URL'''
         if not self.available:
             return {'error': 'Module not available'}
@@ -78,8 +81,8 @@ class UrlScan():
         self.url_submit(capture_info, visibility, force)
         return {'success': 'Module triggered'}

-    def __submit_url(self, url: str, useragent: str, referer: str, visibility: str) -> Dict:
-        data = {'customagent': useragent, 'referer': referer}
+    def __submit_url(self, url: str, useragent: Optional[str], referer: Optional[str], visibility: str) -> Dict:
+        data = {'customagent': useragent if useragent else '', 'referer': referer if referer else ''}

         if not url.startswith('http'):
             url = f'http://{url}'
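With `CaptureCache.user_agent` and `.referer` now `Optional[str]`, `__submit_url` coerces None to an empty string so the urlscan.io payload always carries strings; a sketch of that coercion:

    from typing import Dict, Optional

    def build_payload(useragent: Optional[str], referer: Optional[str]) -> Dict[str, str]:
        # None (field absent from the cache entry) becomes '', keeping the
        # submission payload string-valued.
        return {'customagent': useragent if useragent else '',
                'referer': referer if referer else ''}

    assert build_payload(None, None) == {'customagent': '', 'referer': ''}
    assert build_payload('Mozilla/5.0', None) == {'customagent': 'Mozilla/5.0', 'referer': ''}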
@@ -104,7 +107,7 @@ class UrlScan():
         response.raise_for_status()
         return response.json()

-    def url_submit(self, capture_info: Dict[str, Any], visibility: str, force: bool=False) -> Dict:
+    def url_submit(self, capture_info: 'CaptureCache', visibility: str, force: bool=False) -> Dict:
         '''Lookup an URL on urlscan.io
         Note: force means 2 things:
             * (re)scan of the URL
@@ -117,7 +120,7 @@ class UrlScan():
         url_storage_dir = get_cache_directory(
             self.storage_dir_urlscan,
-            f'{capture_info["url"]}{capture_info["user_agent"]}{capture_info["referer"]}',
+            f'{capture_info.url}{capture_info.user_agent}{capture_info.referer}',
             'submit')
         url_storage_dir.mkdir(parents=True, exist_ok=True)
         urlscan_file_submit = url_storage_dir / date.today().isoformat()
@@ -129,9 +132,9 @@ class UrlScan():
         elif self.autosubmit:
             # submit is allowed and we either force it, or it's just allowed
             try:
-                response = self.__submit_url(capture_info['url'],
-                                             capture_info['user_agent'],
-                                             capture_info['referer'],
+                response = self.__submit_url(capture_info.url,
+                                             capture_info.user_agent,
+                                             capture_info.referer,
                                              visibility)
             except requests.exceptions.HTTPError as e:
                 return {'error': e}
@@ -142,14 +145,14 @@ class UrlScan():
             return response
         return {'error': 'Submitting is not allowed by the configuration'}

-    def url_result(self, capture_info: Dict[str, Any]):
+    def url_result(self, capture_info: 'CaptureCache'):
         '''Get the result from a submission.'''
         submission = self.get_url_submission(capture_info)
         if submission and 'uuid' in submission:
             uuid = submission['uuid']
             url_storage_dir_response = get_cache_directory(
                 self.storage_dir_urlscan,
-                f'{capture_info["url"]}{capture_info["user_agent"]}{capture_info["referer"]}',
+                f'{capture_info.url}{capture_info.user_agent}{capture_info.referer}',
                 'response')
             url_storage_dir_response.mkdir(parents=True, exist_ok=True)
             if (url_storage_dir_response / f'{uuid}.json').exists():

lookyloo/modules/vt.py

@@ -4,15 +4,17 @@ import json
 import logging
 import time
 from datetime import date
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, TYPE_CHECKING

 import vt  # type: ignore
-
-from har2tree import CrawledTree
 from vt.error import APIError  # type: ignore

 from ..default import ConfigError, get_homedir, get_config
 from ..helpers import get_cache_directory

+if TYPE_CHECKING:
+    from ..capturecache import CaptureCache

 class VirusTotal():
@@ -48,18 +50,18 @@ class VirusTotal():
         with cached_entries[0].open() as f:
             return json.load(f)

-    def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
         '''Run the module on all the nodes up to the final redirect'''
         if not self.available:
             return {'error': 'Module not available'}
         if auto_trigger and not self.allow_auto_trigger:
             return {'error': 'Auto trigger not allowed on module'}

-        if crawled_tree.redirects:
-            for redirect in crawled_tree.redirects:
+        if cache.redirects:
+            for redirect in cache.redirects:
                 self.url_lookup(redirect, force)
         else:
-            self.url_lookup(crawled_tree.root_hartree.har.root_url, force)
+            self.url_lookup(cache.url, force)
         return {'success': 'Module triggered'}

     def url_lookup(self, url: str, force: bool=False) -> None:

poetry.lock (generated)

@@ -281,7 +281,7 @@ tests = ["asttokens", "littleutils", "pytest", "rich"]
 [[package]]
 name = "filelock"
-version = "3.8.1"
+version = "3.8.2"
 description = "A platform independent file lock."
 category = "main"
 optional = false
@@ -409,7 +409,7 @@ tornado = ["tornado (>=0.2)"]
 [[package]]
 name = "har2tree"
-version = "1.16.4"
+version = "1.16.5"
 description = "HTTP Archive (HAR) to ETE Toolkit generator"
 category = "main"
 optional = false
@@ -427,7 +427,7 @@ numpy = [
 ]
 publicsuffix2 = ">=2.20191221,<3.0"
 six = ">=1.16.0,<2.0.0"
-w3lib = ">=2.0.1,<3.0.0"
+w3lib = ">=2.1.0,<3.0.0"

 [package.extras]
 docs = ["Sphinx (>=5.3.0,<6.0.0)"]
@@ -466,7 +466,7 @@ testing = ["flake8 (<5)", "flufl.flake8", "importlib-resources (>=1.3)", "packag
 [[package]]
 name = "importlib-resources"
-version = "5.10.0"
+version = "5.10.1"
 description = "Read resources from Python packages"
 category = "main"
 optional = false
@@ -784,7 +784,7 @@ recaptcha = ["SpeechRecognition (>=3.9.0,<4.0.0)", "pydub (>=0.25.1,<0.26.0)", "
 [[package]]
 name = "prompt-toolkit"
-version = "3.0.33"
+version = "3.0.36"
 description = "Library for building powerful interactive command lines in Python"
 category = "dev"
 optional = false
@@ -1434,7 +1434,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools"
 [metadata]
 lock-version = "1.1"
 python-versions = ">=3.8,<3.12"
-content-hash = "56d9186cffb5ca1a5c1f12b02a97f5ab3a59f54c0274babfe3350acdbf7c23ba"
+content-hash = "cb74e1a5e4d1f303cca0486e3adfbf2dfe344db224377c5f0b364dc5fd96311c"

 [metadata.files]
 aiohttp = [
@@ -1662,8 +1662,8 @@ executing = [
     {file = "executing-1.2.0.tar.gz", hash = "sha256:19da64c18d2d851112f09c287f8d3dbbdf725ab0e569077efb6cdcbd3497c107"},
 ]
 filelock = [
-    {file = "filelock-3.8.1-py3-none-any.whl", hash = "sha256:3156639b1454b5f828255abf5710f7fc1e10dac69bde3e09e6189b29a91f2505"},
-    {file = "filelock-3.8.1.tar.gz", hash = "sha256:9255d3cd8de8fcb2a441444f7a4f1949ae826da36cd070dc3e0c883614b4bbad"},
+    {file = "filelock-3.8.2-py3-none-any.whl", hash = "sha256:8df285554452285f79c035efb0c861eb33a4bcfa5b7a137016e32e6a90f9792c"},
+    {file = "filelock-3.8.2.tar.gz", hash = "sha256:7565f628ea56bfcd8e54e42bdc55da899c85c1abfe1b5bcfd147e9188cebb3b2"},
 ]
 filetype = [
     {file = "filetype-1.2.0-py2.py3-none-any.whl", hash = "sha256:7ce71b6880181241cf7ac8697a2f1eb6a8bd9b429f7ad6d27b8db9ba5f1c2d25"},
@@ -1831,8 +1831,8 @@ gunicorn = [
     {file = "gunicorn-20.1.0.tar.gz", hash = "sha256:e0a968b5ba15f8a328fdfd7ab1fcb5af4470c28aaf7e55df02a99bc13138e6e8"},
 ]
 har2tree = [
-    {file = "har2tree-1.16.4-py3-none-any.whl", hash = "sha256:2bc2862e5e9f5ea3d990d03adaf2bc3bb0fa202127383b0687904b26f1d280a2"},
-    {file = "har2tree-1.16.4.tar.gz", hash = "sha256:a7c0f99e3babc94340e1c56f27ca344b9dc620c27bcd63cc4397f3fa41f98f06"},
+    {file = "har2tree-1.16.5-py3-none-any.whl", hash = "sha256:45721aae4b6dd3eabaa0699d0aa058d26c08a9864919db768053c50dff5b3d71"},
+    {file = "har2tree-1.16.5.tar.gz", hash = "sha256:122b8a8769d230d121cdab1128e53b6f646be103d19e322cf1caf14746aa3f5b"},
 ]
 hiredis = [
     {file = "hiredis-2.0.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:b4c8b0bc5841e578d5fb32a16e0c305359b987b850a06964bd5a62739d688048"},
@@ -1886,8 +1886,8 @@ importlib-metadata = [
     {file = "importlib_metadata-5.1.0.tar.gz", hash = "sha256:d5059f9f1e8e41f80e9c56c2ee58811450c31984dfa625329ffd7c0dad88a73b"},
 ]
 importlib-resources = [
-    {file = "importlib_resources-5.10.0-py3-none-any.whl", hash = "sha256:ee17ec648f85480d523596ce49eae8ead87d5631ae1551f913c0100b5edd3437"},
-    {file = "importlib_resources-5.10.0.tar.gz", hash = "sha256:c01b1b94210d9849f286b86bb51bcea7cd56dde0600d8db721d7b81330711668"},
+    {file = "importlib_resources-5.10.1-py3-none-any.whl", hash = "sha256:c09b067d82e72c66f4f8eb12332f5efbebc9b007c0b6c40818108c9870adc363"},
+    {file = "importlib_resources-5.10.1.tar.gz", hash = "sha256:32bb095bda29741f6ef0e5278c42df98d135391bee5f932841efc0041f748dc3"},
 ]
 ipython = [
     {file = "ipython-8.7.0-py3-none-any.whl", hash = "sha256:352042ddcb019f7c04e48171b4dd78e4c4bb67bf97030d170e154aac42b656d9"},
@@ -2326,8 +2326,8 @@ playwrightcapture = [
     {file = "playwrightcapture-1.16.6.tar.gz", hash = "sha256:2b64a0b39aa4000b8e9ceb6fe61e485a426bd79b6d7e7e55005677b315c901b1"},
 ]
 prompt-toolkit = [
-    {file = "prompt_toolkit-3.0.33-py3-none-any.whl", hash = "sha256:ced598b222f6f4029c0800cefaa6a17373fb580cd093223003475ce32805c35b"},
-    {file = "prompt_toolkit-3.0.33.tar.gz", hash = "sha256:535c29c31216c77302877d5120aef6c94ff573748a5b5ca5b1b1f76f5e700c73"},
+    {file = "prompt_toolkit-3.0.36-py3-none-any.whl", hash = "sha256:aa64ad242a462c5ff0363a7b9cfe696c20d55d9fc60c11fd8e632d064804d305"},
+    {file = "prompt_toolkit-3.0.36.tar.gz", hash = "sha256:3e163f254bef5a03b146397d7c1963bd3e2812f0964bb9a24e6ec761fd28db63"},
 ]
 ptyprocess = [
     {file = "ptyprocess-0.7.0-py2.py3-none-any.whl", hash = "sha256:4b41f3967fce3af57cc7e94b888626c18bf37a083e3651ca8feeb66d492fef35"},

pyproject.toml

@@ -60,7 +60,7 @@ pyhashlookup = "^1.2.1"
 lief = "^0.12.3"
 ua-parser = "^0.16.1"
 Flask-Login = "^0.6.2"
-har2tree = "^1.16.4"
+har2tree = "^1.16.5"
 passivetotal = "^2.5.9"
 werkzeug = "^2.2.2"
 filetype = "^1.2.0"