new: Config to make modules admin only (on demand)

pull/1000/head
Raphaël Vinot 2024-11-22 17:31:39 +01:00
parent e1248cf47e
commit 988bbd296f
20 changed files with 223 additions and 220 deletions

View File

@ -38,10 +38,6 @@ class AsyncCapture(AbstractManager):
if not self.fox.available: if not self.fox.available:
self.logger.warning('Unable to setup the FOX module') self.logger.warning('Unable to setup the FOX module')
def thirdparty_submit(self, url: str) -> None:
if self.fox.available:
self.fox.capture_default_trigger(url, auto_trigger=True)
async def _trigger_captures(self) -> None: async def _trigger_captures(self) -> None:
# Only called if LacusCore is used # Only called if LacusCore is used
def clear_list_callback(task: Task) -> None: # type: ignore[type-arg] def clear_list_callback(task: Task) -> None: # type: ignore[type-arg]

View File

@ -3,26 +3,31 @@
"apikey": null, "apikey": null,
"trustenv": false, "trustenv": false,
"autosubmit": false, "autosubmit": false,
"allow_auto_trigger": false "allow_auto_trigger": false,
"admin_only": true
}, },
"PhishingInitiative": { "PhishingInitiative": {
"apikey": null, "apikey": null,
"autosubmit": false, "autosubmit": false,
"allow_auto_trigger": false "allow_auto_trigger": false,
"admin_only": true
}, },
"FOX": { "FOX": {
"apikey": null, "apikey": null,
"autosubmit": false, "autosubmit": false,
"allow_auto_trigger": false "allow_auto_trigger": false,
"admin_only": true
}, },
"Pandora": { "Pandora": {
"url": "http://127.0.0.1:6100", "url": "http://127.0.0.1:6100",
"autosubmit": false, "autosubmit": false,
"allow_auto_trigger": false "allow_auto_trigger": false,
"admin_only": false
}, },
"SaneJS": { "SaneJS": {
"enabled": true, "enabled": true,
"allow_auto_trigger": true "allow_auto_trigger": true,
"admin_only": false
}, },
"MultipleMISPs": { "MultipleMISPs": {
"default": "MISP", "default": "MISP",
@ -38,7 +43,8 @@
"source:lookyloo" "source:lookyloo"
], ],
"auto_publish": false, "auto_publish": false,
"allow_auto_trigger": false "allow_auto_trigger": false,
"admin_only": true
} }
} }
}, },
@ -46,43 +52,51 @@
"enabled": false, "enabled": false,
"ipaddress": "127.0.0.1", "ipaddress": "127.0.0.1",
"port": 4243, "port": 4243,
"allow_auto_trigger": true "allow_auto_trigger": true,
"admin_only": false
}, },
"UrlScan": { "UrlScan": {
"apikey": null, "apikey": null,
"autosubmit": false, "autosubmit": false,
"allow_auto_trigger": false, "allow_auto_trigger": false,
"force_visibility": false "force_visibility": false,
"admin_only": true
}, },
"Phishtank": { "Phishtank": {
"enabled": false, "enabled": false,
"url": "https://phishtankapi.circl.lu/", "url": "https://phishtankapi.circl.lu/",
"allow_auto_trigger": true "allow_auto_trigger": true,
"admin_only": false
}, },
"URLhaus": { "URLhaus": {
"enabled": false, "enabled": false,
"url": "https://urlhaus-api.abuse.ch/v1/", "url": "https://urlhaus-api.abuse.ch/v1/",
"allow_auto_trigger": true "allow_auto_trigger": true,
"admin_only": false
}, },
"Hashlookup": { "Hashlookup": {
"enabled": false, "enabled": false,
"url": "https://hashlookup.circl.lu/", "url": "https://hashlookup.circl.lu/",
"allow_auto_trigger": true "allow_auto_trigger": true,
"admin_only": false
}, },
"RiskIQ": { "RiskIQ": {
"user": null, "user": null,
"apikey": null, "apikey": null,
"allow_auto_trigger": false, "allow_auto_trigger": false,
"default_first_seen_in_days": 5 "default_first_seen_in_days": 5,
"admin_only": true
}, },
"CIRCLPDNS": { "CIRCLPDNS": {
"user": null, "user": null,
"password": null, "password": null,
"allow_auto_trigger": false "allow_auto_trigger": true,
"admin_only": false
}, },
"_notes": { "_notes": {
"apikey": "null disables the module. Pass a string otherwise.", "apikey": "null disables the module. Pass a string otherwise.",
"autosubmit": "Automatically submits the URL to the 3rd party service.", "autosubmit": "Automatically submits the URL to the 3rd party service.",
"admin_only": "Querying that module is only allowed to logged-in users (generally because the API keys have limits).",
"allow_auto_trigger": "Allow auto trigger per module: some (i.e. VT) can be very expensive", "allow_auto_trigger": "Allow auto trigger per module: some (i.e. VT) can be very expensive",
"VirusTotal": "Module to query Virustotal: https://www.virustotal.com/", "VirusTotal": "Module to query Virustotal: https://www.virustotal.com/",
"PhishingInitiative": "Module to query phishing initiative: https://phishing-initiative.fr/contrib/", "PhishingInitiative": "Module to query phishing initiative: https://phishing-initiative.fr/contrib/",

View File

@ -146,14 +146,11 @@ class CapturesIndex(Mapping): # type: ignore[type-arg]
# Unable to setup IPASN History # Unable to setup IPASN History
self.logger.warning(f'Unable to setup IPASN History: {e}') self.logger.warning(f'Unable to setup IPASN History: {e}')
self.ipasnhistory = None self.ipasnhistory = None
try: self.cloudflare: Cloudflare = Cloudflare()
self.cloudflare: Cloudflare | None = Cloudflare() if not self.cloudflare.available:
if not self.cloudflare.available: self.logger.warning('Unable to setup Cloudflare.')
self.cloudflare = None else:
self.logger.info('Cloudflare ready') self.logger.info('Cloudflare ready')
except Exception as e:
self.logger.warning(f'Unable to setup Cloudflare: {e}')
self.cloudflare = None
@property @property
def cached_captures(self) -> set[str]: def cached_captures(self) -> set[str]:
@ -722,8 +719,7 @@ class CapturesIndex(Mapping): # type: ignore[type-arg]
continue continue
# check if the resolved IPs are cloudflare IPs # check if the resolved IPs are cloudflare IPs
if self.cloudflare: if self.cloudflare.available:
# we just want the cloudflare IPs
if hits := {ip: hit for ip, hit in self.cloudflare.ips_lookup(_all_nodes_ips).items() if hit}: if hits := {ip: hit for ip, hit in self.cloudflare.ips_lookup(_all_nodes_ips).items() if hit}:
node.add_feature('cloudflare', hits) node.add_feature('cloudflare', hits)

View File

@ -132,7 +132,7 @@ class Lookyloo():
self.phishtank = Phishtank(config_name='Phishtank') self.phishtank = Phishtank(config_name='Phishtank')
self.hashlookup = Hashlookup(config_name='Hashlookup') self.hashlookup = Hashlookup(config_name='Hashlookup')
self.riskiq = RiskIQ(config_name='RiskIQ') self.riskiq = RiskIQ(config_name='RiskIQ')
self.pandora = Pandora(config_name='Pandora') self.pandora = Pandora()
self.urlhaus = URLhaus(config_name='URLhaus') self.urlhaus = URLhaus(config_name='URLhaus')
self.circl_pdns = CIRCLPDNS(config_name='CIRCLPDNS') self.circl_pdns = CIRCLPDNS(config_name='CIRCLPDNS')
@ -365,30 +365,24 @@ class Lookyloo():
if get_config('generic', 'index_everything'): if get_config('generic', 'index_everything'):
get_indexing(full=True).reindex_categories_capture(capture_uuid) get_indexing(full=True).reindex_categories_capture(capture_uuid)
def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> dict[str, Any]: def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False, *, as_admin: bool=False) -> dict[str, Any]:
'''Launch the 3rd party modules on a capture. '''Launch the 3rd party modules on a capture.
It uses the cached result *if* the module was triggered the same day. It uses the cached result *if* the module was triggered the same day.
The `force` flag re-triggers the module regardless of the cache.''' The `force` flag re-triggers the module regardless of the cache.'''
try: cache = self.capture_cache(capture_uuid)
ct = self.get_crawled_tree(capture_uuid) if not cache:
except LookylooException:
self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_uuid}) is cached.')
return {'error': f'UUID {capture_uuid} is either unknown or the tree is not ready yet.'} return {'error': f'UUID {capture_uuid} is either unknown or the tree is not ready yet.'}
self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) self.uwhois.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
self.hashlookup.capture_default_trigger(ct, auto_trigger=auto_trigger) self.hashlookup.capture_default_trigger(cache, auto_trigger=auto_trigger)
to_return: dict[str, dict[str, Any]] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}, to_return: dict[str, dict[str, Any]] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {},
'URLhaus': {}} 'URLhaus': {}}
if cache := self.capture_cache(capture_uuid): to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) to_return['UrlScan'] = self.urlscan.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
to_return['UrlScan'] = self.urlscan.capture_default_trigger( to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, auto_trigger=auto_trigger)
cache, to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, auto_trigger=auto_trigger)
visibility='unlisted' if (cache and cache.no_index) else 'public',
force=force, auto_trigger=auto_trigger)
to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, auto_trigger=auto_trigger)
to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, auto_trigger=auto_trigger)
return to_return return to_return
def get_modules_responses(self, capture_uuid: str, /) -> dict[str, Any]: def get_modules_responses(self, capture_uuid: str, /) -> dict[str, Any]:
@ -1138,7 +1132,6 @@ class Lookyloo():
if not urls: if not urls:
return None return None
url = urls[0] url = urls[0]
self.vt.url_lookup(url.value)
report = self.vt.get_url_lookup(url.value) report = self.vt.get_url_lookup(url.value)
if not report: if not report:
return None return None
@ -1149,9 +1142,9 @@ class Lookyloo():
obj.add_reference(vt_obj, 'analysed-with') obj.add_reference(vt_obj, 'analysed-with')
return vt_obj return vt_obj
def __misp_add_urlscan_to_event(self, capture_uuid: str, visibility: str) -> MISPAttribute | None: def __misp_add_urlscan_to_event(self, capture_uuid: str) -> MISPAttribute | None:
if cache := self.capture_cache(capture_uuid): if cache := self.capture_cache(capture_uuid):
response = self.urlscan.url_submit(cache, visibility) response = self.urlscan.url_result(cache)
if 'result' in response: if 'result' in response:
attribute = MISPAttribute() attribute = MISPAttribute()
attribute.value = response['result'] attribute.value = response['result']
@ -1159,7 +1152,7 @@ class Lookyloo():
return attribute return attribute
return None return None
def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> list[MISPEvent] | dict[str, str]: def misp_export(self, capture_uuid: str, /, with_parent: bool=False, *, as_admin: bool=False) -> list[MISPEvent] | dict[str, str]:
'''Export a capture in MISP format. You can POST the return of this method '''Export a capture in MISP format. You can POST the return of this method
directly to a MISP instance and it will create an event.''' directly to a MISP instance and it will create an event.'''
cache = self.capture_cache(capture_uuid) cache = self.capture_cache(capture_uuid)
@ -1187,12 +1180,16 @@ class Lookyloo():
event.objects[-1].add_reference(screenshot, 'rendered-as', 'Screenshot of the page') event.objects[-1].add_reference(screenshot, 'rendered-as', 'Screenshot of the page')
if self.vt.available: if self.vt.available:
for e_obj in event.objects: response = self.vt.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin)
if e_obj.name != 'url': if 'error' in response:
continue self.logger.warning(f'Unable to trigger VT: {response["error"]}')
vt_obj = self.__misp_add_vt_to_URLObject(e_obj) else:
if vt_obj: for e_obj in event.objects:
event.add_object(vt_obj) if e_obj.name != 'url':
continue
vt_obj = self.__misp_add_vt_to_URLObject(e_obj)
if vt_obj:
event.add_object(vt_obj)
if self.phishtank.available: if self.phishtank.available:
for e_obj in event.objects: for e_obj in event.objects:
@ -1208,11 +1205,13 @@ class Lookyloo():
e_obj.add_reference(pt_attribute, 'known-as', 'Permalink on Phishtank') e_obj.add_reference(pt_attribute, 'known-as', 'Permalink on Phishtank')
if self.urlscan.available: if self.urlscan.available:
urlscan_attribute = self.__misp_add_urlscan_to_event( response = self.urlscan.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin)
capture_uuid, if 'error' in response:
visibility='unlisted' if (cache and cache.no_index) else 'public') self.logger.warning(f'Unable to trigger URLScan: {response["error"]}')
if urlscan_attribute: else:
event.add_attribute(**urlscan_attribute) urlscan_attribute = self.__misp_add_urlscan_to_event(capture_uuid)
if urlscan_attribute:
event.add_attribute(**urlscan_attribute)
if with_parent and cache.parent: if with_parent and cache.parent:
parent = self.misp_export(cache.parent, with_parent) parent = self.misp_export(cache.parent, with_parent)
@ -1262,15 +1261,17 @@ class Lookyloo():
return {h: {node.name for node in nodes} for h, nodes in hashes.items()} return {h: {node.name for node in nodes} for h, nodes in hashes.items()}
return hashes return hashes
def merge_hashlookup_tree(self, tree_uuid: str, /) -> tuple[dict[str, dict[str, Any]], int]: def merge_hashlookup_tree(self, tree_uuid: str, /, as_admin: bool=False) -> tuple[dict[str, dict[str, Any]], int]:
if not self.hashlookup.available: if not self.hashlookup.available:
raise LookylooException('Hashlookup module not enabled.') raise LookylooException('Hashlookup module not enabled.')
cache = self.capture_cache(tree_uuid)
if not cache:
raise LookylooException(f'Capture {tree_uuid} not ready.')
hashes_tree = self.get_hashes_with_context(tree_uuid, algorithm='sha1') hashes_tree = self.get_hashes_with_context(tree_uuid, algorithm='sha1')
hashlookup_file = self._captures_index[tree_uuid].capture_dir / 'hashlookup.json' hashlookup_file = cache.capture_dir / 'hashlookup.json'
if not hashlookup_file.exists(): if not hashlookup_file.exists():
ct = self.get_crawled_tree(tree_uuid) self.hashlookup.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin)
self.hashlookup.capture_default_trigger(ct, auto_trigger=False)
if not hashlookup_file.exists(): if not hashlookup_file.exists():
# no hits on hashlookup # no hits on hashlookup

View File

@ -8,6 +8,7 @@ from abc import ABC, abstractmethod
from typing import Any from typing import Any
from ..default import get_config from ..default import get_config
from ..capturecache import CaptureCache
logging.config.dictConfig(get_config('logging')) logging.config.dictConfig(get_config('logging'))
@ -16,7 +17,7 @@ class AbstractModule(ABC):
'''Just a simple abstract for the modules to catch issues with initialization''' '''Just a simple abstract for the modules to catch issues with initialization'''
def __init__(self, /, *, config_name: str | None=None, def __init__(self, /, *, config_name: str | None=None,
config: dict[str, Any] | None=None): config: dict[str, Any] | None=None) -> None:
self.logger = logging.getLogger(f'{self.__class__.__name__}') self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel')) self.logger.setLevel(get_config('generic', 'loglevel'))
self.config: dict[str, Any] = {} self.config: dict[str, Any] = {}
@ -30,11 +31,28 @@ class AbstractModule(ABC):
elif config: elif config:
self.config = config self.config = config
# Make all module admin only by default. It can be changed in the config file for each module.
self._admin_only = bool(self.config.pop('admin_only', True))
# Default keys in all the modules (if relevant)
self._autosubmit = bool(self.config.pop('autosubmit', False))
self._allow_auto_trigger = bool(self.config.pop('allow_auto_trigger', False))
try: try:
self._available = self.module_init() self._available = self.module_init()
except Exception as e: except Exception as e:
self.logger.warning(f'Unable to initialize module: {e}.') self.logger.warning(f'Unable to initialize module: {e}.')
@property
def admin_only(self) -> bool:
return self._admin_only
@property
def autosubmit(self) -> bool:
return self._autosubmit
@property
def allow_auto_trigger(self) -> bool:
return self._allow_auto_trigger
@property @property
def available(self) -> bool: def available(self) -> bool:
return self._available return self._available
@ -42,3 +60,13 @@ class AbstractModule(ABC):
@abstractmethod @abstractmethod
def module_init(self) -> bool: def module_init(self) -> bool:
... ...
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
if not self.available:
return {'error': 'Module not available'}
if auto_trigger and not self.allow_auto_trigger:
return {'error': 'Auto trigger not allowed on module'}
if self.admin_only and not as_admin:
return {'error': 'Admin only module'}
return {}

View File

@ -28,8 +28,6 @@ class CIRCLPDNS(AbstractModule):
self.pypdns = PyPDNS(basic_auth=(self.config['user'], self.config['password'])) self.pypdns = PyPDNS(basic_auth=(self.config['user'], self.config['password']))
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir_pypdns = get_homedir() / 'circl_pypdns' self.storage_dir_pypdns = get_homedir() / 'circl_pypdns'
self.storage_dir_pypdns.mkdir(parents=True, exist_ok=True) self.storage_dir_pypdns.mkdir(parents=True, exist_ok=True)
return True return True
@ -46,12 +44,12 @@ class CIRCLPDNS(AbstractModule):
with cached_entries[0].open() as f: with cached_entries[0].open() as f:
return [PDNSRecord(record) for record in json.load(f)] return [PDNSRecord(record) for record in json.load(f)]
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available: if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return {'error': 'Module not available'} return error
if auto_trigger and not self.allow_auto_trigger:
return {'error': 'Auto trigger not allowed on module'}
if cache.url.startswith('file'): if cache.url.startswith('file'):
return {'error': 'CIRCL Passive DNS does not support files.'} return {'error': 'CIRCL Passive DNS does not support files.'}
@ -63,10 +61,10 @@ class CIRCLPDNS(AbstractModule):
if not hostname: if not hostname:
return {'error': 'No hostname found.'} return {'error': 'No hostname found.'}
self.pdns_lookup(hostname, force) self.__pdns_lookup(hostname, force)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def pdns_lookup(self, hostname: str, force: bool=False) -> None: def __pdns_lookup(self, hostname: str, force: bool=False) -> None:
'''Lookup an hostname on CIRCL Passive DNS '''Lookup an hostname on CIRCL Passive DNS
Note: force means re-fetch the entry even if we already did it today Note: force means re-fetch the entry even if we already did it today
''' '''

View File

@ -3,43 +3,45 @@
from __future__ import annotations from __future__ import annotations
import ipaddress import ipaddress
import logging
import requests import requests
from ..default import ConfigError from ..default import get_config, LookylooException
from .abstractmodule import AbstractModule
class Cloudflare(AbstractModule): class Cloudflare():
'''This module checks if an IP is announced by Cloudflare.''' '''This module checks if an IP is announced by Cloudflare.'''
def module_init(self) -> bool: def __init__(self) -> None:
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
session = requests.Session()
# Get IPv4 # Get IPv4
try: try:
r = requests.get('https://www.cloudflare.com/ips-v4') r = session.get('https://www.cloudflare.com/ips-v4', timeout=2)
r.raise_for_status() r.raise_for_status()
ipv4_list = r.text ipv4_list = r.text
except Exception as e: except Exception as e:
self.logger.warning(f'Unable to get Cloudflare IPv4 list: {e}') self.logger.warning(f'Unable to get Cloudflare IPv4 list: {e}')
return False self.available = False
# Get IPv6 # Get IPv6
try: try:
r = requests.get('https://www.cloudflare.com/ips-v6') r = session.get('https://www.cloudflare.com/ips-v6', timeout=2)
r.raise_for_status() r.raise_for_status()
ipv6_list = r.text ipv6_list = r.text
except Exception as e: except Exception as e:
self.logger.warning(f'Unable to get Cloudflare IPv6 list: {e}') self.logger.warning(f'Unable to get Cloudflare IPv6 list: {e}')
return False self.available = False
self.v4_list = [ipaddress.ip_network(net) for net in ipv4_list.split('\n')] self.v4_list = [ipaddress.ip_network(net) for net in ipv4_list.split('\n')]
self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')] self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')]
return True self.available = True
def ips_lookup(self, ips: set[str]) -> dict[str, bool]: def ips_lookup(self, ips: set[str]) -> dict[str, bool]:
'''Lookup a list of IPs. True means it is a known Cloudflare IP''' '''Lookup a list of IPs. True means it is a known Cloudflare IP'''
if not self.available: if not self.available:
raise ConfigError('Hashlookup not available, probably not enabled.') raise LookylooException('Cloudflare not available.')
to_return: dict[str, bool] = {} to_return: dict[str, bool] = {}
for ip_s, ip_p in [(ip, ipaddress.ip_address(ip)) for ip in ips]: for ip_s, ip_p in [(ip, ipaddress.ip_address(ip)) for ip in ips]:

View File

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from typing import Any from typing import Any, TYPE_CHECKING
import requests import requests
@ -11,6 +11,9 @@ from ..helpers import get_useragent_for_requests
from .abstractmodule import AbstractModule from .abstractmodule import AbstractModule
if TYPE_CHECKING:
from ..capturecache import CaptureCache
class FOX(AbstractModule): class FOX(AbstractModule):
@ -19,33 +22,24 @@ class FOX(AbstractModule):
self.logger.info('No API key.') self.logger.info('No API key.')
return False return False
self.autosubmit = False
self.allow_auto_trigger = False
self.client = requests.session() self.client = requests.session()
self.client.headers['User-Agent'] = get_useragent_for_requests() self.client.headers['User-Agent'] = get_useragent_for_requests()
self.client.headers['X-API-KEY'] = self.config['apikey'] self.client.headers['X-API-KEY'] = self.config['apikey']
self.client.headers['Content-Type'] = 'application/json' self.client.headers['Content-Type'] = 'application/json'
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.autosubmit = bool(self.config.get('autosubmit', False))
return True return True
def capture_default_trigger(self, url: str, /, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on the initial URL''' '''Run the module on the initial URL'''
if not self.available:
return {'error': 'Module not available'}
if auto_trigger and not self.allow_auto_trigger:
# NOTE: if auto_trigger is true, it means the request comes from the
# auto trigger feature (disabled by default)
# Each module can disable auto-trigger to avoid depleating the
# API limits.
return {'error': 'Auto trigger not allowed on module'}
self.url_submit(url) if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return error
self.__url_submit(cache.url)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def __submit_url(self, url: str, ) -> bool: def __submit_url(self, url: str) -> bool:
if not url.startswith('http'): if not url.startswith('http'):
url = f'http://{url}' url = f'http://{url}'
data = {'url': url} data = {'url': url}
@ -54,7 +48,7 @@ class FOX(AbstractModule):
response.raise_for_status() response.raise_for_status()
return True return True
def url_submit(self, url: str) -> dict[str, Any]: def __url_submit(self, url: str) -> dict[str, Any]:
'''Submit a URL to FOX '''Submit a URL to FOX
''' '''
if not self.available: if not self.available:

View File

@ -4,7 +4,8 @@ from __future__ import annotations
import json import json
from har2tree import CrawledTree from typing import TYPE_CHECKING
from pyhashlookup import Hashlookup from pyhashlookup import Hashlookup
from ..default import ConfigError from ..default import ConfigError
@ -12,6 +13,9 @@ from ..helpers import get_useragent_for_requests
from .abstractmodule import AbstractModule from .abstractmodule import AbstractModule
if TYPE_CHECKING:
from ..capturecache import CaptureCache
class HashlookupModule(AbstractModule): class HashlookupModule(AbstractModule):
'''This module is a bit different as it will trigger a lookup of all the hashes '''This module is a bit different as it will trigger a lookup of all the hashes
@ -28,22 +32,19 @@ class HashlookupModule(AbstractModule):
self.client = Hashlookup(useragent=get_useragent_for_requests()) self.client = Hashlookup(useragent=get_useragent_for_requests())
# Makes sure the webservice is reachable, raises an exception otherwise. # Makes sure the webservice is reachable, raises an exception otherwise.
self.client.info() self.client.info()
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
return True return True
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available: if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return {'error': 'Module not available'} return error
if auto_trigger and not self.allow_auto_trigger:
return {'error': 'Auto trigger not allowed on module'}
store_file = crawled_tree.root_hartree.har.path.parent / 'hashlookup.json' store_file = cache.tree.root_hartree.har.path.parent / 'hashlookup.json'
if store_file.exists(): if store_file.exists():
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
hashes = crawled_tree.root_hartree.build_all_hashes('sha1') hashes = cache.tree.root_hartree.build_all_hashes('sha1')
hits_hashlookup = self.hashes_lookup(list(hashes.keys())) hits_hashlookup = self.hashes_lookup(list(hashes.keys()))
if hits_hashlookup: if hits_hashlookup:

View File

@ -177,7 +177,6 @@ class MISP(AbstractModule):
self.enable_lookup = bool(self.config.get('enable_lookup', False)) self.enable_lookup = bool(self.config.get('enable_lookup', False))
self.enable_push = bool(self.config.get('enable_push', False)) self.enable_push = bool(self.config.get('enable_push', False))
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.default_tags: list[str] = self.config.get('default_tags') # type: ignore[assignment] self.default_tags: list[str] = self.config.get('default_tags') # type: ignore[assignment]
self.auto_publish = bool(self.config.get('auto_publish', False)) self.auto_publish = bool(self.config.get('auto_publish', False))

View File

@ -2,48 +2,37 @@
from __future__ import annotations from __future__ import annotations
import logging
from io import BytesIO from io import BytesIO
from typing import Any from typing import Any
from pypandora import PyPandora from pypandora import PyPandora
from ..default import ConfigError from ..default import get_config, LookylooException
from ..helpers import get_useragent_for_requests from ..helpers import get_useragent_for_requests
from .abstractmodule import AbstractModule
class Pandora():
class Pandora(AbstractModule): def __init__(self) -> None:
self.logger = logging.getLogger(f'{self.__class__.__name__}')
def module_init(self) -> bool: self.logger.setLevel(get_config('generic', 'loglevel'))
self.config = get_config('modules', 'Pandora')
if not self.config.get('url'): if not self.config.get('url'):
self.logger.info('No URL in config.') self.logger.info('No URL in config.')
return False self.available = False
self.client = PyPandora(root_url=self.config['url'], useragent=get_useragent_for_requests()) self.client = PyPandora(root_url=self.config['url'], useragent=get_useragent_for_requests())
if not self.client.is_up: if not self.client.is_up:
self.logger.warning('Not up.') self.logger.warning('Not up.')
return False self.available = False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) self.available = False
return True
def capture_default_trigger(self, file_in_memory: BytesIO, filename: str, /, auto_trigger: bool=False) -> dict[str, str]:
'''Automatically submit the file if the landing URL is a file instead of a webpage'''
if not self.available:
return {'error': 'Module not available'}
if auto_trigger and not self.allow_auto_trigger:
# NOTE: if auto_trigger is true, it means the request comes from the
# auto trigger feature (disabled by default)
return {'error': 'Auto trigger not allowed on module'}
self.submit_file(file_in_memory, filename)
return {'success': 'Module triggered'}
def submit_file(self, file_in_memory: BytesIO, filename: str) -> dict[str, Any]: def submit_file(self, file_in_memory: BytesIO, filename: str) -> dict[str, Any]:
'''Submit a file to Pandora''' '''Submit a file to Pandora'''
if not self.available: if not self.available:
raise ConfigError('Pandora not available, probably not able to reach the server.') raise LookylooException('Pandora not available, probably not able to reach the server.')
return self.client.submit(file_in_memory, filename, seed_expire=0) return self.client.submit(file_in_memory, filename, seed_expire=0)

View File

@ -34,8 +34,6 @@ class Phishtank(AbstractModule):
self.logger.warning('Not up.') self.logger.warning('Not up.')
return False return False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir_pt = get_homedir() / 'phishtank' self.storage_dir_pt = get_homedir() / 'phishtank'
self.storage_dir_pt.mkdir(parents=True, exist_ok=True) self.storage_dir_pt.mkdir(parents=True, exist_ok=True)
return True return True
@ -80,12 +78,11 @@ class Phishtank(AbstractModule):
with cached_entries[0].open() as f: with cached_entries[0].open() as f:
return json.load(f) return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool = False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available: if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return {'error': 'Module not available'} return error
if auto_trigger and not self.allow_auto_trigger:
return {'error': 'Auto trigger not allowed on module'}
# Quit if the capture is more than 70h old, the data in phishtank expire around that time. # Quit if the capture is more than 70h old, the data in phishtank expire around that time.
if cache.timestamp <= datetime.now(timezone.utc) - timedelta(hours=70): if cache.timestamp <= datetime.now(timezone.utc) - timedelta(hours=70):
@ -94,9 +91,9 @@ class Phishtank(AbstractModule):
# Check URLs up to the redirect # Check URLs up to the redirect
if cache.redirects: if cache.redirects:
for redirect in cache.redirects: for redirect in cache.redirects:
self.url_lookup(redirect) self.__url_lookup(redirect)
else: else:
self.url_lookup(cache.url) self.__url_lookup(cache.url)
# Check all the IPs in the ips file of the capture # Check all the IPs in the ips file of the capture
ips_file = cache.capture_dir / 'ips.json' ips_file = cache.capture_dir / 'ips.json'
@ -105,10 +102,10 @@ class Phishtank(AbstractModule):
with ips_file.open() as f: with ips_file.open() as f:
ips_dump = json.load(f) ips_dump = json.load(f)
for ip in {ip for ips_list in ips_dump.values() for ip in ips_list}: for ip in {ip for ips_list in ips_dump.values() for ip in ips_list}:
self.ip_lookup(ip) self.__ip_lookup(ip)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def ip_lookup(self, ip: str) -> None: def __ip_lookup(self, ip: str) -> None:
'''Lookup for the URLs related to an IP on Phishtank lookup '''Lookup for the URLs related to an IP on Phishtank lookup
Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day. Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day.
''' '''
@ -134,9 +131,9 @@ class Phishtank(AbstractModule):
with pt_file.open('w') as _f: with pt_file.open('w') as _f:
json.dump(to_dump, _f) json.dump(to_dump, _f)
for url in urls: for url in urls:
self.url_lookup(url) self.__url_lookup(url)
def url_lookup(self, url: str) -> None: def __url_lookup(self, url: str) -> None:
'''Lookup an URL on Phishtank lookup '''Lookup an URL on Phishtank lookup
Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day. Note: It will trigger a request to phishtank every time *until* there is a hit (it's cheap), then once a day.
''' '''

View File

@ -26,12 +26,8 @@ class PhishingInitiative(AbstractModule):
self.logger.info('No API key') self.logger.info('No API key')
return False return False
self.allow_auto_trigger = False
self.client = PyEUPI(self.config['apikey']) self.client = PyEUPI(self.config['apikey'])
self.autosubmit = self.config.get('autosubmit', False)
self.allow_auto_trigger = self.config.get('allow_auto_trigger', False)
self.storage_dir_eupi = get_homedir() / 'eupi' self.storage_dir_eupi = get_homedir() / 'eupi'
self.storage_dir_eupi.mkdir(parents=True, exist_ok=True) self.storage_dir_eupi.mkdir(parents=True, exist_ok=True)
return True return True
@ -47,21 +43,21 @@ class PhishingInitiative(AbstractModule):
with cached_entries[0].open() as f: with cached_entries[0].open() as f:
return json.load(f) return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available:
return {'error': 'Module not available'} if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
if auto_trigger and not self.allow_auto_trigger: return error
return {'error': 'Auto trigger not allowed on module'}
if cache.redirects: if cache.redirects:
for redirect in cache.redirects: for redirect in cache.redirects:
self.url_lookup(redirect, force) self.__url_lookup(redirect, force)
else: else:
self.url_lookup(cache.url, force) self.__url_lookup(cache.url, force)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def url_lookup(self, url: str, force: bool=False) -> None: def __url_lookup(self, url: str, force: bool=False) -> None:
'''Lookup an URL on Phishing Initiative '''Lookup an URL on Phishing Initiative
Note: force means 2 things: Note: force means 2 things:
* (re)scan of the URL * (re)scan of the URL

View File

@ -51,7 +51,6 @@ class RiskIQ(AbstractModule):
self.logger.warning(f'RiskIQ not available: {details}') self.logger.warning(f'RiskIQ not available: {details}')
return False return False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.default_first_seen = self.config.get('default_first_seen_in_days', 5) self.default_first_seen = self.config.get('default_first_seen_in_days', 5)
self.storage_dir_riskiq = get_homedir() / 'riskiq' self.storage_dir_riskiq = get_homedir() / 'riskiq'
@ -70,12 +69,13 @@ class RiskIQ(AbstractModule):
with cached_entries[0].open() as f: with cached_entries[0].open() as f:
return json.load(f) return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available:
return {'error': 'Module not available'} if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
if auto_trigger and not self.allow_auto_trigger: return error
return {'error': 'Auto trigger not allowed on module'}
if cache.url.startswith('file'): if cache.url.startswith('file'):
return {'error': 'RiskIQ does not support files.'} return {'error': 'RiskIQ does not support files.'}
@ -87,10 +87,10 @@ class RiskIQ(AbstractModule):
if not hostname: if not hostname:
return {'error': 'No hostname found.'} return {'error': 'No hostname found.'}
self.pdns_lookup(hostname, force) self.__pdns_lookup(hostname, force)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def pdns_lookup(self, hostname: str, force: bool=False, first_seen: date | datetime | None=None) -> None: def __pdns_lookup(self, hostname: str, force: bool=False, first_seen: date | datetime | None=None) -> None:
'''Lookup an hostname on RiskIQ Passive DNS '''Lookup an hostname on RiskIQ Passive DNS
Note: force means re-fetch the entry RiskIQ even if we already did it today Note: force means re-fetch the entry RiskIQ even if we already did it today
''' '''

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import json import json
from datetime import date from datetime import date
from typing import Iterable from collections.abc import Iterable
from pysanejs import SaneJS # type: ignore[attr-defined] from pysanejs import SaneJS # type: ignore[attr-defined]
@ -26,7 +26,6 @@ class SaneJavaScript(AbstractModule):
self.logger.warning('Not up.') self.logger.warning('Not up.')
return False return False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir = get_homedir() / 'sanejs' self.storage_dir = get_homedir() / 'sanejs'
self.storage_dir.mkdir(parents=True, exist_ok=True) self.storage_dir.mkdir(parents=True, exist_ok=True)
return True return True

View File

@ -25,8 +25,6 @@ class URLhaus(AbstractModule):
return False return False
self.url = self.config.get('url') self.url = self.config.get('url')
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir_uh = get_homedir() / 'urlhaus' self.storage_dir_uh = get_homedir() / 'urlhaus'
self.storage_dir_uh.mkdir(parents=True, exist_ok=True) self.storage_dir_uh.mkdir(parents=True, exist_ok=True)
return True return True
@ -48,23 +46,23 @@ class URLhaus(AbstractModule):
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def capture_default_trigger(self, cache: CaptureCache, /, *, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available:
return {'error': 'Module not available'} if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
if auto_trigger and not self.allow_auto_trigger: return error
return {'error': 'Auto trigger not allowed on module'}
# Check URLs up to the redirect # Check URLs up to the redirect
if cache.redirects: if cache.redirects:
for redirect in cache.redirects: for redirect in cache.redirects:
self.url_lookup(redirect) self.__url_lookup(redirect)
else: else:
self.url_lookup(cache.url) self.__url_lookup(cache.url)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def url_lookup(self, url: str) -> None: def __url_lookup(self, url: str) -> None:
'''Lookup an URL on URL haus '''Lookup an URL on URL haus
Note: It will trigger a request to URL haus every time *until* there is a hit (it's cheap), then once a day. Note: It will trigger a request to URL haus every time *until* there is a hit (it's cheap), then once a day.
''' '''

View File

@ -29,9 +29,6 @@ class UrlScan(AbstractModule):
self.client.headers['API-Key'] = self.config['apikey'] self.client.headers['API-Key'] = self.config['apikey']
self.client.headers['Content-Type'] = 'application/json' self.client.headers['Content-Type'] = 'application/json'
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.autosubmit = bool(self.config.get('autosubmit', False))
if self.config.get('force_visibility'): if self.config.get('force_visibility'):
# Cases: # Cases:
# 1. False: unlisted for hidden captures / public for others # 1. False: unlisted for hidden captures / public for others
@ -63,18 +60,13 @@ class UrlScan(AbstractModule):
with cached_entries[0].open() as f: with cached_entries[0].open() as f:
return json.load(f) return json.load(f)
def capture_default_trigger(self, capture_info: CaptureCache, /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on the initial URL''' '''Run the module on the initial URL'''
if not self.available: if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return {'error': 'Module not available'} return error
if auto_trigger and not self.allow_auto_trigger:
# NOTE: if auto_trigger is true, it means the request comes from the
# auto trigger feature (disabled by default)
# Each module can disable auto-trigger to avoid depleating the
# API limits.
return {'error': 'Auto trigger not allowed on module'}
self.url_submit(capture_info, visibility, force) visibility = 'unlisted' if cache.no_index else 'public'
self.__url_submit(cache, visibility, force)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
def __submit_url(self, url: str, useragent: str | None, referer: str | None, visibility: str) -> dict[str, Any]: def __submit_url(self, url: str, useragent: str | None, referer: str | None, visibility: str) -> dict[str, Any]:
@ -103,7 +95,7 @@ class UrlScan(AbstractModule):
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def url_submit(self, capture_info: CaptureCache, visibility: str, force: bool=False) -> dict[str, Any]: def __url_submit(self, capture_info: CaptureCache, visibility: str, force: bool=False) -> dict[str, Any]:
'''Lookup an URL on urlscan.io '''Lookup an URL on urlscan.io
Note: force means 2 things: Note: force means 2 things:
* (re)scan of the URL * (re)scan of the URL

View File

@ -5,12 +5,15 @@ from __future__ import annotations
import re import re
import socket import socket
from typing import overload, Literal from typing import overload, Literal, TYPE_CHECKING
from har2tree import CrawledTree, Har2TreeError, HostNode from har2tree import Har2TreeError, HostNode
from .abstractmodule import AbstractModule from .abstractmodule import AbstractModule
if TYPE_CHECKING:
from ..capturecache import CaptureCache
class UniversalWhois(AbstractModule): class UniversalWhois(AbstractModule):
@ -21,7 +24,6 @@ class UniversalWhois(AbstractModule):
self.server = self.config.get('ipaddress') self.server = self.config.get('ipaddress')
self.port = self.config.get('port') self.port = self.config.get('port')
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
try: try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
@ -47,15 +49,14 @@ class UniversalWhois(AbstractModule):
self.whois(cname, contact_email_only=False) self.whois(cname, contact_email_only=False)
self.whois(hostnode.name, contact_email_only=False) self.whois(hostnode.name, contact_email_only=False)
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available: if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return None return error
if auto_trigger and not self.allow_auto_trigger:
return None
try: try:
hostnode = crawled_tree.root_hartree.get_host_node_by_uuid(crawled_tree.root_hartree.rendered_node.hostnode_uuid) hostnode = cache.tree.root_hartree.get_host_node_by_uuid(cache.tree.root_hartree.rendered_node.hostnode_uuid)
except Har2TreeError as e: except Har2TreeError as e:
self.logger.warning(e) self.logger.warning(e)
else: else:
@ -63,6 +64,8 @@ class UniversalWhois(AbstractModule):
for n in hostnode.get_ancestors(): for n in hostnode.get_ancestors():
self.query_whois_hostnode(n) self.query_whois_hostnode(n)
return {'success': 'Module triggered'}
@overload @overload
def whois(self, query: str, contact_email_only: Literal[True]) -> list[str]: def whois(self, query: str, contact_email_only: Literal[True]) -> list[str]:
... ...

View File

@ -37,9 +37,6 @@ class VirusTotal(AbstractModule):
self.client = vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) self.client = vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False))
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.autosubmit = bool(self.config.get('autosubmit', False))
self.storage_dir_vt = get_homedir() / 'vt_url' self.storage_dir_vt = get_homedir() / 'vt_url'
self.storage_dir_vt.mkdir(parents=True, exist_ok=True) self.storage_dir_vt.mkdir(parents=True, exist_ok=True)
return True return True
@ -59,30 +56,30 @@ class VirusTotal(AbstractModule):
cached_entries[0].unlink(missing_ok=True) cached_entries[0].unlink(missing_ok=True)
return None return None
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect''' '''Run the module on all the nodes up to the final redirect'''
if not self.available: if error := super().capture_default_trigger(cache, force=force,
return {'error': 'Module not available'} auto_trigger=auto_trigger, as_admin=as_admin):
if auto_trigger and not self.allow_auto_trigger: return error
return {'error': 'Auto trigger not allowed on module'}
if cache.redirects: if cache.redirects:
for redirect in cache.redirects: for redirect in cache.redirects:
self.url_lookup(redirect, force) self.__url_lookup(redirect, force)
else: else:
self.url_lookup(cache.url, force) self.__url_lookup(cache.url, force)
return {'success': 'Module triggered'} return {'success': 'Module triggered'}
async def get_object_vt(self, url: str) -> ClientResponse: async def __get_object_vt(self, url: str) -> ClientResponse:
url_id = vt.url_id(url) url_id = vt.url_id(url)
async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client: async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client:
return await client.get_object_async(f"/urls/{url_id}") return await client.get_object_async(f"/urls/{url_id}")
async def scan_url(self, url: str) -> None: async def __scan_url(self, url: str) -> None:
async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client: async with vt.Client(self.config['apikey'], trust_env=self.config.get('trustenv', False)) as client:
await client.scan_url_async(url) await client.scan_url_async(url)
def url_lookup(self, url: str, force: bool=False) -> None: def __url_lookup(self, url: str, force: bool=False) -> None:
'''Lookup an URL on VT '''Lookup an URL on VT
Note: force means 2 things: Note: force means 2 things:
* (re)scan of the URL * (re)scan of the URL
@ -100,7 +97,7 @@ class VirusTotal(AbstractModule):
scan_requested = False scan_requested = False
if self.autosubmit and force: if self.autosubmit and force:
try: try:
asyncio.run(self.scan_url(url)) asyncio.run(self.__scan_url(url))
except APIError as e: except APIError as e:
if e.code == 'QuotaExceededError': if e.code == 'QuotaExceededError':
self.logger.warning('VirusTotal quota exceeded, sry.') self.logger.warning('VirusTotal quota exceeded, sry.')
@ -113,7 +110,7 @@ class VirusTotal(AbstractModule):
for _ in range(3): for _ in range(3):
try: try:
url_information = asyncio.run(self.get_object_vt(url)) url_information = asyncio.run(self.__get_object_vt(url))
with vt_file.open('w') as _f: with vt_file.open('w') as _f:
json.dump(url_information.to_dict(), _f, default=jsonify_vt) json.dump(url_information.to_dict(), _f, default=jsonify_vt)
break break
@ -122,7 +119,7 @@ class VirusTotal(AbstractModule):
break break
if not scan_requested and e.code == 'NotFoundError': if not scan_requested and e.code == 'NotFoundError':
try: try:
asyncio.run(self.scan_url(url)) asyncio.run(self.__scan_url(url))
scan_requested = True scan_requested = True
except APIError as e: except APIError as e:
self.logger.warning(f'Unable to trigger VirusTotal on {url}: {e}') self.logger.warning(f'Unable to trigger VirusTotal on {url}: {e}')

View File

@ -1014,7 +1014,8 @@ def urls_rendered_page(tree_uuid: str) -> WerkzeugResponse | str | Response:
@app.route('/tree/<string:tree_uuid>/hashlookup', methods=['GET']) @app.route('/tree/<string:tree_uuid>/hashlookup', methods=['GET'])
def hashlookup(tree_uuid: str) -> str | WerkzeugResponse | Response: def hashlookup(tree_uuid: str) -> str | WerkzeugResponse | Response:
try: try:
merged, total_ressources = lookyloo.merge_hashlookup_tree(tree_uuid) merged, total_ressources = lookyloo.merge_hashlookup_tree(tree_uuid,
as_admin=flask_login.current_user.is_authenticated)
# We only want unique URLs for the template # We only want unique URLs for the template
for sha1, entries in merged.items(): for sha1, entries in merged.items():
entries['nodes'] = {node.name for node in entries['nodes']} entries['nodes'] = {node.name for node in entries['nodes']}
@ -1319,6 +1320,8 @@ def tree_urls(tree_uuid: str) -> str:
@app.route('/tree/<string:tree_uuid>/pandora', methods=['GET', 'POST']) @app.route('/tree/<string:tree_uuid>/pandora', methods=['GET', 'POST'])
def pandora_submit(tree_uuid: str) -> dict[str, Any] | Response: def pandora_submit(tree_uuid: str) -> dict[str, Any] | Response:
if not lookyloo.pandora.available:
return {'error': 'Pandora not available.'}
node_uuid = None node_uuid = None
if request.method == 'POST': if request.method == 'POST':
input_json = request.get_json(force=True) input_json = request.get_json(force=True)