chg: cleanup with annotations

pull/862/head
Raphaël Vinot 2024-01-13 01:24:32 +01:00
parent a26e80b093
commit bd6a0f2d22
32 changed files with 89 additions and 81 deletions

View File

@ -12,7 +12,6 @@ import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Set
from redis import Redis
import s3fs # type: ignore

View File

@ -9,12 +9,11 @@ import logging.config
import signal
from pathlib import Path
from typing import Optional, Set, Union
from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy # type: ignore[attr-defined]
from lookyloo.lookyloo import Lookyloo, CaptureSettings
from lookyloo import Lookyloo, CaptureSettings
from lookyloo.default import AbstractManager, get_config
from lookyloo.helpers import get_captures_dir

View File

@ -9,11 +9,10 @@ import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from lookyloo import Lookyloo
from lookyloo.default import AbstractManager, get_config
from lookyloo.exceptions import MissingUUID, NoValidHarFile
from lookyloo.lookyloo import Lookyloo
from lookyloo.helpers import is_locked, get_sorted_captures_from_disk, make_dirs_list

View File

@ -8,11 +8,13 @@ import logging
import logging.config
from collections import Counter
from datetime import date, timedelta
from typing import Any, Dict, Optional
from typing import Any
from lookyloo.lookyloo import Lookyloo, CaptureStatusCore, CaptureStatusPy # type: ignore[attr-defined]
from lacuscore import CaptureStatus as CaptureStatusCore
from lookyloo import Lookyloo
from lookyloo.default import AbstractManager, get_config, get_homedir, safe_create_dir
from lookyloo.helpers import ParsedUserAgent, serialize_to_json
from pylacus import CaptureStatus as CaptureStatusPy # type: ignore[attr-defined]
logging.config.dictConfig(get_config('logging'))

View File

@ -7,7 +7,6 @@ import os
import time
from pathlib import Path
from subprocess import Popen
from typing import Optional, Dict
from redis import Redis
from redis.exceptions import ConnectionError

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3
from __future__ import annotations
import logging
import logging.config
from subprocess import Popen
from typing import Optional
from lookyloo.default import get_config, get_homedir, AbstractManager
@ -13,7 +14,7 @@ logging.config.dictConfig(get_config('logging'))
class Website(AbstractManager):
def __init__(self, loglevel: Optional[int]=None) -> None:
def __init__(self, loglevel: int | None=None) -> None:
super().__init__(loglevel)
self.script_name = 'website'
self.process: Popen = self._launch_website() # type: ignore[type-arg]

View File

@ -1,8 +1,12 @@
import logging
from .lookyloo import Lookyloo # noqa
from .context import Context # noqa
from .indexing import Indexing # noqa
from .lookyloo import Lookyloo, CaptureSettings # noqa
logging.getLogger(__name__).addHandler(logging.NullHandler())
__all__ = ['Lookyloo', 'Indexing']
__all__ = ['Lookyloo',
'Indexing',
'Context',
'CaptureSettings']

View File

@ -18,7 +18,7 @@ from datetime import datetime
from functools import lru_cache, _CacheInfo as CacheInfo
from logging import Logger, LoggerAdapter
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, Set, MutableMapping, Iterator
from typing import Any, MutableMapping, Iterator
import dns.rdatatype
import dns.resolver

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import fnmatch
import logging
from typing import Dict, Any, Union, List, Optional, TypedDict, Tuple
from typing import Any, TypedDict
from har2tree import URLNode # type: ignore[attr-defined]
@ -117,10 +117,9 @@ class Comparator():
raise MissingUUID(f'{capture_right} does not exists.')
different: bool = False
to_return: dict[str, dict[str, (str |
list[str | dict[str, Any]] |
dict[str, (int | str |
list[int | str | dict[str, Any]])])]] = {}
to_return: dict[str, dict[str,
(str | list[str | dict[str, Any]]
| dict[str, (int | str | list[int | str | dict[str, Any]])])]] = {}
to_return['lookyloo_urls'] = {'left': f'https://{self.public_domain}/tree/{capture_left}',
'right': f'https://{self.public_domain}/tree/{capture_right}'}
left = self.get_comparables_capture(capture_left)

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any
from urllib.parse import urlsplit
from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined]

View File

@ -7,7 +7,7 @@ import logging
import os
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any
from . import env_global_name
from .exceptions import ConfigError, CreateDirectoryException, MissingEnv

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3
from __future__ import annotations
import hashlib
import json
import logging
@ -10,7 +13,7 @@ from functools import lru_cache
from importlib.metadata import version
from io import BufferedIOBase
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union, Tuple
from typing import Any
from urllib.parse import urlparse
@ -29,12 +32,12 @@ logger = logging.getLogger('Lookyloo - Helpers')
# This method is used in json.dump or json.dumps calls as the default parameter:
# json.dumps(..., default=serialize_to_json)
def serialize_to_json(obj: Union[Set[Any]]) -> Union[List[Any]]:
def serialize_to_json(obj: set[Any]) -> list[Any]:
if isinstance(obj, set):
return sorted(obj)
def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
def get_resources_hashes(har2tree_container: CrawledTree | HostNode | URLNode) -> set[str]:
if isinstance(har2tree_container, CrawledTree):
urlnodes = har2tree_container.root_hartree.url_tree.traverse()
elif isinstance(har2tree_container, HostNode):
@ -43,7 +46,7 @@ def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNod
urlnodes = [har2tree_container]
else:
raise LookylooException(f'har2tree_container cannot be {type(har2tree_container)}')
all_ressources_hashes: Set[str] = set()
all_ressources_hashes: set[str] = set()
for urlnode in urlnodes:
if hasattr(urlnode, 'resources_hashes'):
all_ressources_hashes.update(urlnode.resources_hashes)
@ -75,7 +78,7 @@ def get_email_template() -> str:
return f.read()
def make_dirs_list(root_dir: Path) -> List[Path]:
def make_dirs_list(root_dir: Path) -> list[Path]:
directories = []
year_now = date.today().year
oldest_year = year_now - 10
@ -99,14 +102,14 @@ def make_ts_from_dirname(dirname: str) -> datetime:
def get_sorted_captures_from_disk(captures_dir: Path, /, *,
cut_time: Optional[Union[datetime, date]]=None,
keep_more_recent: bool=True) -> List[Tuple[datetime, Path]]:
cut_time: datetime | date | None=None,
keep_more_recent: bool=True) -> list[tuple[datetime, Path]]:
'''Recursively gets all the captures present in a specific directory, doesn't use the indexes.
NOTE: this method should never be used on archived captures as it's going to take forever on S3
'''
all_paths: List[Tuple[datetime, Path]] = []
all_paths: list[tuple[datetime, Path]] = []
for entry in captures_dir.iterdir():
if not entry.is_dir():
# index file
@ -173,14 +176,14 @@ class UserAgents:
self.most_recent_uas[platform_key][browser_key].insert(0, parsed_ua.string)
@property
def user_agents(self) -> Dict[str, Dict[str, List[str]]]:
def user_agents(self) -> dict[str, dict[str, list[str]]]:
ua_files_path = sorted(self.path.glob('**/*.json'), reverse=True)
if ua_files_path[0] != self.most_recent_ua_path:
self._load_newest_ua_file(ua_files_path[0])
return self.most_recent_uas
@property
def default(self) -> Dict[str, str]:
def default(self) -> dict[str, str]:
'''The default useragent for desktop chrome from playwright'''
parsed_ua = ParsedUserAgent(self.playwright_devices['desktop']['default']['Desktop Chrome']['user_agent'])
platform_key = parsed_ua.platform
@ -196,16 +199,16 @@ class UserAgents:
'useragent': parsed_ua.string}
def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, Any]]:
to_return: Dict[str, Dict[str, Any]] = {}
def load_known_content(directory: str='known_content') -> dict[str, dict[str, Any]]:
to_return: dict[str, dict[str, Any]] = {}
for known_content_file in (get_homedir() / directory).glob('*.json'):
with known_content_file.open() as f:
to_return[known_content_file.stem] = json.load(f)
return to_return
def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, List[Dict[str, Union[str, bool]]]]]=None) -> List[Dict[str, Union[str, bool]]]:
cookies: List[Dict[str, Union[str, bool]]]
def load_cookies(cookie_pseudofile: BufferedIOBase | str | bytes | list[dict[str, str | bool]] | None=None) -> list[dict[str, str | bool]]:
cookies: list[dict[str, str | bool]]
if cookie_pseudofile:
if isinstance(cookie_pseudofile, (str, bytes)):
try:
@ -229,10 +232,10 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, L
with (get_homedir() / 'cookies.json').open() as f:
cookies = json.load(f)
to_return: List[Dict[str, Union[str, bool]]] = []
to_return: list[dict[str, str | bool]] = []
try:
for cookie in cookies:
to_add: Dict[str, Union[str, bool]]
to_add: dict[str, str | bool]
if 'Host raw' in cookie and isinstance(cookie['Host raw'], str):
# Cookie export format for Cookie Quick Manager
u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]
@ -253,7 +256,7 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, L
return to_return
def uniq_domains(uniq_urls: List[str]) -> Set[str]:
def uniq_domains(uniq_urls: list[str]) -> set[str]:
domains = set()
for url in uniq_urls:
splitted = urlparse(url)
@ -267,7 +270,7 @@ def get_useragent_for_requests() -> str:
return f'Lookyloo / {version("lookyloo")}'
def get_cache_directory(root: Path, identifier: str, namespace: Optional[Union[str, Path]] = None) -> Path:
def get_cache_directory(root: Path, identifier: str, namespace: str | Path | None = None) -> Path:
m = hashlib.md5()
m.update(identifier.encode())
digest = m.hexdigest()
@ -331,26 +334,26 @@ class ParsedUserAgent(UserAgent):
# from https://python.tutorialink.com/how-do-i-get-the-user-agent-with-flask/
@cached_property
def _details(self) -> Dict[str, Any]:
def _details(self) -> dict[str, Any]:
return user_agent_parser.Parse(self.string)
@property
def platform(self) -> Optional[str]: # type: ignore[override]
def platform(self) -> str | None: # type: ignore[override]
return self._details['os'].get('family')
@property
def platform_version(self) -> Optional[str]:
def platform_version(self) -> str | None:
return self._aggregate_version(self._details['os'])
@property
def browser(self) -> Optional[str]: # type: ignore[override]
def browser(self) -> str | None: # type: ignore[override]
return self._details['user_agent'].get('family')
@property
def version(self) -> Optional[str]: # type: ignore[override]
def version(self) -> str | None: # type: ignore[override]
return self._aggregate_version(self._details['user_agent'])
def _aggregate_version(self, details: Dict[str, str]) -> Optional[str]:
def _aggregate_version(self, details: dict[str, str]) -> str | None:
return '.'.join(
part
for key in ('major', 'minor', 'patch', 'patch_minor')

View File

@ -6,7 +6,7 @@ import hashlib
import logging
# import re
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple
from typing import Iterable
from urllib.parse import urlsplit
from har2tree import CrawledTree # type: ignore[attr-defined]

View File

@ -18,12 +18,12 @@ from email.message import EmailMessage
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, TYPE_CHECKING, overload, Literal
from typing import Any, Iterable, TYPE_CHECKING, overload, Literal
from urllib.parse import urlparse
from uuid import uuid4
from zipfile import ZipFile
from defang import defang # type: ignore
from defang import defang # type: ignore[import-untyped]
from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined]
from lacuscore import (LacusCore,
CaptureStatus as CaptureStatusCore,
@ -997,8 +997,8 @@ class Lookyloo():
for capture in captures[:limit]:
ct = self.get_crawled_tree(capture.uuid)
to_append: dict[str, str | dict[str, Any]] = {'capture_uuid': capture.uuid,
'start_timestamp': capture.timestamp.isoformat(),
'title': capture.title}
'start_timestamp': capture.timestamp.isoformat(),
'title': capture.title}
urlnodes: dict[str, dict[str, str]] = {}
for urlnode in ct.root_hartree.url_tree.search_nodes(name=url):
urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),

View File

@ -1,9 +1,11 @@
#!/usr/bin/env python3
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from typing import Any
from ..default import get_config
@ -13,11 +15,11 @@ logging.config.dictConfig(get_config('logging'))
class AbstractModule(ABC):
'''Just a simple abstract for the modules to catch issues with initialization'''
def __init__(self, /, *, config_name: Optional[str]=None,
config: Optional[Dict[str, Any]]=None):
def __init__(self, /, *, config_name: str | None=None,
config: dict[str, Any] | None=None):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
self.config: Dict[str, Any] = {}
self.config: dict[str, Any] = {}
self._available = False
if config_name:
try:

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import json
from datetime import date
from typing import Dict, List, Optional, TYPE_CHECKING
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from pypdns import PyPDNS, PDNSRecord # type: ignore[attr-defined]

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python3
from __future__ import annotations
import ipaddress
from typing import Dict, Set
import requests
@ -35,12 +36,12 @@ class Cloudflare(AbstractModule):
self.v6_list = [ipaddress.ip_network(net) for net in ipv6_list.split('\n')]
return True
def ips_lookup(self, ips: Set[str]) -> Dict[str, bool]:
def ips_lookup(self, ips: set[str]) -> dict[str, bool]:
'''Lookup a list of IPs. True means it is a known Cloudflare IP'''
if not self.available:
raise ConfigError('Hashlookup not available, probably not enabled.')
to_return: Dict[str, bool] = {}
to_return: dict[str, bool] = {}
for ip_s, ip_p in [(ip, ipaddress.ip_address(ip)) for ip in ips]:
if ip_p.version == 4:
to_return[ip_s] = any(ip_p in net for net in self.v4_list)

View File

@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Dict, Any
from typing import Any
import requests

View File

@ -3,7 +3,6 @@
from __future__ import annotations
import json
from typing import Dict, List
from har2tree import CrawledTree # type: ignore[attr-defined]
from pyhashlookup import Hashlookup # type: ignore[attr-defined]

View File

@ -7,7 +7,7 @@ import re
from io import BytesIO
from collections import defaultdict
from collections.abc import Mapping
from typing import Any, Dict, List, Optional, Set, Union, TYPE_CHECKING, Iterator
from typing import Any, TYPE_CHECKING, Iterator
import requests
from har2tree import HostNode, URLNode, Har2TreeError # type: ignore[attr-defined]
@ -209,7 +209,7 @@ class MISP(AbstractModule):
events = self._prepare_push(to_push, allow_duplicates, auto_publish)
if not events:
return {'error': 'All the events are already on the MISP instance.'}
if isinstance(events, Dict):
if isinstance(events, dict):
return {'error': events}
to_return = []
for event in events:

View File

@ -3,7 +3,7 @@
from __future__ import annotations
from io import BytesIO
from typing import Dict, Any
from typing import Any
from pypandora import PyPandora # type: ignore[attr-defined]

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import json
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, Optional, List, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
from pyphishtanklookup import PhishtankLookup # type: ignore[attr-defined]

View File

@ -6,7 +6,7 @@ import json
import time
from datetime import date
from typing import Any, Dict, Optional, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
from pyeupi import PyEUPI # type: ignore[attr-defined]

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import json
from datetime import date, datetime, timedelta
from typing import Any, Dict, Optional, Union, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
from urllib.parse import urlparse
from passivetotal import AccountClient, DnsRequest, WhoisRequest # type: ignore

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import json
from datetime import date
from typing import Dict, Iterable, List, Union
from typing import Iterable
from pysanejs import SaneJS # type: ignore[attr-defined]

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import json
from datetime import date
from typing import Any, Dict, Optional, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
import requests

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import json
from datetime import date
from typing import Any, Dict, Optional, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
import requests

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import re
import socket
from typing import overload, Literal, List, Union
from typing import overload, Literal
from har2tree import CrawledTree, Har2TreeError, HostNode # type: ignore[attr-defined]

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import json
import time
from datetime import date
from typing import Any, Dict, Optional, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
import vt # type: ignore
from vt.error import APIError # type: ignore

View File

@ -18,7 +18,7 @@ import filetype # type: ignore
from datetime import date, datetime, timedelta, timezone
from importlib.metadata import version
from io import BytesIO, StringIO
from typing import Any, Dict, List, Optional, Union, TypedDict, Set, Tuple
from typing import Any, TypedDict
from urllib.parse import quote_plus, unquote_plus, urlparse
from uuid import uuid4
from zipfile import ZipFile
@ -34,10 +34,10 @@ from pymisp import MISPEvent, MISPServerError # type: ignore[attr-defined]
from werkzeug.security import check_password_hash
from werkzeug.wrappers.response import Response as WerkzeugResponse
from lookyloo import Lookyloo, CaptureSettings
from lookyloo.default import get_config
from lookyloo.exceptions import MissingUUID, NoValidHarFile
from lookyloo.helpers import get_taxonomies, UserAgents, load_cookies
from lookyloo.lookyloo import Lookyloo, CaptureSettings
if sys.version_info < (3, 9):
from pytz import all_timezones_set

View File

@ -7,7 +7,7 @@ import hashlib
import json
from io import BytesIO
from typing import Any, Dict, Optional, Tuple, List
from typing import Any
from zipfile import ZipFile
import flask_login # type: ignore
@ -17,9 +17,9 @@ from werkzeug.security import check_password_hash
from lacuscore import CaptureStatus as CaptureStatusCore
from pylacus import CaptureStatus as CaptureStatusPy # type: ignore[attr-defined]
from lookyloo import CaptureSettings, Lookyloo
from lookyloo.comparator import Comparator
from lookyloo.exceptions import MissingUUID, NoValidHarFile
from lookyloo.lookyloo import CaptureSettings, Lookyloo
from .helpers import build_users_table, load_user_from_request, src_request_ip, get_lookyloo_instance

View File

@ -1,18 +1,19 @@
#!/usr/bin/env python3
from __future__ import annotations
import hashlib
import json
import os
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Union
import flask_login # type: ignore
from flask import Request
from werkzeug.security import generate_password_hash
from lookyloo import Lookyloo
from lookyloo.default import get_config, get_homedir
from lookyloo.lookyloo import Lookyloo
__global_lookyloo_instance = None
@ -50,7 +51,7 @@ def load_user_from_request(request: Request) -> User | None:
@lru_cache(64)
def build_keys_table() -> Dict[str, str]:
def build_keys_table() -> dict[str, str]:
keys_table = {}
for username, authstuff in build_users_table().items():
if 'authkey' in authstuff:
@ -59,7 +60,7 @@ def build_keys_table() -> Dict[str, str]:
@lru_cache(64)
def get_users() -> Dict[str, Union[str, List[str]]]:
def get_users() -> dict[str, str | list[str]]:
try:
# Use legacy user mgmt, no need to print a warning, and it will fail on new install.
return get_config('generic', 'cache_clean_user', quiet=True)
@ -68,8 +69,8 @@ def get_users() -> Dict[str, Union[str, List[str]]]:
@lru_cache(64)
def build_users_table() -> Dict[str, Dict[str, str]]:
users_table: Dict[str, Dict[str, str]] = {}
def build_users_table() -> dict[str, dict[str, str]]:
users_table: dict[str, dict[str, str]] = {}
for username, authstuff in get_users().items():
if isinstance(authstuff, str):
# just a password, make a key
@ -101,6 +102,6 @@ def get_secret_key() -> bytes:
@lru_cache(64)
def sri_load() -> Dict[str, Dict[str, str]]:
def sri_load() -> dict[str, dict[str, str]]:
with (get_homedir() / 'website' / 'web' / 'sri.txt').open() as f:
return json.load(f)