From b490446436ebf58e402dfec8db1f9a3e85e8a3e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Sat, 23 Nov 2024 01:59:22 +0100 Subject: [PATCH] chg; Finalize the move to optional admin only 3rd party modules --- bin/background_build_captures.py | 2 +- lookyloo/context.py | 2 +- lookyloo/lookyloo.py | 28 ++++++++++++++-------------- lookyloo/modules/abstractmodule.py | 9 +++++---- lookyloo/modules/circlpdns.py | 4 ++-- lookyloo/modules/fox.py | 4 ++-- lookyloo/modules/hashlookup.py | 4 ++-- lookyloo/modules/phishtank.py | 4 ++-- lookyloo/modules/pi.py | 4 ++-- lookyloo/modules/riskiq.py | 4 ++-- lookyloo/modules/sanejs.py | 21 +++++++++++++-------- lookyloo/modules/urlhaus.py | 4 ++-- lookyloo/modules/urlscan.py | 3 ++- lookyloo/modules/uwhois.py | 4 ++-- lookyloo/modules/vt.py | 4 ++-- website/web/__init__.py | 5 +++-- website/web/genericapi.py | 3 ++- 17 files changed, 59 insertions(+), 50 deletions(-) diff --git a/bin/background_build_captures.py b/bin/background_build_captures.py index 2c19ff77..0f58fe34 100755 --- a/bin/background_build_captures.py +++ b/bin/background_build_captures.py @@ -118,7 +118,7 @@ class BackgroundBuildCaptures(AbstractManager): self.logger.info(f'Build pickle for {uuid}: {path.name}') self.lookyloo.get_crawled_tree(uuid) try: - self.lookyloo.trigger_modules(uuid, auto_trigger=True) + self.lookyloo.trigger_modules(uuid, auto_trigger=True, force=False, as_admin=False) except Exception as e: self.logger.exception(f'Unable to trigger modules for {uuid}: {e}') self.logger.info(f'Pickle for {uuid} built.') diff --git a/lookyloo/context.py b/lookyloo/context.py index a76f8d6d..323dce76 100644 --- a/lookyloo/context.py +++ b/lookyloo/context.py @@ -23,7 +23,7 @@ class Context(): self.logger.setLevel(get_config('generic', 'loglevel')) self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True) # type: ignore[type-arg] self._cache_known_content() - self.sanejs = SaneJavaScript(config_name='SaneJS') + self.sanejs = SaneJavaScript() def clear_context(self) -> None: self.redis.flushdb() diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 9b058d42..098a7c00 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -365,7 +365,7 @@ class Lookyloo(): if get_config('generic', 'index_everything'): get_indexing(full=True).reindex_categories_capture(capture_uuid) - def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False, *, as_admin: bool=False) -> dict[str, Any]: + def trigger_modules(self, capture_uuid: str, /, force: bool, auto_trigger: bool, *, as_admin: bool) -> dict[str, Any]: '''Launch the 3rd party modules on a capture. It uses the cached result *if* the module was triggered the same day. The `force` flag re-triggers the module regardless of the cache.''' @@ -373,16 +373,16 @@ class Lookyloo(): if not cache: return {'error': f'UUID {capture_uuid} is either unknown or the tree is not ready yet.'} - self.uwhois.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) - self.hashlookup.capture_default_trigger(cache, auto_trigger=auto_trigger) + self.uwhois.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) + self.hashlookup.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) to_return: dict[str, dict[str, Any]] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}, 'URLhaus': {}} - to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) - to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) - to_return['UrlScan'] = self.urlscan.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) - to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, auto_trigger=auto_trigger) - to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, auto_trigger=auto_trigger) + to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) + to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) + to_return['UrlScan'] = self.urlscan.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) + to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) + to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) return to_return def get_modules_responses(self, capture_uuid: str, /) -> dict[str, Any]: @@ -439,7 +439,7 @@ class Lookyloo(): to_return['urlscan']['result'] = result return to_return - def get_historical_lookups(self, capture_uuid: str, /, force: bool=False) -> dict[str, Any]: + def get_historical_lookups(self, capture_uuid: str, /, force: bool, auto_trigger: bool, as_admin: bool) -> dict[str, Any]: # this method is only trigered when the user wants to get more details about the capture # by looking at Passive DNS systems, check if there are hits in the current capture # in another one and things like that. The trigger_modules method is for getting @@ -451,7 +451,7 @@ class Lookyloo(): to_return: dict[str, Any] = defaultdict(dict) if self.riskiq.available: try: - self.riskiq.capture_default_trigger(cache) + self.riskiq.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) if hasattr(cache, 'redirects') and cache.redirects: hostname = urlparse(cache.redirects[-1]).hostname else: @@ -462,7 +462,7 @@ class Lookyloo(): except RiskIQError as e: self.logger.warning(e.response.content) if self.circl_pdns.available: - self.circl_pdns.capture_default_trigger(cache) + self.circl_pdns.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin) if hasattr(cache, 'redirects') and cache.redirects: hostname = urlparse(cache.redirects[-1]).hostname else: @@ -1180,7 +1180,7 @@ class Lookyloo(): event.objects[-1].add_reference(screenshot, 'rendered-as', 'Screenshot of the page') if self.vt.available: - response = self.vt.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin) + response = self.vt.capture_default_trigger(cache, force=False, auto_trigger=False, as_admin=as_admin) if 'error' in response: self.logger.warning(f'Unable to trigger VT: {response["error"]}') else: @@ -1205,7 +1205,7 @@ class Lookyloo(): e_obj.add_reference(pt_attribute, 'known-as', 'Permalink on Phishtank') if self.urlscan.available: - response = self.urlscan.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin) + response = self.urlscan.capture_default_trigger(cache, force=False, auto_trigger=False, as_admin=as_admin) if 'error' in response: self.logger.warning(f'Unable to trigger URLScan: {response["error"]}') else: @@ -1271,7 +1271,7 @@ class Lookyloo(): hashlookup_file = cache.capture_dir / 'hashlookup.json' if not hashlookup_file.exists(): - self.hashlookup.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin) + self.hashlookup.capture_default_trigger(cache, force=False, auto_trigger=False, as_admin=as_admin) if not hashlookup_file.exists(): # no hits on hashlookup diff --git a/lookyloo/modules/abstractmodule.py b/lookyloo/modules/abstractmodule.py index 82f2e1dc..e30ac15e 100644 --- a/lookyloo/modules/abstractmodule.py +++ b/lookyloo/modules/abstractmodule.py @@ -5,10 +5,11 @@ from __future__ import annotations import logging from abc import ABC, abstractmethod -from typing import Any +from typing import Any, TYPE_CHECKING from ..default import get_config -from ..capturecache import CaptureCache +if TYPE_CHECKING: + from ..capturecache import CaptureCache logging.config.dictConfig(get_config('logging')) @@ -61,8 +62,8 @@ class AbstractModule(ABC): def module_init(self) -> bool: ... - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: if not self.available: return {'error': 'Module not available'} if auto_trigger and not self.allow_auto_trigger: diff --git a/lookyloo/modules/circlpdns.py b/lookyloo/modules/circlpdns.py index 1471339d..12c98c29 100644 --- a/lookyloo/modules/circlpdns.py +++ b/lookyloo/modules/circlpdns.py @@ -44,8 +44,8 @@ class CIRCLPDNS(AbstractModule): with cached_entries[0].open() as f: return [PDNSRecord(record) for record in json.load(f)] - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): return error diff --git a/lookyloo/modules/fox.py b/lookyloo/modules/fox.py index 15d02d64..4b792727 100644 --- a/lookyloo/modules/fox.py +++ b/lookyloo/modules/fox.py @@ -29,8 +29,8 @@ class FOX(AbstractModule): return True - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on the initial URL''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): diff --git a/lookyloo/modules/hashlookup.py b/lookyloo/modules/hashlookup.py index fcea9c9e..e9c4aa9e 100644 --- a/lookyloo/modules/hashlookup.py +++ b/lookyloo/modules/hashlookup.py @@ -34,8 +34,8 @@ class HashlookupModule(AbstractModule): self.client.info() return True - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): return error diff --git a/lookyloo/modules/phishtank.py b/lookyloo/modules/phishtank.py index a1019a55..ce9f6f20 100644 --- a/lookyloo/modules/phishtank.py +++ b/lookyloo/modules/phishtank.py @@ -78,8 +78,8 @@ class Phishtank(AbstractModule): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool = False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): return error diff --git a/lookyloo/modules/pi.py b/lookyloo/modules/pi.py index 983ee277..c057ff86 100644 --- a/lookyloo/modules/pi.py +++ b/lookyloo/modules/pi.py @@ -43,8 +43,8 @@ class PhishingInitiative(AbstractModule): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): diff --git a/lookyloo/modules/riskiq.py b/lookyloo/modules/riskiq.py index 44fdc921..488ccaba 100644 --- a/lookyloo/modules/riskiq.py +++ b/lookyloo/modules/riskiq.py @@ -69,8 +69,8 @@ class RiskIQ(AbstractModule): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): diff --git a/lookyloo/modules/sanejs.py b/lookyloo/modules/sanejs.py index 806d6d4e..92a96157 100644 --- a/lookyloo/modules/sanejs.py +++ b/lookyloo/modules/sanejs.py @@ -3,34 +3,39 @@ from __future__ import annotations import json +import logging from datetime import date from collections.abc import Iterable from pysanejs import SaneJS # type: ignore[attr-defined] -from ..default import get_homedir - -from .abstractmodule import AbstractModule +from ..default import get_homedir, get_config, LookylooException -class SaneJavaScript(AbstractModule): +class SaneJavaScript(): - def module_init(self) -> bool: + def __init__(self) -> None: + self.logger = logging.getLogger(f'{self.__class__.__name__}') + self.logger.setLevel(get_config('generic', 'loglevel')) + self.config = get_config('modules', 'SaneJS') if not self.config.get('enabled'): self.logger.info('Not enabled.') - return False + self.available = False self.client = SaneJS() if not self.client.is_up: self.logger.warning('Not up.') - return False + self.available = False self.storage_dir = get_homedir() / 'sanejs' self.storage_dir.mkdir(parents=True, exist_ok=True) - return True + self.available = True def hashes_lookup(self, sha512: Iterable[str] | str, force: bool=False) -> dict[str, list[str]]: + if not self.available: + raise LookylooException('SaneJS is not available.') + if isinstance(sha512, str): hashes: Iterable[str] = [sha512] else: diff --git a/lookyloo/modules/urlhaus.py b/lookyloo/modules/urlhaus.py index 3effce74..b53475a0 100644 --- a/lookyloo/modules/urlhaus.py +++ b/lookyloo/modules/urlhaus.py @@ -46,8 +46,8 @@ class URLhaus(AbstractModule): response.raise_for_status() return response.json() - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): diff --git a/lookyloo/modules/urlscan.py b/lookyloo/modules/urlscan.py index e891926d..a8793a01 100644 --- a/lookyloo/modules/urlscan.py +++ b/lookyloo/modules/urlscan.py @@ -60,7 +60,8 @@ class UrlScan(AbstractModule): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on the initial URL''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): return error diff --git a/lookyloo/modules/uwhois.py b/lookyloo/modules/uwhois.py index 4d9f3d7f..a70854bd 100644 --- a/lookyloo/modules/uwhois.py +++ b/lookyloo/modules/uwhois.py @@ -49,8 +49,8 @@ class UniversalWhois(AbstractModule): self.whois(cname, contact_email_only=False) self.whois(hostnode.name, contact_email_only=False) - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): return error diff --git a/lookyloo/modules/vt.py b/lookyloo/modules/vt.py index 927bf3d3..24866ec4 100644 --- a/lookyloo/modules/vt.py +++ b/lookyloo/modules/vt.py @@ -56,8 +56,8 @@ class VirusTotal(AbstractModule): cached_entries[0].unlink(missing_ok=True) return None - def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, - auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool, + auto_trigger: bool, as_admin: bool) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin): diff --git a/website/web/__init__.py b/website/web/__init__.py index 3ba541d8..6c595816 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -662,14 +662,15 @@ def hostnode_popup(tree_uuid: str, node_uuid: str) -> str | WerkzeugResponse | R def trigger_modules(tree_uuid: str) -> WerkzeugResponse | str | Response: force = True if (request.args.get('force') and request.args.get('force') == 'True') else False auto_trigger = True if (request.args.get('auto_trigger') and request.args.get('auto_trigger') == 'True') else False - lookyloo.trigger_modules(tree_uuid, force=force, auto_trigger=auto_trigger) + lookyloo.trigger_modules(tree_uuid, force=force, auto_trigger=auto_trigger, as_admin=flask_login.current_user.is_authenticated) return redirect(url_for('modules', tree_uuid=tree_uuid)) @app.route('/tree//historical_lookups', methods=['GET']) def historical_lookups(tree_uuid: str) -> str | WerkzeugResponse | Response: force = True if (request.args.get('force') and request.args.get('force') == 'True') else False - data = lookyloo.get_historical_lookups(tree_uuid, force) + auto_trigger = True if (request.args.get('auto_trigger') and request.args.get('auto_trigger') == 'True') else False + data = lookyloo.get_historical_lookups(tree_uuid, force=force, auto_trigger=auto_trigger, as_admin=flask_login.current_user.is_authenticated) return render_template('historical_lookups.html', tree_uuid=tree_uuid, riskiq=data.get('riskiq'), circl_pdns=data.get('circl_pdns')) diff --git a/website/web/genericapi.py b/website/web/genericapi.py index daa377c3..879b7bce 100644 --- a/website/web/genericapi.py +++ b/website/web/genericapi.py @@ -308,7 +308,8 @@ class TriggerModules(Resource): # type: ignore[misc] def post(self, capture_uuid: str) -> dict[str, Any]: parameters: dict[str, Any] = request.get_json(force=True) force = True if parameters.get('force') else False - return lookyloo.trigger_modules(capture_uuid, force=force) + return lookyloo.trigger_modules(capture_uuid, force=force, auto_trigger=False, + as_admin=flask_login.current_user.is_authenticated) @api.route('/json//modules')