chg; Finalize the move to optional admin only 3rd party modules

pull/1000/head
Raphaël Vinot 2024-11-23 01:59:22 +01:00
parent 983ef18c9e
commit b490446436
17 changed files with 59 additions and 50 deletions

View File

@ -118,7 +118,7 @@ class BackgroundBuildCaptures(AbstractManager):
self.logger.info(f'Build pickle for {uuid}: {path.name}')
self.lookyloo.get_crawled_tree(uuid)
try:
self.lookyloo.trigger_modules(uuid, auto_trigger=True)
self.lookyloo.trigger_modules(uuid, auto_trigger=True, force=False, as_admin=False)
except Exception as e:
self.logger.exception(f'Unable to trigger modules for {uuid}: {e}')
self.logger.info(f'Pickle for {uuid} built.')

View File

@ -23,7 +23,7 @@ class Context():
self.logger.setLevel(get_config('generic', 'loglevel'))
self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True) # type: ignore[type-arg]
self._cache_known_content()
self.sanejs = SaneJavaScript(config_name='SaneJS')
self.sanejs = SaneJavaScript()
def clear_context(self) -> None:
self.redis.flushdb()

View File

@ -365,7 +365,7 @@ class Lookyloo():
if get_config('generic', 'index_everything'):
get_indexing(full=True).reindex_categories_capture(capture_uuid)
def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False, *, as_admin: bool=False) -> dict[str, Any]:
def trigger_modules(self, capture_uuid: str, /, force: bool, auto_trigger: bool, *, as_admin: bool) -> dict[str, Any]:
'''Launch the 3rd party modules on a capture.
It uses the cached result *if* the module was triggered the same day.
The `force` flag re-triggers the module regardless of the cache.'''
@ -373,16 +373,16 @@ class Lookyloo():
if not cache:
return {'error': f'UUID {capture_uuid} is either unknown or the tree is not ready yet.'}
self.uwhois.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
self.hashlookup.capture_default_trigger(cache, auto_trigger=auto_trigger)
self.uwhois.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
self.hashlookup.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
to_return: dict[str, dict[str, Any]] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {},
'URLhaus': {}}
to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
to_return['UrlScan'] = self.urlscan.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger)
to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, auto_trigger=auto_trigger)
to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, auto_trigger=auto_trigger)
to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
to_return['UrlScan'] = self.urlscan.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
to_return['Phishtank'] = self.phishtank.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
return to_return
def get_modules_responses(self, capture_uuid: str, /) -> dict[str, Any]:
@ -439,7 +439,7 @@ class Lookyloo():
to_return['urlscan']['result'] = result
return to_return
def get_historical_lookups(self, capture_uuid: str, /, force: bool=False) -> dict[str, Any]:
def get_historical_lookups(self, capture_uuid: str, /, force: bool, auto_trigger: bool, as_admin: bool) -> dict[str, Any]:
# this method is only trigered when the user wants to get more details about the capture
# by looking at Passive DNS systems, check if there are hits in the current capture
# in another one and things like that. The trigger_modules method is for getting
@ -451,7 +451,7 @@ class Lookyloo():
to_return: dict[str, Any] = defaultdict(dict)
if self.riskiq.available:
try:
self.riskiq.capture_default_trigger(cache)
self.riskiq.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
if hasattr(cache, 'redirects') and cache.redirects:
hostname = urlparse(cache.redirects[-1]).hostname
else:
@ -462,7 +462,7 @@ class Lookyloo():
except RiskIQError as e:
self.logger.warning(e.response.content)
if self.circl_pdns.available:
self.circl_pdns.capture_default_trigger(cache)
self.circl_pdns.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin)
if hasattr(cache, 'redirects') and cache.redirects:
hostname = urlparse(cache.redirects[-1]).hostname
else:
@ -1180,7 +1180,7 @@ class Lookyloo():
event.objects[-1].add_reference(screenshot, 'rendered-as', 'Screenshot of the page')
if self.vt.available:
response = self.vt.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin)
response = self.vt.capture_default_trigger(cache, force=False, auto_trigger=False, as_admin=as_admin)
if 'error' in response:
self.logger.warning(f'Unable to trigger VT: {response["error"]}')
else:
@ -1205,7 +1205,7 @@ class Lookyloo():
e_obj.add_reference(pt_attribute, 'known-as', 'Permalink on Phishtank')
if self.urlscan.available:
response = self.urlscan.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin)
response = self.urlscan.capture_default_trigger(cache, force=False, auto_trigger=False, as_admin=as_admin)
if 'error' in response:
self.logger.warning(f'Unable to trigger URLScan: {response["error"]}')
else:
@ -1271,7 +1271,7 @@ class Lookyloo():
hashlookup_file = cache.capture_dir / 'hashlookup.json'
if not hashlookup_file.exists():
self.hashlookup.capture_default_trigger(cache, auto_trigger=False, as_admin=as_admin)
self.hashlookup.capture_default_trigger(cache, force=False, auto_trigger=False, as_admin=as_admin)
if not hashlookup_file.exists():
# no hits on hashlookup

View File

@ -5,10 +5,11 @@ from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Any
from typing import Any, TYPE_CHECKING
from ..default import get_config
from ..capturecache import CaptureCache
if TYPE_CHECKING:
from ..capturecache import CaptureCache
logging.config.dictConfig(get_config('logging'))
@ -61,8 +62,8 @@ class AbstractModule(ABC):
def module_init(self) -> bool:
...
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
if not self.available:
return {'error': 'Module not available'}
if auto_trigger and not self.allow_auto_trigger:

View File

@ -44,8 +44,8 @@ class CIRCLPDNS(AbstractModule):
with cached_entries[0].open() as f:
return [PDNSRecord(record) for record in json.load(f)]
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return error

View File

@ -29,8 +29,8 @@ class FOX(AbstractModule):
return True
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on the initial URL'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):

View File

@ -34,8 +34,8 @@ class HashlookupModule(AbstractModule):
self.client.info()
return True
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return error

View File

@ -78,8 +78,8 @@ class Phishtank(AbstractModule):
with cached_entries[0].open() as f:
return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool = False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return error

View File

@ -43,8 +43,8 @@ class PhishingInitiative(AbstractModule):
with cached_entries[0].open() as f:
return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):

View File

@ -69,8 +69,8 @@ class RiskIQ(AbstractModule):
with cached_entries[0].open() as f:
return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):

View File

@ -3,34 +3,39 @@
from __future__ import annotations
import json
import logging
from datetime import date
from collections.abc import Iterable
from pysanejs import SaneJS # type: ignore[attr-defined]
from ..default import get_homedir
from .abstractmodule import AbstractModule
from ..default import get_homedir, get_config, LookylooException
class SaneJavaScript(AbstractModule):
class SaneJavaScript():
def module_init(self) -> bool:
def __init__(self) -> None:
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
self.config = get_config('modules', 'SaneJS')
if not self.config.get('enabled'):
self.logger.info('Not enabled.')
return False
self.available = False
self.client = SaneJS()
if not self.client.is_up:
self.logger.warning('Not up.')
return False
self.available = False
self.storage_dir = get_homedir() / 'sanejs'
self.storage_dir.mkdir(parents=True, exist_ok=True)
return True
self.available = True
def hashes_lookup(self, sha512: Iterable[str] | str, force: bool=False) -> dict[str, list[str]]:
if not self.available:
raise LookylooException('SaneJS is not available.')
if isinstance(sha512, str):
hashes: Iterable[str] = [sha512]
else:

View File

@ -46,8 +46,8 @@ class URLhaus(AbstractModule):
response.raise_for_status()
return response.json()
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):

View File

@ -60,7 +60,8 @@ class UrlScan(AbstractModule):
with cached_entries[0].open() as f:
return json.load(f)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on the initial URL'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return error

View File

@ -49,8 +49,8 @@ class UniversalWhois(AbstractModule):
self.whois(cname, contact_email_only=False)
self.whois(hostnode.name, contact_email_only=False)
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force, auto_trigger=auto_trigger, as_admin=as_admin):
return error

View File

@ -56,8 +56,8 @@ class VirusTotal(AbstractModule):
cached_entries[0].unlink(missing_ok=True)
return None
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False,
auto_trigger: bool=False, as_admin: bool=False) -> dict[str, str]:
def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool,
auto_trigger: bool, as_admin: bool) -> dict[str, str]:
'''Run the module on all the nodes up to the final redirect'''
if error := super().capture_default_trigger(cache, force=force,
auto_trigger=auto_trigger, as_admin=as_admin):

View File

@ -662,14 +662,15 @@ def hostnode_popup(tree_uuid: str, node_uuid: str) -> str | WerkzeugResponse | R
def trigger_modules(tree_uuid: str) -> WerkzeugResponse | str | Response:
force = True if (request.args.get('force') and request.args.get('force') == 'True') else False
auto_trigger = True if (request.args.get('auto_trigger') and request.args.get('auto_trigger') == 'True') else False
lookyloo.trigger_modules(tree_uuid, force=force, auto_trigger=auto_trigger)
lookyloo.trigger_modules(tree_uuid, force=force, auto_trigger=auto_trigger, as_admin=flask_login.current_user.is_authenticated)
return redirect(url_for('modules', tree_uuid=tree_uuid))
@app.route('/tree/<string:tree_uuid>/historical_lookups', methods=['GET'])
def historical_lookups(tree_uuid: str) -> str | WerkzeugResponse | Response:
force = True if (request.args.get('force') and request.args.get('force') == 'True') else False
data = lookyloo.get_historical_lookups(tree_uuid, force)
auto_trigger = True if (request.args.get('auto_trigger') and request.args.get('auto_trigger') == 'True') else False
data = lookyloo.get_historical_lookups(tree_uuid, force=force, auto_trigger=auto_trigger, as_admin=flask_login.current_user.is_authenticated)
return render_template('historical_lookups.html', tree_uuid=tree_uuid,
riskiq=data.get('riskiq'),
circl_pdns=data.get('circl_pdns'))

View File

@ -308,7 +308,8 @@ class TriggerModules(Resource): # type: ignore[misc]
def post(self, capture_uuid: str) -> dict[str, Any]:
parameters: dict[str, Any] = request.get_json(force=True)
force = True if parameters.get('force') else False
return lookyloo.trigger_modules(capture_uuid, force=force)
return lookyloo.trigger_modules(capture_uuid, force=force, auto_trigger=False,
as_admin=flask_login.current_user.is_authenticated)
@api.route('/json/<string:capture_uuid>/modules')