mirror of https://github.com/CIRCL/lookyloo
new: [API] Trigger modules for a capture
parent
92a466d78b
commit
50dbbd6eba
|
@ -388,7 +388,7 @@ class Lookyloo():
|
|||
with (capture_dir / 'categories').open('w') as f:
|
||||
f.writelines(f'{t}\n' for t in current_categories)
|
||||
|
||||
def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> None:
|
||||
def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> Dict:
|
||||
'''Launch the 3rd party modules on a capture.
|
||||
It uses the cached result *if* the module was triggered the same day.
|
||||
The `force` flag re-triggers the module regardless of the cache.'''
|
||||
|
@ -396,17 +396,20 @@ class Lookyloo():
|
|||
ct = self.get_crawled_tree(capture_uuid)
|
||||
except LookylooException:
|
||||
self.logger.warning(f'Unable to trigger the modules unless the tree ({capture_uuid}) is cached.')
|
||||
return
|
||||
return {'error': f'UUID {capture_uuid} is either unknown or the tree is not ready yet.'}
|
||||
|
||||
self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
|
||||
|
||||
to_return: Dict[str, Dict] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}}
|
||||
capture_cache = self.capture_cache(capture_uuid)
|
||||
|
||||
self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
|
||||
self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
|
||||
self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
|
||||
self.urlscan.capture_default_trigger(
|
||||
to_return['PhishingInitiative'] = self.pi.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
|
||||
to_return['VirusTotal'] = self.vt.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger)
|
||||
to_return['UrlScan'] = self.urlscan.capture_default_trigger(
|
||||
self.get_info(capture_uuid),
|
||||
visibility='unlisted' if (capture_cache and capture_cache.no_index) else 'public',
|
||||
force=force, auto_trigger=auto_trigger)
|
||||
return to_return
|
||||
|
||||
def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
|
||||
'''Get the responses of the modules from the cached responses on the disk'''
|
||||
|
|
|
@ -328,18 +328,19 @@ class PhishingInitiative():
|
|||
with cached_entries[0].open() as f:
|
||||
return json.load(f)
|
||||
|
||||
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None:
|
||||
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
|
||||
'''Run the module on all the nodes up to the final redirect'''
|
||||
if not self.available:
|
||||
return None
|
||||
return {'error': 'Module not available'}
|
||||
if auto_trigger and not self.allow_auto_trigger:
|
||||
return None
|
||||
return {'error': 'Auto trigger not allowed on module'}
|
||||
|
||||
if crawled_tree.redirects:
|
||||
for redirect in crawled_tree.redirects:
|
||||
self.url_lookup(redirect, force)
|
||||
else:
|
||||
self.url_lookup(crawled_tree.root_hartree.har.root_url, force)
|
||||
return {'success': 'Module triggered'}
|
||||
|
||||
def url_lookup(self, url: str, force: bool=False) -> None:
|
||||
'''Lookup an URL on Phishing Initiative
|
||||
|
@ -421,18 +422,19 @@ class VirusTotal():
|
|||
with cached_entries[0].open() as f:
|
||||
return json.load(f)
|
||||
|
||||
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> None:
|
||||
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
|
||||
'''Run the module on all the nodes up to the final redirect'''
|
||||
if not self.available:
|
||||
return None
|
||||
return {'error': 'Module not available'}
|
||||
if auto_trigger and not self.allow_auto_trigger:
|
||||
return None
|
||||
return {'error': 'Auto trigger not allowed on module'}
|
||||
|
||||
if crawled_tree.redirects:
|
||||
for redirect in crawled_tree.redirects:
|
||||
self.url_lookup(redirect, force)
|
||||
else:
|
||||
self.url_lookup(crawled_tree.root_hartree.har.root_url, force)
|
||||
return {'success': 'Module triggered'}
|
||||
|
||||
def url_lookup(self, url: str, force: bool=False) -> None:
|
||||
'''Lookup an URL on VT
|
||||
|
@ -531,18 +533,19 @@ class UrlScan():
|
|||
with cached_entries[0].open() as f:
|
||||
return json.load(f)
|
||||
|
||||
def capture_default_trigger(self, capture_info: Dict[str, Any], /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> None:
|
||||
def capture_default_trigger(self, capture_info: Dict[str, Any], /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> Dict:
|
||||
'''Run the module on the initial URL'''
|
||||
if not self.available:
|
||||
return None
|
||||
return {'error': 'Module not available'}
|
||||
if auto_trigger and not self.allow_auto_trigger:
|
||||
# NOTE: if auto_trigger is true, it means the request comes from the
|
||||
# auto trigger feature (disabled by default)
|
||||
# Each module can disable auto-trigger to avoid depleating the
|
||||
# API limits.
|
||||
return None
|
||||
return {'error': 'Auto trigger not allowed on module'}
|
||||
|
||||
self.url_submit(capture_info, visibility, force)
|
||||
return {'success': 'Module triggered'}
|
||||
|
||||
def __submit_url(self, url: str, useragent: str, referer: str, visibility: str) -> Dict:
|
||||
data = {"url": url, 'customagent': useragent, 'referer': referer}
|
||||
|
|
|
@ -219,6 +219,23 @@ class MISPPush(Resource):
|
|||
return to_return
|
||||
|
||||
|
||||
trigger_modules_fields = api.model('TriggerModulesFields', {
|
||||
'force': fields.Boolean(description="Force trigger the modules, even if the results are already cached.",
|
||||
default=False, required=False),
|
||||
})
|
||||
|
||||
|
||||
@api.route('/json/<string:capture_uuid>/trigger_modules')
|
||||
@api.doc(description='Trigger all the available 3rd party modules on the given capture',
|
||||
params={'capture_uuid': 'The UUID of the capture'})
|
||||
class TriggerModules(Resource):
|
||||
@api.doc(body=trigger_modules_fields)
|
||||
def post(self, capture_uuid: str):
|
||||
parameters: Dict = request.get_json(force=True)
|
||||
force = True if parameters.get('force') else False
|
||||
return lookyloo.trigger_modules(capture_uuid, force=force)
|
||||
|
||||
|
||||
@api.route('/json/hash_info/<h>')
|
||||
@api.doc(description='Search for a ressource with a specific hash (sha512)',
|
||||
params={'h': 'The hash (sha512)'})
|
||||
|
|
Loading…
Reference in New Issue