From 50dbbd6eba9920a5b7d256899ef775e57a4ec7ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 13 Aug 2021 13:50:26 +0200 Subject: [PATCH] new: [API] Trigger modules for a capture --- lookyloo/lookyloo.py | 15 +++++++++------ lookyloo/modules.py | 21 ++++++++++++--------- website/web/genericapi.py | 17 +++++++++++++++++ 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index d9e7eb4b..d50dc4b4 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -388,7 +388,7 @@ class Lookyloo(): with (capture_dir / 'categories').open('w') as f: f.writelines(f'{t}\n' for t in current_categories) - def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> None: + def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> Dict: '''Launch the 3rd party modules on a capture. It uses the cached result *if* the module was triggered the same day. The `force` flag re-triggers the module regardless of the cache.''' @@ -396,17 +396,20 @@ class Lookyloo(): ct = self.get_crawled_tree(capture_uuid) except LookylooException: self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_uuid}) is cached.') - return + return {'error': f'UUID {capture_uuid} is either unknown or the tree is not ready yet.'} + self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) + + to_return: Dict[str, Dict] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}} capture_cache = self.capture_cache(capture_uuid) - self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) - self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) - self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) - self.urlscan.capture_default_trigger( + to_return['PhishingInitiative'] = self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) + to_return['VirusTotal'] = self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) + to_return['UrlScan'] = self.urlscan.capture_default_trigger( self.get_info(capture_uuid), visibility='unlisted' if (capture_cache and capture_cache.no_index) else 'public', force=force, auto_trigger=auto_trigger) + return to_return def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]: '''Get the responses of the modules from the cached responses on the disk''' diff --git a/lookyloo/modules.py b/lookyloo/modules.py index 74e4acdd..30b8fe1f 100644 --- a/lookyloo/modules.py +++ b/lookyloo/modules.py @@ -328,18 +328,19 @@ class PhishingInitiative(): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None: + def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict: '''Run the module on all the nodes up to the final redirect''' if not self.available: - return None + return {'error': 'Module not available'} if auto_trigger and not self.allow_auto_trigger: - return None + return {'error': 'Auto trigger not allowed on module'} if crawled_tree.redirects: for redirect in crawled_tree.redirects: self.url_lookup(redirect, force) else: self.url_lookup(crawled_tree.root_hartree.har.root_url, force) + return {'success': 'Module triggered'} def url_lookup(self, url: str, force: bool=False) -> None: '''Lookup an URL on Phishing Initiative @@ -421,18 +422,19 @@ class VirusTotal(): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None: + def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict: '''Run the module on all the nodes up to the final redirect''' if not self.available: - return None + return {'error': 'Module not available'} if auto_trigger and not self.allow_auto_trigger: - return None + return {'error': 'Auto trigger not allowed on module'} if crawled_tree.redirects: for redirect in crawled_tree.redirects: self.url_lookup(redirect, force) else: self.url_lookup(crawled_tree.root_hartree.har.root_url, force) + return {'success': 'Module triggered'} def url_lookup(self, url: str, force: bool=False) -> None: '''Lookup an URL on VT @@ -531,18 +533,19 @@ class UrlScan(): with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, capture_info: Dict[str, Any], /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> None: + def capture_default_trigger(self, capture_info: Dict[str, Any], /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> Dict: '''Run the module on the initial URL''' if not self.available: - return None + return {'error': 'Module not available'} if auto_trigger and not self.allow_auto_trigger: # NOTE: if auto_trigger is true, it means the request comes from the # auto trigger feature (disabled by default) # Each module can disable auto-trigger to avoid depleating the # API limits. - return None + return {'error': 'Auto trigger not allowed on module'} self.url_submit(capture_info, visibility, force) + return {'success': 'Module triggered'} def __submit_url(self, url: str, useragent: str, referer: str, visibility: str) -> Dict: data = {"url": url, 'customagent': useragent, 'referer': referer} diff --git a/website/web/genericapi.py b/website/web/genericapi.py index 7d0302ab..0dc98467 100644 --- a/website/web/genericapi.py +++ b/website/web/genericapi.py @@ -219,6 +219,23 @@ class MISPPush(Resource): return to_return +trigger_modules_fields = api.model('TriggerModulesFields', { + 'force': fields.Boolean(description="Force trigger the modules, even if the results are already cached.", + default=False, required=False), +}) + + +@api.route('/json//trigger_modules') +@api.doc(description='Trigger all the available 3rd party modules on the given capture', + params={'capture_uuid': 'The UUID of the capture'}) +class TriggerModules(Resource): + @api.doc(body=trigger_modules_fields) + def post(self, capture_uuid: str): + parameters: Dict = request.get_json(force=True) + force = True if parameters.get('force') else False + return lookyloo.trigger_modules(capture_uuid, force=force) + + @api.route('/json/hash_info/') @api.doc(description='Search for a ressource with a specific hash (sha512)', params={'h': 'The hash (sha512)'})