From b4599492f3e813a63999dd891b87aa5bdff74ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 11 Oct 2023 14:57:36 +0200 Subject: [PATCH] fix: Avoid exception killing website if non-responsive 3rd party module. --- bin/async_capture.py | 2 +- lookyloo/context.py | 4 +- lookyloo/lookyloo.py | 62 ++++++------------------ lookyloo/modules/abstractmodule.py | 42 ++++++++++++++++ lookyloo/modules/cloudflare.py | 24 ++++------ lookyloo/modules/fox.py | 29 +++++------ lookyloo/modules/hashlookup.py | 26 +++++----- lookyloo/modules/misp.py | 77 ++++++++++++------------------ lookyloo/modules/pandora.py | 30 +++++------- lookyloo/modules/phishtank.py | 27 ++++++----- lookyloo/modules/pi.py | 24 +++++----- lookyloo/modules/riskiq.py | 55 +++++++++------------ lookyloo/modules/sanejs.py | 33 +++++++------ lookyloo/modules/urlhaus.py | 20 ++++---- lookyloo/modules/urlscan.py | 34 ++++++------- lookyloo/modules/uwhois.py | 32 +++++-------- lookyloo/modules/vt.py | 28 +++++------ 17 files changed, 254 insertions(+), 295 deletions(-) create mode 100644 lookyloo/modules/abstractmodule.py diff --git a/bin/async_capture.py b/bin/async_capture.py index 3677d3d..d965812 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -33,7 +33,7 @@ class AsyncCapture(AbstractManager): if isinstance(self.lookyloo.lacus, LacusCore): self.captures: Set[asyncio.Task] = set() - self.fox = FOX(get_config('modules', 'FOX')) + self.fox = FOX(config_name='FOX') if not self.fox.available: self.logger.warning('Unable to setup the FOX module') diff --git a/lookyloo/context.py b/lookyloo/context.py index e9b21b6..af8b4b3 100644 --- a/lookyloo/context.py +++ b/lookyloo/context.py @@ -21,9 +21,7 @@ class Context(): self.logger.setLevel(get_config('generic', 'loglevel')) self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True) self._cache_known_content() - self.sanejs = SaneJavaScript(get_config('modules', 'SaneJS')) - if not self.sanejs.available: - self.logger.warning('Unable to setup the SaneJS module') + self.sanejs = SaneJavaScript(config_name='SaneJS') def clear_context(self): self.redis.flushdb() diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 350b47d..7b4edc7 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -88,7 +88,6 @@ class Lookyloo(): self.global_proxy.pop('enable') self.securitytxt = PySecurityTXT(useragent=get_useragent_for_requests()) - self.taxonomies = get_taxonomies() self.redis_pool: ConnectionPool = ConnectionPool(connection_class=UnixDomainSocketConnection, @@ -98,29 +97,20 @@ class Lookyloo(): self._priority = get_config('generic', 'priority') # Initialize 3rd party components - self.pi = PhishingInitiative(get_config('modules', 'PhishingInitiative')) - if not self.pi.available: - self.logger.warning('Unable to setup the PhishingInitiative module') - - self.vt = VirusTotal(get_config('modules', 'VirusTotal')) - if not self.vt.available: - self.logger.warning('Unable to setup the VirusTotal module') - # ## Initialize MISP(s) try_old_config = False - if misps_config := get_config('modules', 'MultipleMISPs'): - # New config - self.misps = MISPs(misps_config) - if not self.misps.available: - self.logger.warning('Unable to setup the MISP module') - try_old_config = True + # New config + self.misps = MISPs(config_name='MultipleMISPs') + if not self.misps.available: + self.logger.warning('Unable to setup the MISPs module') + try_old_config = True if try_old_config: # Legacy MISP config, now use MultipleMISPs key to support more than one MISP instance try: if misp_config := get_config('modules', 'MISP'): misps_config = {'default': 'MISP', 'instances': {'MISP': misp_config}} - self.misps = MISPs(misps_config) + self.misps = MISPs(config=misps_config) if self.misps.available: self.logger.warning('Please migrate the MISP config to the "MultipleMISPs" key in the config, and remove the "MISP" key') else: @@ -129,38 +119,16 @@ class Lookyloo(): # The key was removed from the config, and the sample config pass - if not self.misps.available: - self.logger.info('The MISP module is not configured') - # ## Done with MISP(s) - - self.uwhois = UniversalWhois(get_config('modules', 'UniversalWhois')) - if not self.uwhois.available: - self.logger.warning('Unable to setup the UniversalWhois module') - - self.urlscan = UrlScan(get_config('modules', 'UrlScan')) - if not self.urlscan.available: - self.logger.warning('Unable to setup the UrlScan module') - - self.phishtank = Phishtank(get_config('modules', 'Phishtank')) - if not self.phishtank.available: - self.logger.warning('Unable to setup the Phishtank module') - - self.hashlookup = Hashlookup(get_config('modules', 'Hashlookup')) - if not self.hashlookup.available: - self.logger.warning('Unable to setup the Hashlookup module') - - self.riskiq = RiskIQ(get_config('modules', 'RiskIQ')) - if not self.riskiq.available: - self.logger.warning('Unable to setup the RiskIQ module') - - self.pandora = Pandora(get_config('modules', 'Pandora')) - if not self.pandora.available: - self.logger.warning('Unable to setup the Pandora module') - - self.urlhaus = URLhaus(get_config('modules', 'URLhaus')) - if not self.urlhaus.available: - self.logger.warning('Unable to setup the URLhaus module') + self.pi = PhishingInitiative(config_name='PhishingInitiative') + self.vt = VirusTotal(config_name='VirusTotal') + self.uwhois = UniversalWhois(config_name='UniversalWhois') + self.urlscan = UrlScan(config_name='UrlScan') + self.phishtank = Phishtank(config_name='Phishtank') + self.hashlookup = Hashlookup(config_name='Hashlookup') + self.riskiq = RiskIQ(config_name='RiskIQ') + self.pandora = Pandora(config_name='Pandora') + self.urlhaus = URLhaus(config_name='URLhaus') self.monitoring_enabled = False if monitoring_config := get_config('generic', 'monitoring'): diff --git a/lookyloo/modules/abstractmodule.py b/lookyloo/modules/abstractmodule.py new file mode 100644 index 0000000..4406ef8 --- /dev/null +++ b/lookyloo/modules/abstractmodule.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 + +import logging + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any + +from ..default import get_config + +logging.config.dictConfig(get_config('logging')) + + +class AbstractModule(ABC): + '''Just a simple abstract for the modules to catch issues with initialization''' + + def __init__(self, /, *, config_name: Optional[str]=None, + config: Optional[Dict[str, Any]]=None): + self.logger = logging.getLogger(f'{self.__class__.__name__}') + self.logger.setLevel(get_config('generic', 'loglevel')) + self.config: Dict[str, Any] = {} + self._available = False + if config_name: + try: + self.config = get_config('modules', config_name) + except Exception as e: + self.logger.warning(f'Unable to get config for {config_name}: {e}') + return + elif config: + self.config = config + + try: + self._available = self.module_init() + except Exception as e: + self.logger.warning(f'Unable to initialize module: {e}.') + + @property + def available(self) -> bool: + return self._available + + @abstractmethod + def module_init(self) -> bool: + ... diff --git a/lookyloo/modules/cloudflare.py b/lookyloo/modules/cloudflare.py index 8ae5224..e7f0554 100644 --- a/lookyloo/modules/cloudflare.py +++ b/lookyloo/modules/cloudflare.py @@ -1,43 +1,39 @@ #!/usr/bin/env python3 import ipaddress -import logging from typing import Dict, Set import requests -from ..default import ConfigError, get_config +from ..default import ConfigError + +from .abstractmodule import AbstractModule -class Cloudflare(): +class Cloudflare(AbstractModule): '''This module checks if an IP is announced by Cloudflare.''' - def __init__(self): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - + def module_init(self) -> bool: # Get IPv4 - r = requests.get('https://www.cloudflare.com/ips-v4') try: + r = requests.get('https://www.cloudflare.com/ips-v4') r.raise_for_status() ipv4_list = r.text except Exception as e: self.logger.warning(f'Unable to get Cloudflare IPv4 list: {e}') - self.available = False - return + return False # Get IPv6 try: r = requests.get('https://www.cloudflare.com/ips-v6') + r.raise_for_status() ipv6_list = r.text except Exception as e: self.logger.warning(f'Unable to get Cloudflare IPv6 list: {e}') - self.available = False - return - - self.available = True + return False self.v4_list = [ipaddress.ip_network(net) for net in ipv4_list.split('\n')] self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')] + return True def ips_lookup(self, ips: Set[str]) -> Dict[str, bool]: '''Lookup a list of IPs. True means it is a known Cloudflare IP''' diff --git a/lookyloo/modules/fox.py b/lookyloo/modules/fox.py index 5c35f10..a2f12ee 100644 --- a/lookyloo/modules/fox.py +++ b/lookyloo/modules/fox.py @@ -1,36 +1,33 @@ #!/usr/bin/env python3 -import logging -from typing import Any, Dict +from typing import Dict import requests -from ..default import ConfigError, get_config +from ..default import ConfigError from ..helpers import get_useragent_for_requests +from .abstractmodule import AbstractModule -class FOX(): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('apikey'): - self.available = False - return +class FOX(AbstractModule): + + def module_init(self) -> bool: + if not self.config.get('apikey'): + self.logger.info('No API key.') + return False - self.available = True self.autosubmit = False self.allow_auto_trigger = False self.client = requests.session() self.client.headers['User-Agent'] = get_useragent_for_requests() - self.client.headers['X-API-KEY'] = config['apikey'] + self.client.headers['X-API-KEY'] = self.config['apikey'] self.client.headers['Content-Type'] = 'application/json' - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) + self.autosubmit = bool(self.config.get('autosubmit', False)) - if config.get('autosubmit'): - self.autosubmit = True + return True def capture_default_trigger(self, url: str, /, auto_trigger: bool=False) -> Dict: '''Run the module on the initial URL''' diff --git a/lookyloo/modules/hashlookup.py b/lookyloo/modules/hashlookup.py index fe39fcf..2934131 100644 --- a/lookyloo/modules/hashlookup.py +++ b/lookyloo/modules/hashlookup.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 import json -from typing import Any, Dict, List +from typing import Dict, List from har2tree import CrawledTree from pyhashlookup import Hashlookup @@ -9,25 +9,27 @@ from pyhashlookup import Hashlookup from ..default import ConfigError from ..helpers import get_useragent_for_requests +from .abstractmodule import AbstractModule -class HashlookupModule(): + +class HashlookupModule(AbstractModule): '''This module is a bit different as it will trigger a lookup of all the hashes and store the response in the capture directory''' - def __init__(self, config: Dict[str, Any]): - if not config.get('enabled'): - self.available = False - return + def module_init(self) -> bool: + if not self.config.get('enabled'): + self.logger.info('Not enabled.') + return False - self.available = True - self.allow_auto_trigger = False - if config.get('url'): - self.client = Hashlookup(config['url'], useragent=get_useragent_for_requests()) + if self.config.get('url'): + self.client = Hashlookup(self.config['url'], useragent=get_useragent_for_requests()) else: self.client = Hashlookup(useragent=get_useragent_for_requests()) + # Makes sure the webservice is reachable, raises an exception otherwise. + self.client.info() - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) + return True def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict: '''Run the module on all the nodes up to the final redirect''' diff --git a/lookyloo/modules/misp.py b/lookyloo/modules/misp.py index 79c051a..091222b 100644 --- a/lookyloo/modules/misp.py +++ b/lookyloo/modules/misp.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -import logging import re from io import BytesIO @@ -16,35 +15,31 @@ from pymisp.tools import FileObject, URLObject from ..default import get_config, get_homedir from ..helpers import get_public_suffix_list +from .abstractmodule import AbstractModule + if TYPE_CHECKING: from ..capturecache import CaptureCache -class MISPs(Mapping): +class MISPs(Mapping, AbstractModule): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - - if not config.get('default'): - self.available = False + def module_init(self) -> bool: + if not self.config.get('default'): self.logger.info('No default instance configured, disabling MISP.') - return - if not config.get('instances'): - self.available = False + return False + if not self.config.get('instances'): self.logger.warning('No MISP instances configured, disabling MISP.') - return + return False - self.default_instance = config['default'] + self.default_instance = self.config['default'] - if self.default_instance not in config['instances']: - self.available = False - self.logger.warning(f"The default MISP instance ({self.default_instance}) is missing in the instances ({', '.join(config['instances'].keys())}), disabling MISP.") - return + if self.default_instance not in self.config['instances']: + self.logger.warning(f"The default MISP instance ({self.default_instance}) is missing in the instances ({', '.join(self.config['instances'].keys())}), disabling MISP.") + return False self.__misps: Dict[str, 'MISP'] = {} - for instance_name, instance_config in config['instances'].items(): - if misp_connector := MISP(instance_config): + for instance_name, instance_config in self.config['instances'].items(): + if misp_connector := MISP(config=instance_config): if misp_connector.available: self.__misps[instance_name] = misp_connector else: @@ -53,11 +48,10 @@ class MISPs(Mapping): self.logger.warning(f"Unable to initialize the connector to '{instance_name}'. It won't be available.") if not self.__misps.get(self.default_instance) or not self.__misps[self.default_instance].available: - self.available = False self.logger.warning("Unable to initialize the connector to the default MISP instance, disabling MISP.") - return + return False - self.available = True + return True def __getitem__(self, name: str) -> 'MISP': return self.__misps[name] @@ -151,39 +145,30 @@ class MISPs(Mapping): obj.add_attributes('ip', *hostnodes[0].resolved_ips) -class MISP(): +class MISP(AbstractModule): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('apikey'): - self.available = False - self.logger.info('Module not enabled.') - return + def module_init(self) -> bool: + if not self.config.get('apikey'): + self.logger.info('No API key: {self.config}.') + return False - self.available = True - self.enable_lookup = False - self.enable_push = False - self.allow_auto_trigger = False try: - self.client = PyMISP(url=config['url'], key=config['apikey'], - ssl=config['verify_tls_cert'], timeout=config['timeout']) + self.client = PyMISP(url=self.config['url'], key=self.config['apikey'], + ssl=self.config['verify_tls_cert'], timeout=self.config['timeout']) except Exception as e: - self.available = False self.logger.warning(f'Unable to connect to MISP: {e}') - return + return False - if config.get('enable_lookup'): - self.enable_lookup = True - if config.get('enable_push'): - self.enable_push = True - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True - self.default_tags: List[str] = config.get('default_tags') # type: ignore - self.auto_publish = config.get('auto_publish') + self.enable_lookup = bool(self.config.get('enable_lookup', False)) + self.enable_push = bool(self.config.get('enable_push', False)) + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) + + self.default_tags: List[str] = self.config.get('default_tags') # type: ignore + self.auto_publish = bool(self.config.get('auto_publish', False)) self.storage_dir_misp = get_homedir() / 'misp' self.storage_dir_misp.mkdir(parents=True, exist_ok=True) self.psl = get_public_suffix_list() + return True def get_fav_tags(self): return self.client.tags(pythonify=True, favouritesOnly=1) diff --git a/lookyloo/modules/pandora.py b/lookyloo/modules/pandora.py index f66a3e6..c25c946 100644 --- a/lookyloo/modules/pandora.py +++ b/lookyloo/modules/pandora.py @@ -1,35 +1,31 @@ #!/usr/bin/env python3 -import logging from io import BytesIO -from typing import Any, Dict +from typing import Dict from pypandora import PyPandora -from ..default import ConfigError, get_config +from ..default import ConfigError from ..helpers import get_useragent_for_requests +from .abstractmodule import AbstractModule -class Pandora(): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('url'): - self.available = False - return +class Pandora(AbstractModule): - self.client = PyPandora(root_url=config['url'], useragent=get_useragent_for_requests()) + def module_init(self) -> bool: + if not self.config.get('url'): + self.logger.info('No URL in config.') + return False + self.client = PyPandora(root_url=self.config['url'], useragent=get_useragent_for_requests()) if not self.client.is_up: - self.available = False - return + self.logger.warning('Not up.') + return False - self.available = True - self.allow_auto_trigger = False + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + return True def capture_default_trigger(self, file_in_memory: BytesIO, filename: str, /, auto_trigger: bool=False) -> Dict: '''Automatically submit the file if the landing URL is a file instead of a webpage''' diff --git a/lookyloo/modules/phishtank.py b/lookyloo/modules/phishtank.py index b4eee7b..4beec67 100644 --- a/lookyloo/modules/phishtank.py +++ b/lookyloo/modules/phishtank.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import json + from datetime import date, datetime, timedelta, timezone from typing import Any, Dict, Optional, List, TYPE_CHECKING @@ -12,26 +13,30 @@ from ..helpers import get_cache_directory if TYPE_CHECKING: from ..capturecache import CaptureCache +from .abstractmodule import AbstractModule -class Phishtank(): - def __init__(self, config: Dict[str, Any]): - if not config.get('enabled'): - self.available = False - return +class Phishtank(AbstractModule): - self.available = True - self.allow_auto_trigger = False - if config.get('url'): - self.client = PhishtankLookup(config['url']) + def module_init(self) -> bool: + if not self.config.get('enabled'): + self.logger.info('Not enabled.') + return False + + if self.config.get('url'): + self.client = PhishtankLookup(self.config['url']) else: self.client = PhishtankLookup() - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + if not self.client.is_up: + self.logger.warning('Not up.') + return False + + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) self.storage_dir_pt = get_homedir() / 'phishtank' self.storage_dir_pt.mkdir(parents=True, exist_ok=True) + return True def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: url_storage_dir = get_cache_directory(self.storage_dir_pt, url, 'url') diff --git a/lookyloo/modules/pi.py b/lookyloo/modules/pi.py index 518d9ae..5cd7b70 100644 --- a/lookyloo/modules/pi.py +++ b/lookyloo/modules/pi.py @@ -14,27 +14,25 @@ from ..helpers import get_cache_directory if TYPE_CHECKING: from ..capturecache import CaptureCache +from .abstractmodule import AbstractModule -class PhishingInitiative(): - def __init__(self, config: Dict[str, Any]): - if not config.get('apikey'): - self.available = False - return +class PhishingInitiative(AbstractModule): + + def module_init(self) -> bool: + if not self.config.get('apikey'): + self.logger.info('No API key') + return False - self.available = True - self.autosubmit = False self.allow_auto_trigger = False - self.client = PyEUPI(config['apikey']) + self.client = PyEUPI(self.config['apikey']) - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True - - if config.get('autosubmit'): - self.autosubmit = True + self.autosubmit = self.config.get('autosubmit', False) + self.allow_auto_trigger = self.config.get('allow_auto_trigger', False) self.storage_dir_eupi = get_homedir() / 'eupi' self.storage_dir_eupi.mkdir(parents=True, exist_ok=True) + return True def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: url_storage_dir = get_cache_directory(self.storage_dir_eupi, url) diff --git a/lookyloo/modules/riskiq.py b/lookyloo/modules/riskiq.py index 885b1ab..5005afb 100644 --- a/lookyloo/modules/riskiq.py +++ b/lookyloo/modules/riskiq.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import json -import logging from datetime import date, datetime, timedelta from typing import Any, Dict, Optional, Union, TYPE_CHECKING @@ -10,13 +9,15 @@ from urllib.parse import urlparse from passivetotal import AccountClient, DnsRequest, WhoisRequest # type: ignore from requests import Response -from ..default import ConfigError, get_homedir, get_config +from ..default import ConfigError, get_homedir from ..exceptions import ModuleError from ..helpers import get_cache_directory if TYPE_CHECKING: from ..capturecache import CaptureCache +from .abstractmodule import AbstractModule + class RiskIQError(ModuleError): @@ -24,46 +25,36 @@ class RiskIQError(ModuleError): self.response = response -class RiskIQ(): +class RiskIQ(AbstractModule): - def __init__(self, config: Dict[str, Any]): - if not (config.get('user') and config.get('apikey')): - self.available = False - return - - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - - self.available = True - self.allow_auto_trigger = False + def module_init(self) -> bool: + if not (self.config.get('user') and self.config.get('apikey')): + self.logger.info('Missing credentials.') + return False try: # Check if account is working - test_client = AccountClient(username=config.get('user'), api_key=config.get('apikey'), exception_class=RiskIQError) + test_client = AccountClient(username=self.config.get('user'), + api_key=self.config.get('apikey'), exception_class=RiskIQError) details = test_client.get_account_details() + self.client_dns = DnsRequest(username=self.config.get('user'), + api_key=self.config.get('apikey'), exception_class=RiskIQError) + self.client_whois = WhoisRequest(username=self.config.get('user'), + api_key=self.config.get('apikey'), exception_class=RiskIQError) except RiskIQError as e: - self.available = False - if hasattr(e, 'response'): - details = e.response.json() - if 'message' in details: - self.logger.warning(f'RiskIQ not available, {details["message"]}') - self.logger.warning(f'RiskIQ not available: {e}') - return - except Exception as e: - self.available = False - self.logger.warning(f'RiskIQ not available: {e}') - return + details = e.response.json() + if 'message' in details: + self.logger.warning(f'RiskIQ not available: {details["message"]}') + else: + self.logger.warning(f'RiskIQ not available: {details}') + return False - self.client_dns = DnsRequest(username=config.get('user'), api_key=config.get('apikey'), exception_class=RiskIQError) - self.client_whois = WhoisRequest(username=config.get('user'), api_key=config.get('apikey'), exception_class=RiskIQError) - - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True - - self.default_first_seen = config.get('default_first_seen_in_days', 5) + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) + self.default_first_seen = self.config.get('default_first_seen_in_days', 5) self.storage_dir_riskiq = get_homedir() / 'riskiq' self.storage_dir_riskiq.mkdir(parents=True, exist_ok=True) + return True def get_passivedns(self, query: str) -> Optional[Dict[str, Any]]: # The query can be IP or Hostname. For now, we only do it on domains. diff --git a/lookyloo/modules/sanejs.py b/lookyloo/modules/sanejs.py index 04211a2..46182de 100644 --- a/lookyloo/modules/sanejs.py +++ b/lookyloo/modules/sanejs.py @@ -1,34 +1,33 @@ #!/usr/bin/env python3 import json -import logging from datetime import date -from typing import Any, Dict, Iterable, List, Union +from typing import Dict, Iterable, List, Union from pysanejs import SaneJS -from ..default import get_config, get_homedir +from ..default import get_homedir + +from .abstractmodule import AbstractModule -class SaneJavaScript(): +class SaneJavaScript(AbstractModule): + + def module_init(self) -> bool: + if not self.config.get('enabled'): + self.logger.info('Not enabled.') + return False - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('enabled'): - self.available = False - self.logger.info('Module not enabled.') - return self.client = SaneJS() + if not self.client.is_up: - self.available = False - return - self.available = True - self.allow_auto_trigger = False - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + self.logger.warning('Not up.') + return False + + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) self.storage_dir = get_homedir() / 'sanejs' self.storage_dir.mkdir(parents=True, exist_ok=True) + return True def hashes_lookup(self, sha512: Union[Iterable[str], str], force: bool=False) -> Dict[str, List[str]]: if isinstance(sha512, str): diff --git a/lookyloo/modules/urlhaus.py b/lookyloo/modules/urlhaus.py index 1a13edb..c531ede 100644 --- a/lookyloo/modules/urlhaus.py +++ b/lookyloo/modules/urlhaus.py @@ -12,22 +12,22 @@ from ..helpers import get_cache_directory if TYPE_CHECKING: from ..capturecache import CaptureCache +from .abstractmodule import AbstractModule -class URLhaus(): - def __init__(self, config: Dict[str, Any]): - if not config.get('enabled'): - self.available = False - return +class URLhaus(AbstractModule): - self.available = True - self.allow_auto_trigger = False - self.url = config.get('url') - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + def module_init(self) -> bool: + if not self.config.get('enabled'): + self.logger.info('Not enabled') + return False + + self.url = self.config.get('url') + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) self.storage_dir_uh = get_homedir() / 'urlhaus' self.storage_dir_uh.mkdir(parents=True, exist_ok=True) + return True def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: url_storage_dir = get_cache_directory(self.storage_dir_uh, url, 'url') diff --git a/lookyloo/modules/urlscan.py b/lookyloo/modules/urlscan.py index eb5a5cb..70bcd5e 100644 --- a/lookyloo/modules/urlscan.py +++ b/lookyloo/modules/urlscan.py @@ -1,48 +1,41 @@ #!/usr/bin/env python3 import json -import logging from datetime import date from typing import Any, Dict, Optional, TYPE_CHECKING import requests -from ..default import ConfigError, get_config, get_homedir +from ..default import ConfigError, get_homedir from ..helpers import get_useragent_for_requests, get_cache_directory if TYPE_CHECKING: from ..capturecache import CaptureCache +from .abstractmodule import AbstractModule -class UrlScan(): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('apikey'): - self.available = False - return +class UrlScan(AbstractModule): + + def module_init(self) -> bool: + if not self.config.get('apikey'): + self.logger.info('No API key.') + return False - self.available = True - self.autosubmit = False - self.allow_auto_trigger = False self.client = requests.session() self.client.headers['User-Agent'] = get_useragent_for_requests() - self.client.headers['API-Key'] = config['apikey'] + self.client.headers['API-Key'] = self.config['apikey'] self.client.headers['Content-Type'] = 'application/json' - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) + self.autosubmit = bool(self.config.get('autosubmit', False)) - if config.get('autosubmit'): - self.autosubmit = True - - if config.get('force_visibility'): + if self.config.get('force_visibility'): # Cases: # 1. False: unlisted for hidden captures / public for others # 2. "key": default visibility defined on urlscan.io # 3. "public", "unlisted", "private": is set for all submissions - self.force_visibility = config['force_visibility'] + self.force_visibility = self.config['force_visibility'] else: self.force_visibility = False @@ -52,6 +45,7 @@ class UrlScan(): self.storage_dir_urlscan = get_homedir() / 'urlscan' self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True) + return True def get_url_submission(self, capture_info: 'CaptureCache') -> Dict[str, Any]: url_storage_dir = get_cache_directory( diff --git a/lookyloo/modules/uwhois.py b/lookyloo/modules/uwhois.py index c5bf087..f8c8d53 100644 --- a/lookyloo/modules/uwhois.py +++ b/lookyloo/modules/uwhois.py @@ -1,39 +1,33 @@ #!/usr/bin/env python3 -import logging import re import socket -from typing import Any, Dict, overload, Literal, List, Union +from typing import overload, Literal, List, Union from har2tree import CrawledTree, Har2TreeError, HostNode -from ..default import get_config +from .abstractmodule import AbstractModule -class UniversalWhois(): +class UniversalWhois(AbstractModule): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('enabled'): - self.available = False - self.logger.info('Module not enabled.') - return - self.server = config.get('ipaddress') - self.port = config.get('port') - self.allow_auto_trigger = False - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + def module_init(self) -> bool: + if not self.config.get('enabled'): + self.logger.info('Not enabled.') + return False + + self.server = self.config.get('ipaddress') + self.port = self.config.get('port') + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((self.server, self.port)) except Exception as e: - self.available = False self.logger.warning(f'Unable to connect to uwhois ({self.server}:{self.port}): {e}') - return - self.available = True + return False + return True def query_whois_hostnode(self, hostnode: HostNode) -> None: if hasattr(hostnode, 'resolved_ips'): diff --git a/lookyloo/modules/vt.py b/lookyloo/modules/vt.py index 0e6aa73..d8059e7 100644 --- a/lookyloo/modules/vt.py +++ b/lookyloo/modules/vt.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import json -import logging import time from datetime import date from typing import Any, Dict, Optional, TYPE_CHECKING @@ -9,35 +8,30 @@ from typing import Any, Dict, Optional, TYPE_CHECKING import vt # type: ignore from vt.error import APIError # type: ignore -from ..default import ConfigError, get_homedir, get_config +from ..default import ConfigError, get_homedir from ..helpers import get_cache_directory if TYPE_CHECKING: from ..capturecache import CaptureCache +from .abstractmodule import AbstractModule -class VirusTotal(): - def __init__(self, config: Dict[str, Any]): - self.logger = logging.getLogger(f'{self.__class__.__name__}') - self.logger.setLevel(get_config('generic', 'loglevel')) - if not config.get('apikey'): - self.available = False - return +class VirusTotal(AbstractModule): - self.available = True - self.autosubmit = False - self.allow_auto_trigger = False - self.client = vt.Client(config['apikey']) + def module_init(self) -> bool: + if not self.config.get('apikey'): + self.logger.info('Not enabled') + return False - if config.get('allow_auto_trigger'): - self.allow_auto_trigger = True + self.client = vt.Client(self.config['apikey']) - if config.get('autosubmit'): - self.autosubmit = True + self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) + self.autosubmit = bool(self.config.get('autosubmit', False)) self.storage_dir_vt = get_homedir() / 'vt_url' self.storage_dir_vt.mkdir(parents=True, exist_ok=True) + return True def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: url_storage_dir = get_cache_directory(self.storage_dir_vt, vt.url_id(url))