lookyloo/lookyloo/modules/pi.py

104 lines
3.6 KiB
Python
Raw Normal View History

2021-09-16 11:22:02 +02:00
#!/usr/bin/env python3
2024-01-12 17:15:41 +01:00
from __future__ import annotations
2021-09-16 11:22:02 +02:00
import json
import time
2021-09-16 11:22:02 +02:00
from datetime import date
2024-01-13 01:24:32 +01:00
from typing import Any, TYPE_CHECKING
2021-09-16 11:22:02 +02:00
2024-01-12 17:15:41 +01:00
from pyeupi import PyEUPI # type: ignore[attr-defined]
2021-09-16 11:22:02 +02:00
2021-10-18 13:06:43 +02:00
from ..default import ConfigError, get_homedir
from ..helpers import get_cache_directory
2021-09-16 11:22:02 +02:00
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
2021-09-16 11:22:02 +02:00
class PhishingInitiative(AbstractModule):
    '''Module looking up URLs on Phishing Initiative through a PyEUPI client.

    Lookup responses are stored as JSON files, one file per URL and per day,
    under the ``eupi`` directory below the path returned by get_homedir().
    '''

    def module_init(self) -> bool:
        '''Configure the client, the options and the storage directory.

        Returns False when the config has no (or an empty) 'apikey' entry,
        True once everything is set up.
        '''
        api_key = self.config.get('apikey')
        if not api_key:
            self.logger.info('No API key')
            return False

        # Auto trigger stays disabled until the config value is read below.
        self.allow_auto_trigger = False
        self.client = PyEUPI(api_key)

        self.autosubmit = self.config.get('autosubmit', False)
        self.allow_auto_trigger = self.config.get('allow_auto_trigger', False)

        self.storage_dir_eupi = get_homedir() / 'eupi'
        self.storage_dir_eupi.mkdir(parents=True, exist_ok=True)
        return True

    def get_url_lookup(self, url: str) -> dict[str, Any] | None:
        '''Return the latest stored lookup for the URL, None if there is none.'''
        url_storage_dir = get_cache_directory(self.storage_dir_eupi, url)
        if not url_storage_dir.exists():
            return None

        # url_lookup names the files after the ISO date of the day, so the
        # greatest path is the most recent entry.
        latest_entry = max(url_storage_dir.glob('*'), default=None)
        if latest_entry is None:
            return None

        with latest_entry.open() as f:
            return json.load(f)

    def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]:
        '''Run the module on every redirect of the capture, or on the
        capture URL itself when there are no redirects.

        Returns a dict with either an 'error' or a 'success' message.
        '''
        if not self.available:
            return {'error': 'Module not available'}
        if auto_trigger and not self.allow_auto_trigger:
            return {'error': 'Auto trigger not allowed on module'}

        # Fall back on the capture URL when the capture has no redirects.
        for target in cache.redirects or [cache.url]:
            self.url_lookup(target, force)

        return {'success': 'Module triggered'}

    def url_lookup(self, url: str, force: bool=False) -> None:
        '''Lookup an URL on Phishing Initiative and store the response.

        Note: force means 2 things:
            * (re)scan of the URL
            * re fetch the object from Phishing Initiative even if we already did it today

        Note: the URL will only be sent for scan if autosubmit is set to true in the config

        Raises ConfigError when the module is not available.
        '''
        if not self.available:
            raise ConfigError('PhishingInitiative not available, probably no API key')

        url_storage_dir = get_cache_directory(self.storage_dir_eupi, url)
        url_storage_dir.mkdir(parents=True, exist_ok=True)
        # One file per day, named after today's ISO date.
        pi_file = url_storage_dir / date.today().isoformat()

        scan_requested = False
        if force:
            if self.autosubmit:
                self.client.post_submission(url, comment='Received on Lookyloo')
                scan_requested = True
        elif pi_file.exists():
            # Already fetched today and no refresh requested.
            return

        # At most 3 lookups, 1 second apart, to give a submitted URL a chance
        # to get a result.
        for _ in range(3):
            url_information = self.client.lookup(url)
            results = url_information['results']
            if not results:
                # No results, that should not happen (?)
                break

            if results[0]['tag'] != -1:
                # We have an actual response: store it and stop.
                with pi_file.open('w') as _f:
                    json.dump(url_information, _f)
                break

            # Tag -1: not submitted
            if not self.autosubmit:
                break
            if not scan_requested:
                self.client.post_submission(url, comment='Received on Lookyloo')
                scan_requested = True
            time.sleep(1)