fix: Avoid exception killing website if non-responsive 3rd party module.

pull/802/head
Raphaël Vinot 2023-10-11 14:57:36 +02:00
parent 5db3401079
commit b4599492f3
17 changed files with 254 additions and 295 deletions

View File

@ -33,7 +33,7 @@ class AsyncCapture(AbstractManager):
if isinstance(self.lookyloo.lacus, LacusCore):
self.captures: Set[asyncio.Task] = set()
self.fox = FOX(get_config('modules', 'FOX'))
self.fox = FOX(config_name='FOX')
if not self.fox.available:
self.logger.warning('Unable to setup the FOX module')

View File

@ -21,9 +21,7 @@ class Context():
self.logger.setLevel(get_config('generic', 'loglevel'))
self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True)
self._cache_known_content()
self.sanejs = SaneJavaScript(get_config('modules', 'SaneJS'))
if not self.sanejs.available:
self.logger.warning('Unable to setup the SaneJS module')
self.sanejs = SaneJavaScript(config_name='SaneJS')
def clear_context(self):
self.redis.flushdb()

View File

@ -88,7 +88,6 @@ class Lookyloo():
self.global_proxy.pop('enable')
self.securitytxt = PySecurityTXT(useragent=get_useragent_for_requests())
self.taxonomies = get_taxonomies()
self.redis_pool: ConnectionPool = ConnectionPool(connection_class=UnixDomainSocketConnection,
@ -98,29 +97,20 @@ class Lookyloo():
self._priority = get_config('generic', 'priority')
# Initialize 3rd party components
self.pi = PhishingInitiative(get_config('modules', 'PhishingInitiative'))
if not self.pi.available:
self.logger.warning('Unable to setup the PhishingInitiative module')
self.vt = VirusTotal(get_config('modules', 'VirusTotal'))
if not self.vt.available:
self.logger.warning('Unable to setup the VirusTotal module')
# ## Initialize MISP(s)
try_old_config = False
if misps_config := get_config('modules', 'MultipleMISPs'):
# New config
self.misps = MISPs(misps_config)
if not self.misps.available:
self.logger.warning('Unable to setup the MISP module')
try_old_config = True
# New config
self.misps = MISPs(config_name='MultipleMISPs')
if not self.misps.available:
self.logger.warning('Unable to setup the MISPs module')
try_old_config = True
if try_old_config:
# Legacy MISP config, now use MultipleMISPs key to support more than one MISP instance
try:
if misp_config := get_config('modules', 'MISP'):
misps_config = {'default': 'MISP', 'instances': {'MISP': misp_config}}
self.misps = MISPs(misps_config)
self.misps = MISPs(config=misps_config)
if self.misps.available:
self.logger.warning('Please migrate the MISP config to the "MultipleMISPs" key in the config, and remove the "MISP" key')
else:
@ -129,38 +119,16 @@ class Lookyloo():
# The key was removed from the config, and the sample config
pass
if not self.misps.available:
self.logger.info('The MISP module is not configured')
# ## Done with MISP(s)
self.uwhois = UniversalWhois(get_config('modules', 'UniversalWhois'))
if not self.uwhois.available:
self.logger.warning('Unable to setup the UniversalWhois module')
self.urlscan = UrlScan(get_config('modules', 'UrlScan'))
if not self.urlscan.available:
self.logger.warning('Unable to setup the UrlScan module')
self.phishtank = Phishtank(get_config('modules', 'Phishtank'))
if not self.phishtank.available:
self.logger.warning('Unable to setup the Phishtank module')
self.hashlookup = Hashlookup(get_config('modules', 'Hashlookup'))
if not self.hashlookup.available:
self.logger.warning('Unable to setup the Hashlookup module')
self.riskiq = RiskIQ(get_config('modules', 'RiskIQ'))
if not self.riskiq.available:
self.logger.warning('Unable to setup the RiskIQ module')
self.pandora = Pandora(get_config('modules', 'Pandora'))
if not self.pandora.available:
self.logger.warning('Unable to setup the Pandora module')
self.urlhaus = URLhaus(get_config('modules', 'URLhaus'))
if not self.urlhaus.available:
self.logger.warning('Unable to setup the URLhaus module')
self.pi = PhishingInitiative(config_name='PhishingInitiative')
self.vt = VirusTotal(config_name='VirusTotal')
self.uwhois = UniversalWhois(config_name='UniversalWhois')
self.urlscan = UrlScan(config_name='UrlScan')
self.phishtank = Phishtank(config_name='Phishtank')
self.hashlookup = Hashlookup(config_name='Hashlookup')
self.riskiq = RiskIQ(config_name='RiskIQ')
self.pandora = Pandora(config_name='Pandora')
self.urlhaus = URLhaus(config_name='URLhaus')
self.monitoring_enabled = False
if monitoring_config := get_config('generic', 'monitoring'):

View File

@ -0,0 +1,42 @@
#!/usr/bin/env python3
import logging
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from ..default import get_config
logging.config.dictConfig(get_config('logging'))
class AbstractModule(ABC):
'''Just a simple abstract for the modules to catch issues with initialization'''
def __init__(self, /, *, config_name: Optional[str]=None,
config: Optional[Dict[str, Any]]=None):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
self.config: Dict[str, Any] = {}
self._available = False
if config_name:
try:
self.config = get_config('modules', config_name)
except Exception as e:
self.logger.warning(f'Unable to get config for {config_name}: {e}')
return
elif config:
self.config = config
try:
self._available = self.module_init()
except Exception as e:
self.logger.warning(f'Unable to initialize module: {e}.')
@property
def available(self) -> bool:
return self._available
@abstractmethod
def module_init(self) -> bool:
...

View File

@ -1,43 +1,39 @@
#!/usr/bin/env python3
import ipaddress
import logging
from typing import Dict, Set
import requests
from ..default import ConfigError, get_config
from ..default import ConfigError
from .abstractmodule import AbstractModule
class Cloudflare():
class Cloudflare(AbstractModule):
'''This module checks if an IP is announced by Cloudflare.'''
def __init__(self):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
def module_init(self) -> bool:
# Get IPv4
r = requests.get('https://www.cloudflare.com/ips-v4')
try:
r = requests.get('https://www.cloudflare.com/ips-v4')
r.raise_for_status()
ipv4_list = r.text
except Exception as e:
self.logger.warning(f'Unable to get Cloudflare IPv4 list: {e}')
self.available = False
return
return False
# Get IPv6
try:
r = requests.get('https://www.cloudflare.com/ips-v6')
r.raise_for_status()
ipv6_list = r.text
except Exception as e:
self.logger.warning(f'Unable to get Cloudflare IPv6 list: {e}')
self.available = False
return
self.available = True
return False
self.v4_list = [ipaddress.ip_network(net) for net in ipv4_list.split('\n')]
self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')]
return True
def ips_lookup(self, ips: Set[str]) -> Dict[str, bool]:
'''Lookup a list of IPs. True means it is a known Cloudflare IP'''

View File

@ -1,36 +1,33 @@
#!/usr/bin/env python3
import logging
from typing import Any, Dict
from typing import Dict
import requests
from ..default import ConfigError, get_config
from ..default import ConfigError
from ..helpers import get_useragent_for_requests
from .abstractmodule import AbstractModule
class FOX():
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('apikey'):
self.available = False
return
class FOX(AbstractModule):
def module_init(self) -> bool:
if not self.config.get('apikey'):
self.logger.info('No API key.')
return False
self.available = True
self.autosubmit = False
self.allow_auto_trigger = False
self.client = requests.session()
self.client.headers['User-Agent'] = get_useragent_for_requests()
self.client.headers['X-API-KEY'] = config['apikey']
self.client.headers['X-API-KEY'] = self.config['apikey']
self.client.headers['Content-Type'] = 'application/json'
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.autosubmit = bool(self.config.get('autosubmit', False))
if config.get('autosubmit'):
self.autosubmit = True
return True
def capture_default_trigger(self, url: str, /, auto_trigger: bool=False) -> Dict:
'''Run the module on the initial URL'''

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
import json
from typing import Any, Dict, List
from typing import Dict, List
from har2tree import CrawledTree
from pyhashlookup import Hashlookup
@ -9,25 +9,27 @@ from pyhashlookup import Hashlookup
from ..default import ConfigError
from ..helpers import get_useragent_for_requests
from .abstractmodule import AbstractModule
class HashlookupModule():
class HashlookupModule(AbstractModule):
'''This module is a bit different as it will trigger a lookup of all the hashes
and store the response in the capture directory'''
def __init__(self, config: Dict[str, Any]):
if not config.get('enabled'):
self.available = False
return
def module_init(self) -> bool:
if not self.config.get('enabled'):
self.logger.info('Not enabled.')
return False
self.available = True
self.allow_auto_trigger = False
if config.get('url'):
self.client = Hashlookup(config['url'], useragent=get_useragent_for_requests())
if self.config.get('url'):
self.client = Hashlookup(self.config['url'], useragent=get_useragent_for_requests())
else:
self.client = Hashlookup(useragent=get_useragent_for_requests())
# Makes sure the webservice is reachable, raises an exception otherwise.
self.client.info()
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
return True
def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict:
'''Run the module on all the nodes up to the final redirect'''

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
import logging
import re
from io import BytesIO
@ -16,35 +15,31 @@ from pymisp.tools import FileObject, URLObject
from ..default import get_config, get_homedir
from ..helpers import get_public_suffix_list
from .abstractmodule import AbstractModule
if TYPE_CHECKING:
from ..capturecache import CaptureCache
class MISPs(Mapping):
class MISPs(Mapping, AbstractModule):
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('default'):
self.available = False
def module_init(self) -> bool:
if not self.config.get('default'):
self.logger.info('No default instance configured, disabling MISP.')
return
if not config.get('instances'):
self.available = False
return False
if not self.config.get('instances'):
self.logger.warning('No MISP instances configured, disabling MISP.')
return
return False
self.default_instance = config['default']
self.default_instance = self.config['default']
if self.default_instance not in config['instances']:
self.available = False
self.logger.warning(f"The default MISP instance ({self.default_instance}) is missing in the instances ({', '.join(config['instances'].keys())}), disabling MISP.")
return
if self.default_instance not in self.config['instances']:
self.logger.warning(f"The default MISP instance ({self.default_instance}) is missing in the instances ({', '.join(self.config['instances'].keys())}), disabling MISP.")
return False
self.__misps: Dict[str, 'MISP'] = {}
for instance_name, instance_config in config['instances'].items():
if misp_connector := MISP(instance_config):
for instance_name, instance_config in self.config['instances'].items():
if misp_connector := MISP(config=instance_config):
if misp_connector.available:
self.__misps[instance_name] = misp_connector
else:
@ -53,11 +48,10 @@ class MISPs(Mapping):
self.logger.warning(f"Unable to initialize the connector to '{instance_name}'. It won't be available.")
if not self.__misps.get(self.default_instance) or not self.__misps[self.default_instance].available:
self.available = False
self.logger.warning("Unable to initialize the connector to the default MISP instance, disabling MISP.")
return
return False
self.available = True
return True
def __getitem__(self, name: str) -> 'MISP':
return self.__misps[name]
@ -151,39 +145,30 @@ class MISPs(Mapping):
obj.add_attributes('ip', *hostnodes[0].resolved_ips)
class MISP():
class MISP(AbstractModule):
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('apikey'):
self.available = False
self.logger.info('Module not enabled.')
return
def module_init(self) -> bool:
if not self.config.get('apikey'):
self.logger.info('No API key: {self.config}.')
return False
self.available = True
self.enable_lookup = False
self.enable_push = False
self.allow_auto_trigger = False
try:
self.client = PyMISP(url=config['url'], key=config['apikey'],
ssl=config['verify_tls_cert'], timeout=config['timeout'])
self.client = PyMISP(url=self.config['url'], key=self.config['apikey'],
ssl=self.config['verify_tls_cert'], timeout=self.config['timeout'])
except Exception as e:
self.available = False
self.logger.warning(f'Unable to connect to MISP: {e}')
return
return False
if config.get('enable_lookup'):
self.enable_lookup = True
if config.get('enable_push'):
self.enable_push = True
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.default_tags: List[str] = config.get('default_tags') # type: ignore
self.auto_publish = config.get('auto_publish')
self.enable_lookup = bool(self.config.get('enable_lookup', False))
self.enable_push = bool(self.config.get('enable_push', False))
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.default_tags: List[str] = self.config.get('default_tags') # type: ignore
self.auto_publish = bool(self.config.get('auto_publish', False))
self.storage_dir_misp = get_homedir() / 'misp'
self.storage_dir_misp.mkdir(parents=True, exist_ok=True)
self.psl = get_public_suffix_list()
return True
def get_fav_tags(self):
return self.client.tags(pythonify=True, favouritesOnly=1)

View File

@ -1,35 +1,31 @@
#!/usr/bin/env python3
import logging
from io import BytesIO
from typing import Any, Dict
from typing import Dict
from pypandora import PyPandora
from ..default import ConfigError, get_config
from ..default import ConfigError
from ..helpers import get_useragent_for_requests
from .abstractmodule import AbstractModule
class Pandora():
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('url'):
self.available = False
return
class Pandora(AbstractModule):
self.client = PyPandora(root_url=config['url'], useragent=get_useragent_for_requests())
def module_init(self) -> bool:
if not self.config.get('url'):
self.logger.info('No URL in config.')
return False
self.client = PyPandora(root_url=self.config['url'], useragent=get_useragent_for_requests())
if not self.client.is_up:
self.available = False
return
self.logger.warning('Not up.')
return False
self.available = True
self.allow_auto_trigger = False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
return True
def capture_default_trigger(self, file_in_memory: BytesIO, filename: str, /, auto_trigger: bool=False) -> Dict:
'''Automatically submit the file if the landing URL is a file instead of a webpage'''

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import json
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, Optional, List, TYPE_CHECKING
@ -12,26 +13,30 @@ from ..helpers import get_cache_directory
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
class Phishtank():
def __init__(self, config: Dict[str, Any]):
if not config.get('enabled'):
self.available = False
return
class Phishtank(AbstractModule):
self.available = True
self.allow_auto_trigger = False
if config.get('url'):
self.client = PhishtankLookup(config['url'])
def module_init(self) -> bool:
if not self.config.get('enabled'):
self.logger.info('Not enabled.')
return False
if self.config.get('url'):
self.client = PhishtankLookup(self.config['url'])
else:
self.client = PhishtankLookup()
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
if not self.client.is_up:
self.logger.warning('Not up.')
return False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir_pt = get_homedir() / 'phishtank'
self.storage_dir_pt.mkdir(parents=True, exist_ok=True)
return True
def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
url_storage_dir = get_cache_directory(self.storage_dir_pt, url, 'url')

View File

@ -14,27 +14,25 @@ from ..helpers import get_cache_directory
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
class PhishingInitiative():
def __init__(self, config: Dict[str, Any]):
if not config.get('apikey'):
self.available = False
return
class PhishingInitiative(AbstractModule):
def module_init(self) -> bool:
if not self.config.get('apikey'):
self.logger.info('No API key')
return False
self.available = True
self.autosubmit = False
self.allow_auto_trigger = False
self.client = PyEUPI(config['apikey'])
self.client = PyEUPI(self.config['apikey'])
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
if config.get('autosubmit'):
self.autosubmit = True
self.autosubmit = self.config.get('autosubmit', False)
self.allow_auto_trigger = self.config.get('allow_auto_trigger', False)
self.storage_dir_eupi = get_homedir() / 'eupi'
self.storage_dir_eupi.mkdir(parents=True, exist_ok=True)
return True
def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
url_storage_dir = get_cache_directory(self.storage_dir_eupi, url)

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import json
import logging
from datetime import date, datetime, timedelta
from typing import Any, Dict, Optional, Union, TYPE_CHECKING
@ -10,13 +9,15 @@ from urllib.parse import urlparse
from passivetotal import AccountClient, DnsRequest, WhoisRequest # type: ignore
from requests import Response
from ..default import ConfigError, get_homedir, get_config
from ..default import ConfigError, get_homedir
from ..exceptions import ModuleError
from ..helpers import get_cache_directory
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
class RiskIQError(ModuleError):
@ -24,46 +25,36 @@ class RiskIQError(ModuleError):
self.response = response
class RiskIQ():
class RiskIQ(AbstractModule):
def __init__(self, config: Dict[str, Any]):
if not (config.get('user') and config.get('apikey')):
self.available = False
return
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
self.available = True
self.allow_auto_trigger = False
def module_init(self) -> bool:
if not (self.config.get('user') and self.config.get('apikey')):
self.logger.info('Missing credentials.')
return False
try:
# Check if account is working
test_client = AccountClient(username=config.get('user'), api_key=config.get('apikey'), exception_class=RiskIQError)
test_client = AccountClient(username=self.config.get('user'),
api_key=self.config.get('apikey'), exception_class=RiskIQError)
details = test_client.get_account_details()
self.client_dns = DnsRequest(username=self.config.get('user'),
api_key=self.config.get('apikey'), exception_class=RiskIQError)
self.client_whois = WhoisRequest(username=self.config.get('user'),
api_key=self.config.get('apikey'), exception_class=RiskIQError)
except RiskIQError as e:
self.available = False
if hasattr(e, 'response'):
details = e.response.json()
if 'message' in details:
self.logger.warning(f'RiskIQ not available, {details["message"]}')
self.logger.warning(f'RiskIQ not available: {e}')
return
except Exception as e:
self.available = False
self.logger.warning(f'RiskIQ not available: {e}')
return
details = e.response.json()
if 'message' in details:
self.logger.warning(f'RiskIQ not available: {details["message"]}')
else:
self.logger.warning(f'RiskIQ not available: {details}')
return False
self.client_dns = DnsRequest(username=config.get('user'), api_key=config.get('apikey'), exception_class=RiskIQError)
self.client_whois = WhoisRequest(username=config.get('user'), api_key=config.get('apikey'), exception_class=RiskIQError)
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.default_first_seen = config.get('default_first_seen_in_days', 5)
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.default_first_seen = self.config.get('default_first_seen_in_days', 5)
self.storage_dir_riskiq = get_homedir() / 'riskiq'
self.storage_dir_riskiq.mkdir(parents=True, exist_ok=True)
return True
def get_passivedns(self, query: str) -> Optional[Dict[str, Any]]:
# The query can be IP or Hostname. For now, we only do it on domains.

View File

@ -1,34 +1,33 @@
#!/usr/bin/env python3
import json
import logging
from datetime import date
from typing import Any, Dict, Iterable, List, Union
from typing import Dict, Iterable, List, Union
from pysanejs import SaneJS
from ..default import get_config, get_homedir
from ..default import get_homedir
from .abstractmodule import AbstractModule
class SaneJavaScript():
class SaneJavaScript(AbstractModule):
def module_init(self) -> bool:
if not self.config.get('enabled'):
self.logger.info('Not enabled.')
return False
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('enabled'):
self.available = False
self.logger.info('Module not enabled.')
return
self.client = SaneJS()
if not self.client.is_up:
self.available = False
return
self.available = True
self.allow_auto_trigger = False
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.logger.warning('Not up.')
return False
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir = get_homedir() / 'sanejs'
self.storage_dir.mkdir(parents=True, exist_ok=True)
return True
def hashes_lookup(self, sha512: Union[Iterable[str], str], force: bool=False) -> Dict[str, List[str]]:
if isinstance(sha512, str):

View File

@ -12,22 +12,22 @@ from ..helpers import get_cache_directory
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
class URLhaus():
def __init__(self, config: Dict[str, Any]):
if not config.get('enabled'):
self.available = False
return
class URLhaus(AbstractModule):
self.available = True
self.allow_auto_trigger = False
self.url = config.get('url')
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
def module_init(self) -> bool:
if not self.config.get('enabled'):
self.logger.info('Not enabled')
return False
self.url = self.config.get('url')
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.storage_dir_uh = get_homedir() / 'urlhaus'
self.storage_dir_uh.mkdir(parents=True, exist_ok=True)
return True
def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
url_storage_dir = get_cache_directory(self.storage_dir_uh, url, 'url')

View File

@ -1,48 +1,41 @@
#!/usr/bin/env python3
import json
import logging
from datetime import date
from typing import Any, Dict, Optional, TYPE_CHECKING
import requests
from ..default import ConfigError, get_config, get_homedir
from ..default import ConfigError, get_homedir
from ..helpers import get_useragent_for_requests, get_cache_directory
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
class UrlScan():
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('apikey'):
self.available = False
return
class UrlScan(AbstractModule):
def module_init(self) -> bool:
if not self.config.get('apikey'):
self.logger.info('No API key.')
return False
self.available = True
self.autosubmit = False
self.allow_auto_trigger = False
self.client = requests.session()
self.client.headers['User-Agent'] = get_useragent_for_requests()
self.client.headers['API-Key'] = config['apikey']
self.client.headers['API-Key'] = self.config['apikey']
self.client.headers['Content-Type'] = 'application/json'
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.autosubmit = bool(self.config.get('autosubmit', False))
if config.get('autosubmit'):
self.autosubmit = True
if config.get('force_visibility'):
if self.config.get('force_visibility'):
# Cases:
# 1. False: unlisted for hidden captures / public for others
# 2. "key": default visibility defined on urlscan.io
# 3. "public", "unlisted", "private": is set for all submissions
self.force_visibility = config['force_visibility']
self.force_visibility = self.config['force_visibility']
else:
self.force_visibility = False
@ -52,6 +45,7 @@ class UrlScan():
self.storage_dir_urlscan = get_homedir() / 'urlscan'
self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True)
return True
def get_url_submission(self, capture_info: 'CaptureCache') -> Dict[str, Any]:
url_storage_dir = get_cache_directory(

View File

@ -1,39 +1,33 @@
#!/usr/bin/env python3
import logging
import re
import socket
from typing import Any, Dict, overload, Literal, List, Union
from typing import overload, Literal, List, Union
from har2tree import CrawledTree, Har2TreeError, HostNode
from ..default import get_config
from .abstractmodule import AbstractModule
class UniversalWhois():
class UniversalWhois(AbstractModule):
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('enabled'):
self.available = False
self.logger.info('Module not enabled.')
return
self.server = config.get('ipaddress')
self.port = config.get('port')
self.allow_auto_trigger = False
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
def module_init(self) -> bool:
if not self.config.get('enabled'):
self.logger.info('Not enabled.')
return False
self.server = self.config.get('ipaddress')
self.port = self.config.get('port')
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((self.server, self.port))
except Exception as e:
self.available = False
self.logger.warning(f'Unable to connect to uwhois ({self.server}:{self.port}): {e}')
return
self.available = True
return False
return True
def query_whois_hostnode(self, hostnode: HostNode) -> None:
if hasattr(hostnode, 'resolved_ips'):

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import json
import logging
import time
from datetime import date
from typing import Any, Dict, Optional, TYPE_CHECKING
@ -9,35 +8,30 @@ from typing import Any, Dict, Optional, TYPE_CHECKING
import vt # type: ignore
from vt.error import APIError # type: ignore
from ..default import ConfigError, get_homedir, get_config
from ..default import ConfigError, get_homedir
from ..helpers import get_cache_directory
if TYPE_CHECKING:
from ..capturecache import CaptureCache
from .abstractmodule import AbstractModule
class VirusTotal():
def __init__(self, config: Dict[str, Any]):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
if not config.get('apikey'):
self.available = False
return
class VirusTotal(AbstractModule):
self.available = True
self.autosubmit = False
self.allow_auto_trigger = False
self.client = vt.Client(config['apikey'])
def module_init(self) -> bool:
if not self.config.get('apikey'):
self.logger.info('Not enabled')
return False
if config.get('allow_auto_trigger'):
self.allow_auto_trigger = True
self.client = vt.Client(self.config['apikey'])
if config.get('autosubmit'):
self.autosubmit = True
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
self.autosubmit = bool(self.config.get('autosubmit', False))
self.storage_dir_vt = get_homedir() / 'vt_url'
self.storage_dir_vt.mkdir(parents=True, exist_ok=True)
return True
def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
url_storage_dir = get_cache_directory(self.storage_dir_vt, vt.url_id(url))