From 1da0d5adc148e994d47c3964f857b014db3b630c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 1 Feb 2024 14:40:12 +0100 Subject: [PATCH] chg: Add more strict typing, not done yet. --- mypy.ini | 2 +- pymisp/__init__.py | 1 - pymisp/abstract.py | 8 +- pymisp/api.py | 2 +- pymisp/mispevent.py | 18 +- pymisp/tools/_psl_faup.py | 119 +++++---- pymisp/tools/emailobject.py | 8 +- pymisp/tools/ext_lookups.py | 8 +- pymisp/tools/fail2banobject.py | 9 +- pymisp/tools/feed.py | 3 +- pymisp/tools/fileobject.py | 10 +- pymisp/tools/genericgenerator.py | 2 +- pymisp/tools/geolocationobject.py | 9 +- pymisp/tools/git_vuln_finder_object.py | 9 +- pymisp/tools/load_warninglists.py | 17 +- pymisp/tools/neo4j.py | 11 +- pymisp/tools/sbsignatureobject.py | 4 +- pymisp/tools/sshauthkeyobject.py | 15 +- pymisp/tools/stix.py | 35 --- pymisp/tools/update_objects.py | 2 +- pymisp/tools/urlobject.py | 8 +- pymisp/tools/vehicleobject.py | 16 +- pymisp/tools/vtreportobject.py | 12 +- tests/test_emailobject.py | 28 +- tests/test_fileobject.py | 2 +- tests/test_mispevent.py | 134 +++++----- tests/testlive_comprehensive.py | 343 ++++++++++++++----------- 27 files changed, 442 insertions(+), 393 deletions(-) delete mode 100644 pymisp/tools/stix.py diff --git a/mypy.ini b/mypy.ini index f00223e..b4fb74d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -3,7 +3,7 @@ strict = True warn_return_any = False show_error_context = True pretty = True -exclude = feed-generator|examples|pymisp/tools|pymisp/data|tests|docs +exclude = tests/testlive_comprehensive.py|tests/testlive_sync.py|feed-generator|examples|pymisp/data|docs|pymisp/tools/openioc.py|pymisp/tools/reportlab_generator.py|tests/test_reportlab.py # Stuff to remove gradually # disallow_untyped_defs = False diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 9599bdc..a235ce9 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -42,7 +42,6 @@ try: MISPGalaxyClusterElement, MISPGalaxyClusterRelation) from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa - from .tools import stix # noqa from .tools import openioc # noqa from .tools import ext_lookups # noqa from .tools import update_objects # noqa diff --git a/pymisp/abstract.py b/pymisp/abstract.py index c4de2e9..8ca7cae 100644 --- a/pymisp/abstract.py +++ b/pymisp/abstract.py @@ -8,7 +8,7 @@ from deprecated import deprecated # type: ignore from json import JSONEncoder from uuid import UUID from abc import ABCMeta -from enum import Enum +from enum import Enum, IntEnum from typing import Any, Mapping from collections.abc import MutableMapping from functools import lru_cache @@ -46,7 +46,7 @@ class MISPFileCache: return data -class Distribution(Enum): +class Distribution(IntEnum): your_organisation_only = 0 this_community_only = 1 connected_communities = 2 @@ -55,14 +55,14 @@ class Distribution(Enum): inherit = 5 -class ThreatLevel(Enum): +class ThreatLevel(IntEnum): high = 1 medium = 2 low = 3 undefined = 4 -class Analysis(Enum): +class Analysis(IntEnum): initial = 0 ongoing = 1 completed = 2 diff --git a/pymisp/api.py b/pymisp/api.py index 583239c..d19a0ba 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1012,7 +1012,7 @@ class PyMISP: to_return.append(s) return to_return - def add_sighting(self, sighting: MISPSighting, + def add_sighting(self, sighting: MISPSighting | dict[str, Any], attribute: MISPAttribute | int | str | UUID | None = None, pythonify: bool = False) -> dict[str, Any] | MISPSighting: """Add a new sighting (globally, or to a specific attribute): https://www.misp-project.org/openapi/#tag/Sightings/operation/addSighting and https://www.misp-project.org/openapi/#tag/Sightings/operation/getSightingsByEventId diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index ac1361c..a283201 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -13,7 +13,7 @@ from collections import defaultdict import logging import hashlib from pathlib import Path -from typing import IO, Any +from typing import IO, Any, Sequence import warnings try: @@ -1010,15 +1010,17 @@ class MISPObject(AbstractMISP): self.edited = True return attribute - def add_attributes(self, object_relation: str, *attributes: list[dict[str, Any] | MISPAttribute]) -> list[MISPAttribute | None]: + def add_attributes(self, object_relation: str, *attributes: Sequence[str | dict[str, Any] | MISPAttribute]) -> list[MISPAttribute | None]: '''Add multiple attributes with the same object_relation. Helper for object_relation when multiple is True in the template. It is the same as calling multiple times add_attribute with the same object_relation. ''' to_return = [] for attribute in attributes: - if isinstance(attribute, dict): - a = self.add_attribute(object_relation, **attribute) + if isinstance(attribute, MISPAttribute): + a = self.add_attribute(object_relation, **attribute.to_dict()) + elif isinstance(attribute, dict): + a = self.add_attribute(object_relation, **attribute) # type: ignore[misc] else: a = self.add_attribute(object_relation, value=attribute) to_return.append(a) @@ -1256,6 +1258,10 @@ class MISPGalaxyCluster(AbstractMISP): :type cluster_relations: list[MISPGalaxyClusterRelation], optional """ + id: int | str | None + tag_name: str + galaxy_id: str | None + def __init__(self) -> None: super().__init__() self.Galaxy: MISPGalaxy @@ -1405,6 +1411,8 @@ class MISPGalaxyCluster(AbstractMISP): class MISPGalaxy(AbstractMISP): """Galaxy class, used to view a galaxy and respective clusters""" + id: str | None + def __init__(self) -> None: super().__init__() self.GalaxyCluster: list[MISPGalaxyCluster] = [] @@ -2048,6 +2056,8 @@ class MISPObjectTemplate(AbstractMISP): class MISPUser(AbstractMISP): + authkey: str + def __init__(self, **kwargs: dict[str, Any]) -> None: super().__init__(**kwargs) self.email: str diff --git a/pymisp/tools/_psl_faup.py b/pymisp/tools/_psl_faup.py index 9a33bfd..388fed4 100644 --- a/pymisp/tools/_psl_faup.py +++ b/pymisp/tools/_psl_faup.py @@ -6,7 +6,7 @@ import ipaddress import socket import idna from publicsuffixlist import PublicSuffixList # type: ignore -from urllib.parse import urlparse, urlunparse +from urllib.parse import urlparse, urlunparse, ParseResult class UrlNotDecoded(Exception): @@ -18,20 +18,20 @@ class PSLFaup: Fake Faup Python Library using PSL for Windows support """ - def __init__(self): + def __init__(self) -> None: self.decoded = False self.psl = PublicSuffixList() - self._url = None - self._retval = {} - self.ip_as_host = False + self._url: ParseResult | None = None + self._retval: dict[str, str | int | None | bytes] = {} + self.ip_as_host = '' - def _clear(self): + def _clear(self) -> None: self.decoded = False self._url = None self._retval = {} - self.ip_as_host = False + self.ip_as_host = '' - def decode(self, url) -> None: + def decode(self, url: str) -> None: """ This function creates a dict of all the url fields. :param url: The URL to normalize @@ -43,10 +43,15 @@ class PSLFaup: url = '//' + url self._url = urlparse(url) - self.ip_as_host = False + if self._url is None: + raise UrlNotDecoded("Unable to parse URL") + + self.ip_as_host = '' + if self._url.hostname is None: + raise UrlNotDecoded("Unable to parse URL") hostname = _ensure_str(self._url.hostname) try: - ipv4_bytes = socket.inet_aton(_ensure_str(hostname)) + ipv4_bytes = socket.inet_aton(hostname) ipv4 = ipaddress.IPv4Address(ipv4_bytes) self.ip_as_host = ipv4.compressed except (OSError, ValueError): @@ -61,61 +66,70 @@ class PSLFaup: self._retval = {} @property - def url(self): - if not self.decoded: + def url(self) -> bytes | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") - netloc = self.get_host() + ('' if self.get_port() is None else f':{self.get_port()}') - return _ensure_bytes( - urlunparse( - (self.get_scheme(), netloc, self.get_resource_path(), - '', self.get_query_string(), self.get_fragment(),) + if host := self.get_host(): + netloc = host + ('' if self.get_port() is None else f':{self.get_port()}') + return _ensure_bytes( + urlunparse( + (self.get_scheme(), netloc, self.get_resource_path(), + '', self.get_query_string(), self.get_fragment(),) + ) ) - ) + return None - def get_scheme(self): + def get_scheme(self) -> str: """ Get the scheme of the url given in the decode function :returns: The URL scheme """ - if not self.decoded: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") - return _ensure_str(self._url.scheme) + return _ensure_str(self._url.scheme if self._url.scheme else '') - def get_credential(self): - if not self.decoded: + def get_credential(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") - if self._url.password: + if self._url.username and self._url.password: return _ensure_str(self._url.username) + ':' + _ensure_str(self._url.password) if self._url.username: return _ensure_str(self._url.username) + return None - def get_subdomain(self): - if not self.decoded: + def get_subdomain(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") if self.get_host() is not None and not self.ip_as_host: - if self.get_domain() in self.get_host(): - return self.get_host().rsplit(self.get_domain(), 1)[0].rstrip('.') or None + domain = self.get_domain() + host = self.get_host() + if domain and host and domain in host: + return host.rsplit(domain, 1)[0].rstrip('.') or None + return None - def get_domain(self): - if not self.decoded: + def get_domain(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") if self.get_host() is not None and not self.ip_as_host: return self.psl.privatesuffix(self.get_host()) + return None - def get_domain_without_tld(self): - if not self.decoded: + def get_domain_without_tld(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") if self.get_tld() is not None and not self.ip_as_host: - return self.get_domain().rsplit(self.get_tld(), 1)[0].rstrip('.') + if domain := self.get_domain(): + return domain.rsplit(self.get_tld(), 1)[0].rstrip('.') + return None - def get_host(self): - if not self.decoded: + def get_host(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") if self._url.hostname is None: @@ -125,45 +139,48 @@ class PSLFaup: else: return _ensure_str(idna.encode(self._url.hostname, uts46=True)) - def get_unicode_host(self): - if not self.decoded: + def get_unicode_host(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") if not self.ip_as_host: - return idna.decode(self.get_host(), uts46=True) + if host := self.get_host(): + return idna.decode(host, uts46=True) + return None - def get_tld(self): - if not self.decoded: + def get_tld(self) -> str | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") if self.get_host() is not None and not self.ip_as_host: return self.psl.publicsuffix(self.get_host()) + return None - def get_port(self): - if not self.decoded: + def get_port(self) -> int | None: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") return self._url.port - def get_resource_path(self): - if not self.decoded: + def get_resource_path(self) -> str: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") return _ensure_str(self._url.path) - def get_query_string(self): - if not self.decoded: + def get_query_string(self) -> str: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") return _ensure_str(self._url.query) - def get_fragment(self): - if not self.decoded: + def get_fragment(self) -> str: + if not self.decoded or not self._url: raise UrlNotDecoded("You must call faup.decode() first") return _ensure_str(self._url.fragment) - def get(self): + def get(self) -> dict[str, str | int | None | bytes]: self._retval["scheme"] = self.get_scheme() self._retval["tld"] = self.get_tld() self._retval["domain"] = self.get_domain() @@ -178,14 +195,14 @@ class PSLFaup: return self._retval -def _ensure_bytes(binary) -> bytes: +def _ensure_bytes(binary: str | bytes) -> bytes: if isinstance(binary, bytes): return binary else: return binary.encode('utf-8') -def _ensure_str(string) -> str: +def _ensure_str(string: str | bytes) -> str: if isinstance(string, str): return string else: diff --git a/pymisp/tools/emailobject.py b/pymisp/tools/emailobject.py index 25a85a3..731a3bc 100644 --- a/pymisp/tools/emailobject.py +++ b/pymisp/tools/emailobject.py @@ -31,7 +31,7 @@ class MISPMsgConverstionError(MISPObjectException): class EMailObject(AbstractMISPObjectGenerator): - def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None, # type: ignore[no-untyped-def] + def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | bytes | None=None, # type: ignore[no-untyped-def] attach_original_email: bool = True, **kwargs) -> None: super().__init__('email', **kwargs) @@ -79,11 +79,11 @@ class EMailObject(AbstractMISPObjectGenerator): return eml except UnicodeDecodeError: pass - raise PyMISPNotImplementedYet("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.") + raise InvalidMISPObject("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.") @staticmethod def create_pseudofile(filepath: Path | str | None = None, - pseudofile: BytesIO | None = None) -> BytesIO: + pseudofile: BytesIO | bytes | None = None) -> BytesIO: """Creates a pseudofile using directly passed data or data loaded from file path. """ if filepath: @@ -91,6 +91,8 @@ class EMailObject(AbstractMISPObjectGenerator): return BytesIO(f.read()) elif pseudofile and isinstance(pseudofile, BytesIO): return pseudofile + elif pseudofile and isinstance(pseudofile, bytes): + return BytesIO(pseudofile) else: raise InvalidMISPObject('File buffer (BytesIO) or a path is required.') diff --git a/pymisp/tools/ext_lookups.py b/pymisp/tools/ext_lookups.py index e1bf7c6..c4e81ac 100644 --- a/pymisp/tools/ext_lookups.py +++ b/pymisp/tools/ext_lookups.py @@ -15,7 +15,7 @@ except ImportError: has_pymispgalaxies = False -def revert_tag_from_galaxies(tag): +def revert_tag_from_galaxies(tag: str) -> list[str]: clusters = Clusters() try: return clusters.revert_machinetag(tag) @@ -23,7 +23,7 @@ def revert_tag_from_galaxies(tag): return [] -def revert_tag_from_taxonomies(tag): +def revert_tag_from_taxonomies(tag: str) -> list[str]: taxonomies = Taxonomies() try: return taxonomies.revert_machinetag(tag) @@ -31,7 +31,7 @@ def revert_tag_from_taxonomies(tag): return [] -def search_taxonomies(query): +def search_taxonomies(query: str) -> list[str]: taxonomies = Taxonomies() found = taxonomies.search(query) if not found: @@ -39,6 +39,6 @@ def search_taxonomies(query): return found -def search_galaxies(query): +def search_galaxies(query: str) -> list[str]: clusters = Clusters() return clusters.search(query) diff --git a/pymisp/tools/fail2banobject.py b/pymisp/tools/fail2banobject.py index b714e27..a8e3fda 100644 --- a/pymisp/tools/fail2banobject.py +++ b/pymisp/tools/fail2banobject.py @@ -2,20 +2,23 @@ from __future__ import annotations -from .abstractgenerator import AbstractMISPObjectGenerator import logging +from typing import Any + +from .abstractgenerator import AbstractMISPObjectGenerator + logger = logging.getLogger('pymisp') class Fail2BanObject(AbstractMISPObjectGenerator): - def __init__(self, parameters: dict, strict: bool = True, **kwargs): + def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs): # type: ignore[no-untyped-def] super().__init__('fail2ban', strict=strict, **kwargs) self._parameters = parameters self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: timestamp = self._sanitize_timestamp(self._parameters.pop('processing-timestamp', None)) self._parameters['processing-timestamp'] = timestamp super().generate_attributes() diff --git a/pymisp/tools/feed.py b/pymisp/tools/feed.py index 9f7c084..0452a25 100644 --- a/pymisp/tools/feed.py +++ b/pymisp/tools/feed.py @@ -5,10 +5,9 @@ from __future__ import annotations from pathlib import Path from pymisp import MISPEvent import json -from typing import List -def feed_meta_generator(path: Path): +def feed_meta_generator(path: Path) -> None: manifests = {} hashes: list[str] = [] diff --git a/pymisp/tools/fileobject.py b/pymisp/tools/fileobject.py index 9a8c8be..f8b277e 100644 --- a/pymisp/tools/fileobject.py +++ b/pymisp/tools/fileobject.py @@ -30,7 +30,9 @@ except ImportError: class FileObject(AbstractMISPObjectGenerator): - def __init__(self, filepath: Path | str | None = None, pseudofile: BytesIO | bytes | None = None, filename: str | None = None, **kwargs) -> None: + def __init__(self, filepath: Path | str | None = None, # type: ignore[no-untyped-def] + pseudofile: BytesIO | bytes | None = None, + filename: str | None = None, **kwargs) -> None: super().__init__('file', **kwargs) if not HAS_PYDEEP: logger.warning("pydeep is missing, please install pymisp this way: pip install pymisp[fileobjects]") @@ -55,10 +57,10 @@ class FileObject(AbstractMISPObjectGenerator): self.__data = self.__pseudofile.getvalue() self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: self.add_attribute('filename', value=self.__filename) - size = self.add_attribute('size-in-bytes', value=len(self.__data)) - if int(size.value) > 0: + self.add_attribute('size-in-bytes', value=len(self.__data)) + if len(self.__data) > 0: self.add_attribute('entropy', value=self.__entropy_H(self.__data)) self.add_attribute('md5', value=md5(self.__data).hexdigest()) self.add_attribute('sha1', value=sha1(self.__data).hexdigest()) diff --git a/pymisp/tools/genericgenerator.py b/pymisp/tools/genericgenerator.py index 7279ca3..811f604 100644 --- a/pymisp/tools/genericgenerator.py +++ b/pymisp/tools/genericgenerator.py @@ -10,7 +10,7 @@ from .abstractgenerator import AbstractMISPObjectGenerator class GenericObjectGenerator(AbstractMISPObjectGenerator): # FIXME: this method is different from the master one, and that's probably not a good idea. - def generate_attributes(self, attributes: list[dict[str, Any]]) -> None: + def generate_attributes(self, attributes: list[dict[str, Any]]) -> None: # type: ignore[override] """Generates MISPObjectAttributes from a list of dictionaries. Each entry if the list must be in one of the two following formats: * {: } diff --git a/pymisp/tools/geolocationobject.py b/pymisp/tools/geolocationobject.py index 80a2aa1..fc995c9 100644 --- a/pymisp/tools/geolocationobject.py +++ b/pymisp/tools/geolocationobject.py @@ -2,20 +2,23 @@ from __future__ import annotations -from .abstractgenerator import AbstractMISPObjectGenerator import logging +from typing import Any + +from .abstractgenerator import AbstractMISPObjectGenerator + logger = logging.getLogger('pymisp') class GeolocationObject(AbstractMISPObjectGenerator): - def __init__(self, parameters: dict, strict: bool = True, **kwargs): + def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('geolocation', strict=strict, **kwargs) self._parameters = parameters self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: first = self._sanitize_timestamp(self._parameters.pop('first-seen', None)) self._parameters['first-seen'] = first last = self._sanitize_timestamp(self._parameters.pop('last-seen', None)) diff --git a/pymisp/tools/git_vuln_finder_object.py b/pymisp/tools/git_vuln_finder_object.py index 50e3b72..21ec512 100644 --- a/pymisp/tools/git_vuln_finder_object.py +++ b/pymisp/tools/git_vuln_finder_object.py @@ -2,20 +2,23 @@ from __future__ import annotations -from .abstractgenerator import AbstractMISPObjectGenerator import logging +from typing import Any + +from .abstractgenerator import AbstractMISPObjectGenerator + logger = logging.getLogger('pymisp') class GitVulnFinderObject(AbstractMISPObjectGenerator): - def __init__(self, parameters: dict, strict: bool = True, **kwargs): + def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('git-vuln-finder', strict=strict, **kwargs) self._parameters = parameters self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: authored_date = self._sanitize_timestamp(self._parameters.pop('authored_date', None)) self._parameters['authored_date'] = authored_date committed_date = self._sanitize_timestamp(self._parameters.pop('committed_date', None)) diff --git a/pymisp/tools/load_warninglists.py b/pymisp/tools/load_warninglists.py index 8224a6c..feb5ea3 100644 --- a/pymisp/tools/load_warninglists.py +++ b/pymisp/tools/load_warninglists.py @@ -2,26 +2,29 @@ from __future__ import annotations +from ..api import PyMISP + try: - from pymispwarninglists import WarningLists # type: ignore + from pymispwarninglists import WarningLists, WarningList # type: ignore has_pymispwarninglists = True except ImportError: has_pymispwarninglists = False -def from_instance(pymisp_instance, slow_search=False): +def from_instance(pymisp_instance: PyMISP, slow_search: bool=False) -> WarningLists: """Load the warnindlist from an existing MISP instance :pymisp_instance: Already instantialized PyMISP instance.""" - warninglists_index = pymisp_instance.get_warninglists()['Warninglists'] + warninglists_index = pymisp_instance.warninglists(pythonify=True) all_warningslists = [] for warninglist in warninglists_index: - wl = pymisp_instance.get_warninglist(warninglist['Warninglist']['id'])['Warninglist'] - wl['list'] = wl.pop('WarninglistEntry') - all_warningslists.append(wl) + if isinstance(warninglist, WarningList): + wl = pymisp_instance.get_warninglist(warninglist['Warninglist']['id'])['Warninglist'] + wl['list'] = wl.pop('WarninglistEntry') + all_warningslists.append(wl) return WarningLists(slow_search, all_warningslists) -def from_package(slow_search=False): +def from_package(slow_search: bool=False) -> WarningLists: return WarningLists(slow_search) diff --git a/pymisp/tools/neo4j.py b/pymisp/tools/neo4j.py index ce479c1..7a71978 100644 --- a/pymisp/tools/neo4j.py +++ b/pymisp/tools/neo4j.py @@ -2,6 +2,7 @@ from __future__ import annotations import glob import os + from .. import MISPEvent try: @@ -13,23 +14,23 @@ except ImportError: class Neo4j(): - def __init__(self, host='localhost:7474', username='neo4j', password='neo4j'): + def __init__(self, host: str='localhost:7474', username: str='neo4j', password: str='neo4j') -> None: if not has_py2neo: raise Exception('py2neo is required, please install: pip install py2neo') authenticate(host, username, password) self.graph = Graph(f"http://{host}/db/data/") - def load_events_directory(self, directory): - self.events = [] + def load_events_directory(self, directory: str) -> None: + self.events: list[MISPEvent] = [] for path in glob.glob(os.path.join(directory, '*.json')): e = MISPEvent() e.load(path) self.import_event(e) - def del_all(self): + def del_all(self) -> None: self.graph.delete_all() - def import_event(self, event): + def import_event(self, event: MISPEvent) -> None: tx = self.graph.begin() event_node = Node('Event', uuid=event.uuid, name=event.info) # event_node['distribution'] = event.distribution diff --git a/pymisp/tools/sbsignatureobject.py b/pymisp/tools/sbsignatureobject.py index 35d8147..a7356a3 100644 --- a/pymisp/tools/sbsignatureobject.py +++ b/pymisp/tools/sbsignatureobject.py @@ -9,13 +9,13 @@ class SBSignatureObject(AbstractMISPObjectGenerator): ''' Sandbox Analyzer ''' - def __init__(self, software: str, report: list, **kwargs): + def __init__(self, software: str, report: list[tuple[str, str]], **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('sb-signature', **kwargs) self._software = software self._report = report self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: ''' Parse the report for relevant attributes ''' self.add_attribute("software", value=self._software) for (signature_name, description) in self._report: diff --git a/pymisp/tools/sshauthkeyobject.py b/pymisp/tools/sshauthkeyobject.py index d66cb1f..ce6c2fb 100644 --- a/pymisp/tools/sshauthkeyobject.py +++ b/pymisp/tools/sshauthkeyobject.py @@ -2,20 +2,21 @@ from __future__ import annotations +import logging + +from io import StringIO +from pathlib import Path + from ..exceptions import InvalidMISPObject from .abstractgenerator import AbstractMISPObjectGenerator -from io import StringIO -import logging -from typing import Optional, Union -from pathlib import Path logger = logging.getLogger('pymisp') class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator): - def __init__(self, authorized_keys_path: Path | str | None = None, authorized_keys_pseudofile: StringIO | None = None, **kwargs): - # PY3 way: + def __init__(self, authorized_keys_path: Path | str | None = None, # type: ignore[no-untyped-def] + authorized_keys_pseudofile: StringIO | None = None, **kwargs): super().__init__('ssh-authorized-keys', **kwargs) if authorized_keys_path: with open(authorized_keys_path) as f: @@ -27,7 +28,7 @@ class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator): self.__data = self.__pseudofile.getvalue() self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: for line in self.__pseudofile: if line.startswith('ssh') or line.startswith('ecdsa'): key = line.split(' ')[1] diff --git a/pymisp/tools/stix.py b/pymisp/tools/stix.py deleted file mode 100644 index 8f82459..0000000 --- a/pymisp/tools/stix.py +++ /dev/null @@ -1,35 +0,0 @@ -from __future__ import annotations - -try: - from misp_stix_converter.converters.buildMISPAttribute import buildEvent # type: ignore - from misp_stix_converter.converters import convert # type: ignore - from misp_stix_converter.converters.convert import MISPtoSTIX # type: ignore - has_misp_stix_converter = True -except ImportError: - has_misp_stix_converter = False - - -def load_stix(stix, distribution: int = 3, threat_level_id: int = 2, analysis: int = 0): - '''Returns a MISPEvent object from a STIX package''' - if not has_misp_stix_converter: - raise Exception('You need to install misp_stix_converter: pip install git+https://github.com/MISP/MISP-STIX-Converter.git') - stix = convert.load_stix(stix) - return buildEvent(stix, distribution=distribution, - threat_level_id=threat_level_id, analysis=analysis) - - -def make_stix_package(misp_event, to_json: bool = False, to_xml: bool = False): - '''Returns a STIXPackage from a MISPEvent. - - Optionally can return the package in json or xml. - - ''' - if not has_misp_stix_converter: - raise Exception('You need to install misp_stix_converter: pip install git+https://github.com/MISP/MISP-STIX-Converter.git') - package = MISPtoSTIX(misp_event) - if to_json: - return package.to_json() - elif to_xml: - return package.to_xml() - else: - return package diff --git a/pymisp/tools/update_objects.py b/pymisp/tools/update_objects.py index abe1835..7f1dd25 100644 --- a/pymisp/tools/update_objects.py +++ b/pymisp/tools/update_objects.py @@ -13,7 +13,7 @@ from ..abstract import resources_path static_repo = "https://github.com/MISP/misp-objects/archive/main.zip" -def update_objects(): +def update_objects() -> None: r = requests.get(static_repo) zipped_repo = BytesIO(r.content) diff --git a/pymisp/tools/urlobject.py b/pymisp/tools/urlobject.py index 956b0c9..3465541 100644 --- a/pymisp/tools/urlobject.py +++ b/pymisp/tools/urlobject.py @@ -2,10 +2,12 @@ from __future__ import annotations -from .abstractgenerator import AbstractMISPObjectGenerator import logging + from urllib.parse import unquote_plus +from .abstractgenerator import AbstractMISPObjectGenerator + try: from pyfaup.faup import Faup # type: ignore except (OSError, ImportError): @@ -18,13 +20,13 @@ faup = Faup() class URLObject(AbstractMISPObjectGenerator): - def __init__(self, url: str, generate_all=False, **kwargs): + def __init__(self, url: str, generate_all=False, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('url', **kwargs) self._generate_all = True if generate_all is True else False faup.decode(unquote_plus(url)) self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: self.add_attribute('url', value=faup.url.decode()) if faup.get_host(): self.add_attribute('host', value=faup.get_host()) diff --git a/pymisp/tools/vehicleobject.py b/pymisp/tools/vehicleobject.py index da72f78..75c9a4a 100644 --- a/pymisp/tools/vehicleobject.py +++ b/pymisp/tools/vehicleobject.py @@ -5,6 +5,8 @@ from __future__ import annotations import requests import json +from typing import Any + from .abstractgenerator import AbstractMISPObjectGenerator # Original sourcecode: https://github.com/hayk57/MISP_registration_check @@ -13,14 +15,16 @@ from .abstractgenerator import AbstractMISPObjectGenerator class VehicleObject(AbstractMISPObjectGenerator): '''Vehicle object generator out of regcheck.org.uk''' - country_urls = { + country_urls: dict[str, str] = { 'fr': "http://www.regcheck.org.uk/api/reg.asmx/CheckFrance", 'es': "http://www.regcheck.org.uk/api/reg.asmx/CheckSpain", 'uk': "http://www.regcheck.org.uk/api/reg.asmx/Check" } - def __init__(self, country: str, registration: str, username: str, **kwargs): + def __init__(self, country: str, registration: str, username: str, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('vehicle', **kwargs) + if country not in self.country_urls: + raise ValueError(f"Country {country} not supportet, must be one of {self.country_urls.keys()}") self._country = country self._registration = registration self._username = username @@ -28,10 +32,10 @@ class VehicleObject(AbstractMISPObjectGenerator): self.generate_attributes() @property - def report(self): + def report(self) -> dict[str, Any]: return self._report - def generate_attributes(self): + def generate_attributes(self) -> None: carDescription = self._report["Description"] carMake = self._report["CarMake"]["CurrentTextValue"] carModel = self._report["CarModel"]["CurrentTextValue"] @@ -67,14 +71,14 @@ class VehicleObject(AbstractMISPObjectGenerator): self.add_attribute('date-first-registration', type='text', value=firstRegistration) self.add_attribute('image-url', type='text', value=ImageUrl) - def _query(self): + def _query(self) -> dict[str, Any]: payload = f"RegistrationNumber={self._registration}&username={self._username}" headers = { 'Content-Type': "application/x-www-form-urlencoded", 'cache-control': "no-cache", } - response = requests.request("POST", self.country_urls.get(self._country), data=payload, headers=headers) + response = requests.request("POST", self.country_urls[self._country], data=payload, headers=headers) # FIXME: Clean that up. for item in response.text.split(""): if "" in item: diff --git a/pymisp/tools/vtreportobject.py b/pymisp/tools/vtreportobject.py index 1ad7654..47f917d 100644 --- a/pymisp/tools/vtreportobject.py +++ b/pymisp/tools/vtreportobject.py @@ -3,7 +3,7 @@ from __future__ import annotations import re -from typing import Optional +from typing import Any import requests try: @@ -25,7 +25,7 @@ class VTReportObject(AbstractMISPObjectGenerator): :indicator: IOC to search VirusTotal for ''' - def __init__(self, apikey: str, indicator: str, vt_proxies: dict | None = None, **kwargs): + def __init__(self, apikey: str, indicator: str, vt_proxies: dict[str, str] | None = None, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('virustotal-report', **kwargs) indicator = indicator.strip() self._resource_type = self.__validate_resource(indicator) @@ -37,17 +37,17 @@ class VTReportObject(AbstractMISPObjectGenerator): error_msg = f"A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{indicator}' instead" raise InvalidMISPObject(error_msg) - def get_report(self): + def get_report(self) -> dict[str, Any]: return self._report - def generate_attributes(self): + def generate_attributes(self) -> None: ''' Parse the VirusTotal report for relevant attributes ''' self.add_attribute("last-submission", value=self._report["scan_date"]) self.add_attribute("permalink", value=self._report["permalink"]) ratio = "{}/{}".format(self._report["positives"], self._report["total"]) self.add_attribute("detection-ratio", value=ratio) - def __validate_resource(self, ioc: str): + def __validate_resource(self, ioc: str) -> str | bool: ''' Validate the data type of an indicator. Domains and IP addresses aren't supported because @@ -63,7 +63,7 @@ class VTReportObject(AbstractMISPObjectGenerator): return "file" return False - def __query_virustotal(self, apikey: str, resource: str): + def __query_virustotal(self, apikey: str, resource: str) -> dict[str, Any]: ''' Query VirusTotal for information about an indicator diff --git a/tests/test_emailobject.py b/tests/test_emailobject.py index bd3ae93..01cb9d0 100644 --- a/tests/test_emailobject.py +++ b/tests/test_emailobject.py @@ -7,22 +7,26 @@ from email.message import EmailMessage from io import BytesIO from os import urandom from pathlib import Path -from typing import List +from typing import TypeVar, Type from zipfile import ZipFile from pymisp.tools import EMailObject from pymisp.exceptions import PyMISPNotImplementedYet, InvalidMISPObject +T = TypeVar('T', bound='TestEmailObject') + class TestEmailObject(unittest.TestCase): + eml_1: BytesIO + @classmethod - def setUpClass(cls): + def setUpClass(cls: type[T]) -> None: with ZipFile(Path("tests/email_testfiles/mail_1.eml.zip"), 'r') as myzip: with myzip.open('mail_1.eml', pwd=b'AVs are dumb') as myfile: cls.eml_1 = BytesIO(myfile.read()) - def test_mail_1(self): + def test_mail_1(self) -> None: email_object = EMailObject(pseudofile=self.eml_1) self.assertEqual(self._get_values(email_object, "subject")[0], "письмо уведом-е") self.assertEqual(self._get_values(email_object, "to")[0], "kinney@noth.com") @@ -39,7 +43,7 @@ class TestEmailObject(unittest.TestCase): self.assertIsInstance(file_name, str) self.assertIsInstance(file_content, BytesIO) - def test_mail_1_headers_only(self): + def test_mail_1_headers_only(self) -> None: email_object = EMailObject(Path("tests/email_testfiles/mail_1_headers_only.eml")) self.assertEqual(self._get_values(email_object, "subject")[0], "письмо уведом-е") self.assertEqual(self._get_values(email_object, "to")[0], "kinney@noth.com") @@ -50,7 +54,7 @@ class TestEmailObject(unittest.TestCase): self.assertIsInstance(email_object.email, EmailMessage) self.assertEqual(len(email_object.attachments), 0) - def test_mail_multiple_to(self): + def test_mail_multiple_to(self) -> None: email_object = EMailObject(Path("tests/email_testfiles/mail_multiple_to.eml")) to = self._get_values(email_object, "to") @@ -60,7 +64,7 @@ class TestEmailObject(unittest.TestCase): self.assertEqual(to[1], "jan.marek@example.com") self.assertEqual(to_display_name[1], "Marek, Jan") - def test_msg(self): + def test_msg(self) -> None: # Test result of eml converted to msg is the same eml_email_object = EMailObject(pseudofile=self.eml_1) email_object = EMailObject(Path("tests/email_testfiles/mail_1.msg")) @@ -83,7 +87,7 @@ class TestEmailObject(unittest.TestCase): self.assertEqual(self._get_values(email_object, "received-header-ip"), self._get_values(eml_email_object, "received-header-ip")) - def test_bom_encoded(self): + def test_bom_encoded(self) -> None: """Test utf-8-sig encoded email""" bom_email_object = EMailObject(Path("tests/email_testfiles/mail_1_bom.eml")) eml_email_object = EMailObject(pseudofile=self.eml_1) @@ -106,7 +110,7 @@ class TestEmailObject(unittest.TestCase): self.assertEqual(self._get_values(bom_email_object, "received-header-ip"), self._get_values(eml_email_object, "received-header-ip")) - def test_handling_of_various_email_types(self): + def test_handling_of_various_email_types(self) -> None: self._does_not_fail(Path("tests/email_testfiles/mail_2.eml"), "ensuring all headers work") self._does_not_fail(Path('tests/email_testfiles/mail_3.eml'), @@ -118,7 +122,7 @@ class TestEmailObject(unittest.TestCase): self._does_not_fail(Path('tests/email_testfiles/mail_5.msg'), "Check encapsulated HTML works") - def _does_not_fail(self, path, test_type="test"): + def _does_not_fail(self, path: Path, test_type: str="test") -> None: found_error = None try: EMailObject(path) @@ -130,7 +134,7 @@ class TestEmailObject(unittest.TestCase): path, test_type)) - def test_random_binary_blob(self): + def test_random_binary_blob(self) -> None: """Email parser fails correctly on random binary blob.""" random_data = urandom(1024) random_blob = BytesIO(random_data) @@ -145,8 +149,8 @@ class TestEmailObject(unittest.TestCase): broken_obj = EMailObject(pseudofile=random_blob) except Exception as _e: found_error = _e - if not isinstance(found_error, PyMISPNotImplementedYet): - self.fail("Expected PyMISPNotImplementedYet when EmailObject receives completely unknown binary input data in a pseudofile. But, did not get that exception.") + if not isinstance(found_error, InvalidMISPObject): + self.fail("Expected InvalidMISPObject when EmailObject receives completely unknown binary input data in a pseudofile. But, did not get that exception.") @staticmethod def _get_values(obj: EMailObject, relation: str) -> list[str]: diff --git a/tests/test_fileobject.py b/tests/test_fileobject.py index 6d4b5aa..1299b3a 100644 --- a/tests/test_fileobject.py +++ b/tests/test_fileobject.py @@ -9,7 +9,7 @@ import pathlib class TestFileObject(unittest.TestCase): - def test_mimeType(self): + def test_mimeType(self) -> None: file_object = FileObject(filepath=pathlib.Path(__file__)) attributes = json.loads(file_object.to_json())['Attribute'] mime = next(attr for attr in attributes if attr['object_relation'] == 'mimetype') diff --git a/tests/test_mispevent.py b/tests/test_mispevent.py index d0a3bbd..8c9564c 100644 --- a/tests/test_mispevent.py +++ b/tests/test_mispevent.py @@ -17,24 +17,24 @@ from pymisp.tools import GitVulnFinderObject class TestMISPEvent(unittest.TestCase): - def setUp(self): + def setUp(self) -> None: self.maxDiff = None self.mispevent = MISPEvent() - def init_event(self): + def init_event(self) -> None: self.mispevent.info = 'This is a test' self.mispevent.distribution = 1 self.mispevent.threat_level_id = 1 self.mispevent.analysis = 1 self.mispevent.set_date("2017-12-31") # test the set date method - def test_simple(self): + def test_simple(self) -> None: with open('tests/mispevent_testfiles/simple.json') as f: ref_json = json.load(f) del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_event(self): + def test_event(self) -> None: self.init_event() self.mispevent.publish() with open('tests/mispevent_testfiles/event.json') as f: @@ -42,22 +42,22 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_loadfile(self): + def test_loadfile(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/event.json') with open('tests/mispevent_testfiles/event.json') as f: ref_json = json.load(f) del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_loadfile_validate(self): + def test_loadfile_validate(self) -> None: misp_event = MISPEvent() misp_event.load_file('tests/mispevent_testfiles/event.json', validate=True) - def test_loadfile_validate_strict(self): + def test_loadfile_validate_strict(self) -> None: misp_event = MISPEvent(strict_validation=True) misp_event.load_file('tests/mispevent_testfiles/event.json', validate=True) - def test_event_tag(self): + def test_event_tag(self) -> None: self.init_event() self.mispevent.add_tag('bar') self.mispevent.add_tag(name='baz') @@ -69,7 +69,7 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_event_galaxy(self): + def test_event_galaxy(self) -> None: self.init_event() with open('tests/mispevent_testfiles/galaxy.json') as f: galaxy = json.load(f) @@ -78,11 +78,11 @@ class TestMISPEvent(unittest.TestCase): self.mispevent.add_galaxy(misp_galaxy) self.assertEqual(self.mispevent.galaxies[0].to_json(sort_keys=True, indent=2), json.dumps(galaxy, sort_keys=True, indent=2)) - def test_attribute(self): + def test_attribute(self) -> None: self.init_event() - a = self.mispevent.add_attribute('filename', 'bar.exe') + a: MISPAttribute = self.mispevent.add_attribute('filename', 'bar.exe') # type: ignore[assignment] del a.uuid - a = self.mispevent.add_attribute_tag('osint', 'bar.exe') + a = self.mispevent.add_attribute_tag('osint', 'bar.exe') # type: ignore[assignment] attr_tags = self.mispevent.get_attribute_tag('bar.exe') self.assertEqual(self.mispevent.attributes[0].tags[0].name, 'osint') self.assertEqual(attr_tags[0].name, 'osint') @@ -97,7 +97,7 @@ class TestMISPEvent(unittest.TestCase): ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_attribute_galaxy(self): + def test_attribute_galaxy(self) -> None: self.init_event() with open('tests/mispevent_testfiles/galaxy.json') as f: galaxy = json.load(f) @@ -112,7 +112,7 @@ class TestMISPEvent(unittest.TestCase): json.dumps(galaxy, sort_keys=True, indent=2) ) - def test_to_dict_json_format(self): + def test_to_dict_json_format(self) -> None: misp_event = MISPEvent() av_signature_object = MISPObject("av-signature") av_signature_object.add_attribute("signature", "EICAR") @@ -121,19 +121,19 @@ class TestMISPEvent(unittest.TestCase): self.assertEqual(json.loads(misp_event.to_json()), misp_event.to_dict(json_format=True)) - def test_object_tag(self): + def test_object_tag(self) -> None: self.mispevent.add_object(name='file', strict=True) - a = self.mispevent.objects[0].add_attribute('filename', value='') + a: MISPAttribute = self.mispevent.objects[0].add_attribute('filename', value='') # type: ignore[assignment] self.assertEqual(a, None) - a = self.mispevent.objects[0].add_attribute('filename', value=None) + a = self.mispevent.objects[0].add_attribute('filename', value=None) # type: ignore[assignment] self.assertEqual(a, None) - a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) + a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) # type: ignore[assignment] del a.uuid self.assertEqual(self.mispevent.objects[0].attributes[0].tags[0].name, 'blah') self.assertTrue(self.mispevent.objects[0].has_attributes_by_relation(['filename'])) self.assertEqual(len(self.mispevent.objects[0].get_attributes_by_relation('filename')), 1) self.mispevent.add_object(name='url', strict=True) - a = self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu') + a = self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu') # type: ignore[assignment] del a.uuid self.mispevent.objects[0].uuid = 'a' self.mispevent.objects[1].uuid = 'b' @@ -146,16 +146,16 @@ class TestMISPEvent(unittest.TestCase): self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) @unittest.skip("Not supported on MISP: https://github.com/MISP/MISP/issues/2638 - https://github.com/MISP/PyMISP/issues/168") - def test_object_level_tag(self): + def test_object_level_tag(self) -> None: self.mispevent.add_object(name='file', strict=True) self.mispevent.objects[0].add_attribute('filename', value='bar') - self.mispevent.objects[0].add_tag('osint') + self.mispevent.objects[0].add_tag('osint') # type: ignore[attr-defined] self.mispevent.objects[0].uuid = 'a' with open('tests/mispevent_testfiles/event_obj_tag.json') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_object_galaxy(self): + def test_object_galaxy(self) -> None: self.init_event() misp_object = MISPObject('github-user') misp_object.add_attribute('username', 'adulau') @@ -171,11 +171,11 @@ class TestMISPEvent(unittest.TestCase): json.dumps(galaxy, sort_keys=True, indent=2) ) - def test_malware(self): + def test_malware(self) -> None: with open('tests/mispevent_testfiles/simple.json', 'rb') as f: pseudofile = BytesIO(f.read()) self.init_event() - a = self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile) + a: MISPAttribute = self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile) # type: ignore[assignment] del a.uuid attribute = self.mispevent.attributes[0] self.assertEqual(attribute.malware_binary, pseudofile) @@ -184,40 +184,40 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_existing_malware(self): + def test_existing_malware(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/malware_exist.json') with open('tests/mispevent_testfiles/simple.json', 'rb') as f: pseudofile = BytesIO(f.read()) - self.assertEqual( - self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary.read(), - pseudofile.read()) + self.assertTrue(self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary) + if _mb := self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary: + self.assertEqual(_mb.read(), pseudofile.read()) - def test_sighting(self): + def test_sighting(self) -> None: sighting = MISPSighting() sighting.from_dict(value='1', type='bar', timestamp=11111111) with open('tests/mispevent_testfiles/sighting.json') as f: ref_json = json.load(f) self.assertEqual(sighting.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_existing_event(self): + def test_existing_event(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') with open('tests/mispevent_testfiles/existing_event.json') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_shadow_attributes_existing(self): + def test_shadow_attributes_existing(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/shadow.json') with open('tests/mispevent_testfiles/shadow.json') as f: ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) @unittest.skip("Not supported on MISP.") - def test_shadow_attributes(self): + def test_shadow_attributes(self) -> None: self.init_event() p = self.mispevent.add_proposal(type='filename', value='baz.jpg') del p.uuid - a = self.mispevent.add_attribute('filename', 'bar.exe') + a: MISPAttribute = self.mispevent.add_attribute('filename', 'bar.exe') # type: ignore[assignment] del a.uuid p = self.mispevent.attributes[0].add_proposal(type='filename', value='bar.pdf') del p.uuid @@ -225,15 +225,15 @@ class TestMISPEvent(unittest.TestCase): ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_default_attributes(self): + def test_default_attributes(self) -> None: self.mispevent.add_object(name='file', strict=True) - a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) + a: MISPAttribute = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) # type: ignore[assignment] del a.uuid - a = self.mispevent.objects[0].add_attribute('pattern-in-file', value='baz') + a = self.mispevent.objects[0].add_attribute('pattern-in-file', value='baz') # type: ignore[assignment] self.assertEqual(a.category, 'Artifacts dropped') del a.uuid self.mispevent.add_object(name='file', strict=False, default_attributes_parameters=self.mispevent.objects[0].attributes[0]) - a = self.mispevent.objects[1].add_attribute('filename', value='baz') + a = self.mispevent.objects[1].add_attribute('filename', value='baz') # type: ignore[assignment] del a.uuid self.mispevent.objects[0].uuid = 'a' self.mispevent.objects[1].uuid = 'b' @@ -242,16 +242,16 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_obj_default_values(self): + def test_obj_default_values(self) -> None: self.init_event() self.mispevent.add_object(name='whois', strict=True) - a = self.mispevent.objects[0].add_attribute('registrar', value='registar.example.com') + a: MISPAttribute = self.mispevent.objects[0].add_attribute('registrar', value='registar.example.com') # type: ignore[assignment] del a.uuid - a = self.mispevent.objects[0].add_attribute('domain', value='domain.example.com') + a = self.mispevent.objects[0].add_attribute('domain', value='domain.example.com') # type: ignore[assignment] del a.uuid - a = self.mispevent.objects[0].add_attribute('nameserver', value='ns1.example.com') + a = self.mispevent.objects[0].add_attribute('nameserver', value='ns1.example.com') # type: ignore[assignment] del a.uuid - a = self.mispevent.objects[0].add_attribute('nameserver', value='ns2.example.com', disable_correlation=False, to_ids=True, category='External analysis') + a = self.mispevent.objects[0].add_attribute('nameserver', value='ns2.example.com', disable_correlation=False, to_ids=True, category='External analysis') # type: ignore[assignment] del a.uuid self.mispevent.objects[0].uuid = 'a' with open('tests/mispevent_testfiles/def_param.json') as f: @@ -259,7 +259,7 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_obj_references_export(self): + def test_obj_references_export(self) -> None: self.init_event() obj1 = MISPObject(name="file") obj2 = MISPObject(name="url", standalone=False) @@ -272,29 +272,29 @@ class TestMISPEvent(unittest.TestCase): self.assertTrue("ObjectReference" in obj1.jsonable()) self.assertFalse("ObjectReference" in obj2.jsonable()) - def test_event_not_edited(self): + def test_event_not_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) - def test_event_edited(self): + def test_event_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.mispevent.info = 'blah' self.assertTrue(self.mispevent.edited) - def test_event_tag_edited(self): + def test_event_tag_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.add_tag('foo') self.assertTrue(self.mispevent.edited) - def test_event_attribute_edited(self): + def test_event_attribute_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.mispevent.attributes[0].value = 'blah' self.assertTrue(self.mispevent.attributes[0].edited) self.assertFalse(self.mispevent.attributes[1].edited) self.assertTrue(self.mispevent.edited) - def test_event_attribute_tag_edited(self): + def test_event_attribute_tag_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.attributes[0].tags[0].name = 'blah' @@ -303,14 +303,14 @@ class TestMISPEvent(unittest.TestCase): self.assertTrue(self.mispevent.attributes[0].edited) self.assertTrue(self.mispevent.edited) - def test_event_attribute_tag_edited_second(self): + def test_event_attribute_tag_edited_second(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.attributes[0].add_tag(name='blah') self.assertTrue(self.mispevent.attributes[0].edited) self.assertTrue(self.mispevent.edited) - def test_event_object_edited(self): + def test_event_object_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.objects[0].comment = 'blah' @@ -318,7 +318,7 @@ class TestMISPEvent(unittest.TestCase): self.assertFalse(self.mispevent.objects[1].edited) self.assertTrue(self.mispevent.edited) - def test_event_object_attribute_edited(self): + def test_event_object_attribute_edited(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.objects[0].attributes[0].comment = 'blah' @@ -326,7 +326,7 @@ class TestMISPEvent(unittest.TestCase): self.assertTrue(self.mispevent.objects[0].edited) self.assertTrue(self.mispevent.edited) - def test_event_object_attribute_edited_tag(self): + def test_event_object_attribute_edited_tag(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.assertFalse(self.mispevent.edited) self.mispevent.objects[0].attributes[0].add_tag('blah') @@ -337,12 +337,12 @@ class TestMISPEvent(unittest.TestCase): ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_obj_by_id(self): + def test_obj_by_id(self) -> None: self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') misp_obj = self.mispevent.get_object_by_id(1556) self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f') - def test_userdefined_object_custom_template(self): + def test_userdefined_object_custom_template(self) -> None: self.init_event() with open('tests/mispevent_testfiles/test_object_template/definition.json') as f: template = json.load(f) @@ -353,16 +353,16 @@ class TestMISPEvent(unittest.TestCase): self.mispevent.to_json(sort_keys=True, indent=2) self.assertEqual(e.exception.message, '{\'member3\'} are required.') - a = self.mispevent.objects[0].add_attribute('member3', value='foo') + a: MISPAttribute = self.mispevent.objects[0].add_attribute('member3', value='foo') # type: ignore[assignment] del a.uuid with self.assertRaises(InvalidMISPObject) as e: # Fail on requiredOneOf self.mispevent.to_json(sort_keys=True, indent=2) self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2') - a = self.mispevent.objects[0].add_attribute('member1', value='bar') + a = self.mispevent.objects[0].add_attribute('member1', value='bar') # type: ignore[assignment] del a.uuid - a = self.mispevent.objects[0].add_attribute('member1', value='baz') + a = self.mispevent.objects[0].add_attribute('member1', value='baz') # type: ignore[assignment] del a.uuid with self.assertRaises(InvalidMISPObject) as e: # member1 is not a multiple @@ -376,7 +376,7 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_userdefined_object_custom_dir(self): + def test_userdefined_object_custom_dir(self) -> None: self.init_event() self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles') with self.assertRaises(InvalidMISPObject) as e: @@ -384,16 +384,16 @@ class TestMISPEvent(unittest.TestCase): self.mispevent.to_json(sort_keys=True, indent=2) self.assertEqual(e.exception.message, '{\'member3\'} are required.') - a = self.mispevent.objects[0].add_attribute('member3', value='foo') + a: MISPAttribute = self.mispevent.objects[0].add_attribute('member3', value='foo') # type: ignore[assignment] del a.uuid with self.assertRaises(InvalidMISPObject) as e: # Fail on requiredOneOf self.mispevent.to_json(sort_keys=True, indent=2) self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2') - a = self.mispevent.objects[0].add_attribute('member1', value='bar') + a = self.mispevent.objects[0].add_attribute('member1', value='bar') # type: ignore[assignment] del a.uuid - a = self.mispevent.objects[0].add_attribute('member1', value='baz') + a = self.mispevent.objects[0].add_attribute('member1', value='baz') # type: ignore[assignment] del a.uuid with self.assertRaises(InvalidMISPObject) as e: # member1 is not a multiple @@ -407,10 +407,10 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) - def test_first_last_seen(self): + def test_first_last_seen(self) -> None: me = MISPEvent() me.info = 'Test First and Last Seen' - me.date = '2020.01.12' + me.date = '2020.01.12' # type: ignore[assignment] self.assertEqual(me.date.day, 12) me.add_attribute('ip-dst', '8.8.8.8', first_seen='06-21-1998', last_seen=1580213607.469571) self.assertEqual(me.attributes[0].first_seen.year, 1998) @@ -418,11 +418,11 @@ class TestMISPEvent(unittest.TestCase): now = datetime.now().astimezone() me.attributes[0].last_seen = now today = date.today() - me.attributes[0].first_seen = today + me.attributes[0].first_seen = today # type: ignore[assignment] self.assertEqual(me.attributes[0].first_seen.year, today.year) self.assertEqual(me.attributes[0].last_seen, now) - def test_feed(self): + def test_feed(self) -> None: me = MISPEvent() me.info = 'Test feed' org = MISPOrganisation() @@ -440,7 +440,7 @@ class TestMISPEvent(unittest.TestCase): self.assertEqual(feed['Event']['_manifest'][me.uuid]['info'], 'Test feed') self.assertEqual(len(feed['Event']['Object'][0]['Attribute']), 2) - def test_object_templates(self): + def test_object_templates(self) -> None: me = MISPEvent() for template in glob.glob(str(me.misp_objects_path / '*' / 'definition.json')): with open(template) as f: @@ -459,7 +459,7 @@ class TestMISPEvent(unittest.TestCase): subset = set(entry['categories']).issubset(me.describe_types['categories']) self.assertTrue(subset, f'{t_json["name"]} - {obj_relation}') - def test_git_vuln_finder(self): + def test_git_vuln_finder(self) -> None: with open('tests/git-vuln-finder-quagga.json') as f: dump = json.load(f) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 852d7a3..e2b7198 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -2,35 +2,31 @@ from __future__ import annotations +import hashlib +import json +import logging import os -import sys - +import time import unittest -from pymisp.tools import make_binary_objects from datetime import datetime, timedelta, date, timezone from io import BytesIO -import json from pathlib import Path -import hashlib - -import urllib3 -import time +from typing import TypeVar, Type, Any from uuid import uuid4 -import email - -from collections import defaultdict - -import logging -logging.disable(logging.CRITICAL) -logger = logging.getLogger('pymisp') +import urllib3 +from pymisp.tools import make_binary_objects try: - from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster + from pymisp import (register_user, PyMISP, MISPEvent, MISPOrganisation, + MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, + MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, + MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, + MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster, + MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist) from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator - from pymisp.exceptions import MISPServerError except ImportError: raise @@ -43,6 +39,8 @@ except ImportError as e: key = 'sL9hrjIyY405RyGQHLx5DoCAM92BNmmGa8P4ck1E' verifycert = False +logging.disable(logging.CRITICAL) +logger = logging.getLogger('pymisp') urllib3.disable_warnings() @@ -56,11 +54,23 @@ if not test_file_path.exists(): print('The test files are missing, pulling it.') os.system('git clone https://github.com/viper-framework/viper-test-files.git tests/viper-test-files') +T = TypeVar('T', bound='TestComprehensive') + class TestComprehensive(unittest.TestCase): + admin_misp_connector: PyMISP + user_misp_connector: PyMISP + test_usr: MISPUser + test_pub: MISPUser + test_org: MISPOrganisation + test_org_delegate: MISPOrganisation + delegate_user_misp_connector: PyMISP + pub_misp_connector: PyMISP + test_usr_delegate: MISPUser + @classmethod - def setUpClass(cls): + def setUpClass(cls: type[T]) -> None: cls.maxDiff = None # Connect as admin cls.admin_misp_connector = PyMISP(url, key, verifycert, debug=False) @@ -72,18 +82,18 @@ class TestComprehensive(unittest.TestCase): # Creates an org organisation = MISPOrganisation() organisation.name = 'Test Org' - cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) + cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) # type: ignore[assignment] # Create an org to delegate to organisation = MISPOrganisation() organisation.name = 'Test Org - delegate' - cls.test_org_delegate = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) + cls.test_org_delegate = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) # type: ignore[assignment] # Set the refault role (id 3 on the VM) cls.admin_misp_connector.set_default_role(3) # Creates a user user = MISPUser() user.email = 'testusr@user.local' user.org_id = cls.test_org.id - cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True) + cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True) # type: ignore[assignment] cls.user_misp_connector = PyMISP(url, cls.test_usr.authkey, verifycert, debug=True) cls.user_misp_connector.toggle_global_pythonify() # Creates a publisher @@ -91,14 +101,14 @@ class TestComprehensive(unittest.TestCase): user.email = 'testpub@user.local' user.org_id = cls.test_org.id user.role_id = 4 - cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True) + cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True) # type: ignore[assignment] cls.pub_misp_connector = PyMISP(url, cls.test_pub.authkey, verifycert) # Creates a user that can accept a delegation request user = MISPUser() user.email = 'testusr@delegate.recipient.local' user.org_id = cls.test_org_delegate.id user.role_id = 2 - cls.test_usr_delegate = cls.admin_misp_connector.add_user(user, pythonify=True) + cls.test_usr_delegate = cls.admin_misp_connector.add_user(user, pythonify=True) # type: ignore[assignment] cls.delegate_user_misp_connector = PyMISP(url, cls.test_usr_delegate.authkey, verifycert, debug=False) cls.delegate_user_misp_connector.toggle_global_pythonify() if not fast_mode: @@ -111,7 +121,7 @@ class TestComprehensive(unittest.TestCase): cls.admin_misp_connector.load_default_feeds() @classmethod - def tearDownClass(cls): + def tearDownClass(cls) -> None: # Delete publisher cls.admin_misp_connector.delete_user(cls.test_pub) # Delete user @@ -121,16 +131,16 @@ class TestComprehensive(unittest.TestCase): cls.admin_misp_connector.delete_organisation(cls.test_org) cls.admin_misp_connector.delete_organisation(cls.test_org_delegate) - def create_simple_event(self, force_timestamps=False): + def create_simple_event(self, force_timestamps: bool=False) -> MISPEvent: mispevent = MISPEvent(force_timestamps=force_timestamps) mispevent.info = 'This is a super simple test' mispevent.distribution = Distribution.your_organisation_only - mispevent.threat_level_id = ThreatLevel.low - mispevent.analysis = Analysis.completed + mispevent.threat_level_id = int(ThreatLevel.low) + mispevent.analysis = int(Analysis.completed) mispevent.add_attribute('text', str(uuid4())) return mispevent - def environment(self): + def environment(self) -> tuple[MISPEvent, MISPEvent, MISPEvent]: first_event = MISPEvent() first_event.info = 'First event - org only - low - completed' first_event.distribution = Distribution.your_organisation_only @@ -172,13 +182,13 @@ class TestComprehensive(unittest.TestCase): # Create first and third event as admin # usr won't be able to see the first one - first = self.admin_misp_connector.add_event(first_event, pythonify=True) - third = self.admin_misp_connector.add_event(third_event, pythonify=True) + first: MISPEvent = self.admin_misp_connector.add_event(first_event, pythonify=True) # type: ignore[assignment] + third: MISPEvent = self.admin_misp_connector.add_event(third_event, pythonify=True) # type: ignore[assignment] # Create second event as user - second = self.user_misp_connector.add_event(second_event) + second: MISPEvent = self.user_misp_connector.add_event(second_event) # type: ignore[assignment] return first, second, third - def test_server_settings(self): + def test_server_settings(self) -> None: settings = self.admin_misp_connector.server_settings() for final_setting in settings['finalSettings']: if final_setting['setting'] == 'MISP.max_correlations_per_event': @@ -203,7 +213,7 @@ class TestComprehensive(unittest.TestCase): setting = self.admin_misp_connector.get_server_setting('MISP.live') self.assertTrue(setting['value']) - def test_search_value_event(self): + def test_search_value_event(self) -> None: '''Search a value on the event controller * Test ACL admin user vs normal user in an other org * Make sure we have one match @@ -211,17 +221,17 @@ class TestComprehensive(unittest.TestCase): try: first, second, third = self.environment() # Search as admin - events = self.admin_misp_connector.search(value=first.attributes[0].value, pythonify=True) + events: list[MISPEvent] = self.admin_misp_connector.search(value=first.attributes[0].value, pythonify=True) # type: ignore[assignment] self.assertEqual(len(events), 2) for e in events: self.assertIn(e.id, [first.id, second.id]) # Search as user - events = self.user_misp_connector.search(value=first.attributes[0].value) + events = self.user_misp_connector.search(value=first.attributes[0].value) # type: ignore[assignment] self.assertEqual(len(events), 1) for e in events: self.assertIn(e.id, [second.id]) # Non-existing value - events = self.user_misp_connector.search(value=str(uuid4())) + events = self.user_misp_connector.search(value=str(uuid4())) # type: ignore[assignment] self.assertEqual(events, []) finally: # Delete events @@ -229,37 +239,37 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_value_attribute(self): + def test_search_value_attribute(self) -> None: '''Search value in attributes controller''' try: first, second, third = self.environment() # Search as admin - attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True) + attributes: list[MISPAttribute] = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True) # type: ignore[assignment] self.assertEqual(len(attributes), 2) for a in attributes: self.assertIn(a.event_id, [first.id, second.id]) # Search as user - attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value) + attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value) # type: ignore[assignment] self.assertEqual(len(attributes), 1) for a in attributes: self.assertIn(a.event_id, [second.id]) # Non-existing value - attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4())) + attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4())) # type: ignore[assignment] self.assertEqual(attributes, []) # Include context - search as user (can only see one event) - attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_context=True, pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_context=True, pythonify=True) # type: ignore[assignment] self.assertTrue(isinstance(attributes[0].Event, MISPEvent)) self.assertEqual(attributes[0].Event.uuid, second.uuid) # Include context - search as admin (can see both event) - attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_context=True, pythonify=True) + attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_context=True, pythonify=True) # type: ignore[assignment] self.assertTrue(isinstance(attributes[0].Event, MISPEvent)) self.assertEqual(attributes[0].Event.uuid, first.uuid) self.assertEqual(attributes[1].Event.uuid, second.uuid) # Include correlations - search as admin (can see both event) - attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_correlations=True, pythonify=True) + attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_correlations=True, pythonify=True) # type: ignore[assignment] self.assertTrue(isinstance(attributes[0].Event, MISPEvent)) self.assertEqual(attributes[0].Event.uuid, first.uuid) self.assertEqual(attributes[1].Event.uuid, second.uuid) @@ -267,8 +277,9 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(attributes[1].RelatedAttribute[0].Event.uuid, first.uuid) # Include sightings - search as admin (can see both event) - self.admin_misp_connector.add_sighting({'value': first.attributes[0].value}) - attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_sightings=True, pythonify=True) + s: dict[str, Any] = {'value': first.attributes[0].value} + self.admin_misp_connector.add_sighting(s) + attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_sightings=True, pythonify=True) # type: ignore[assignment] self.assertTrue(isinstance(attributes[0].Event, MISPEvent)) self.assertEqual(attributes[0].Event.uuid, first.uuid) self.assertEqual(attributes[1].Event.uuid, second.uuid) @@ -280,17 +291,21 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_type_event(self): + def test_search_type_event(self) -> None: '''Search multiple events, search events containing attributes with specific types''' try: first, second, third = self.environment() # Search as admin - events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(), pythonify=True) + if isinstance(first.timestamp, datetime): + ts = first.timestamp.timestamp() + else: + ts = first.timestamp + events: list[MISPEvent] = self.admin_misp_connector.search(timestamp=ts, pythonify=True) # type: ignore[assignment] self.assertEqual(len(events), 3) for e in events: self.assertIn(e.id, [first.id, second.id, third.id]) attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst']) - events = self.admin_misp_connector.search(timestamp=first.timestamp.timestamp(), + events = self.admin_misp_connector.search(timestamp=ts, # type: ignore[assignment,type-var] type_attribute=attributes_types_search, pythonify=True) self.assertEqual(len(events), 2) for e in events: @@ -301,28 +316,32 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_index(self): + def test_search_index(self) -> None: try: first, second, third = self.environment() # Search as admin - events = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), pythonify=True) + if isinstance(first.timestamp, datetime): + ts = first.timestamp.timestamp() + else: + ts = first.timestamp + events: MISPEvent = self.admin_misp_connector.search_index(timestamp=ts, pythonify=True) # type: ignore[assignment] self.assertEqual(len(events), 3) for e in events: self.assertIn(e.id, [first.id, second.id, third.id]) # Test limit and pagination - event_one = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=1, pythonify=True)[0] - event_two = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=2, pythonify=True)[0] + event_one: MISPEvent = self.admin_misp_connector.search_index(timestamp=ts, limit=1, page=1, pythonify=True)[0] # type: ignore[index,assignment] + event_two: MISPEvent = self.admin_misp_connector.search_index(timestamp=ts, limit=1, page=2, pythonify=True)[0] # type: ignore[index,assignment] self.assertTrue(event_one.id != event_two.id) two_events = self.admin_misp_connector.search_index(limit=2) self.assertTrue(len(two_events), 2) # Test ordering by the Info field. Can't use timestamp as each will likely have the same - event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=True, limit=1, pythonify=True)[0] + event: MISPEvent = self.admin_misp_connector.search_index(timestamp=ts, sort="info", desc=True, limit=1, pythonify=True)[0] # type: ignore[index,assignment] # First|Second|*Third* event self.assertEqual(event.id, third.id) # *First*|Second|Third event - event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=False, limit=1, pythonify=True)[0] + event = self.admin_misp_connector.search_index(timestamp=ts, sort="info", desc=False, limit=1, pythonify=True)[0] # type: ignore[index,assignment] self.assertEqual(event.id, first.id) finally: # Delete event @@ -330,7 +349,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_objects(self): + def test_search_objects(self) -> None: '''Search for objects''' try: first = self.create_simple_event() @@ -348,7 +367,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_search_type_attribute(self): + def test_search_type_attribute(self) -> None: '''Search multiple attributes, search attributes with specific types''' try: first, second, third = self.environment() @@ -372,7 +391,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_tag_event(self): + def test_search_tag_event(self) -> None: '''Search Tags at events level''' try: first, second, third = self.environment() @@ -406,7 +425,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_tag_attribute(self): + def test_search_tag_attribute(self) -> None: '''Search Tags at attributes level''' try: first, second, third = self.environment() @@ -433,7 +452,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_tag_advanced_event(self): + def test_search_tag_advanced_event(self) -> None: '''Advanced search Tags at events level''' try: first, second, third = self.environment() @@ -463,7 +482,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_tag_advanced_attributes(self): + def test_search_tag_advanced_attributes(self) -> None: '''Advanced search Tags at attributes level''' try: first, second, third = self.environment() @@ -482,7 +501,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_search_timestamp_event(self): + def test_search_timestamp_event(self) -> None: '''Search specific update timestamps at events level''' # Creating event 1 - timestamp 5 min ago first = self.create_simple_event(force_timestamps=True) @@ -518,7 +537,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_search_timestamp_attribute(self): + def test_search_timestamp_attribute(self) -> None: '''Search specific update timestamps at attributes level''' # Creating event 1 - timestamp 5 min ago first = self.create_simple_event(force_timestamps=True) @@ -556,7 +575,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_user_perms(self): + def test_user_perms(self) -> None: '''Test publish rights''' try: first = self.create_simple_event() @@ -572,7 +591,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_delete_with_update(self): + def test_delete_with_update(self) -> None: try: first = self.create_simple_event() obj = MISPObject('file') @@ -597,14 +616,14 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_get_non_exists_event(self): + def test_get_non_exists_event(self) -> None: event = self.user_misp_connector.get_event(0) # non exists id self.assertEqual(event['errors'][0], 404) event = self.user_misp_connector.get_event("ab2b6e28-fda5-4282-bf60-22b81de77851") # non exists uuid self.assertEqual(event['errors'][0], 404) - def test_delete_by_uuid(self): + def test_delete_by_uuid(self) -> None: try: first = self.create_simple_event() obj = MISPObject('file') @@ -641,7 +660,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_search_publish_timestamp(self): + def test_search_publish_timestamp(self) -> None: '''Search for a specific publication timestamp, an interval, and invalid values.''' # Creating event 1 first = self.create_simple_event() @@ -680,7 +699,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_search_decay(self): + def test_search_decay(self) -> None: # Creating event 1 first = self.create_simple_event() first.add_attribute('ip-dst', '8.8.8.8') @@ -705,7 +724,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_default_distribution(self): + def test_default_distribution(self) -> None: '''The default distributions on the VM are This community only for the events and Inherit from event for attr/obj)''' first = self.create_simple_event() del first.distribution @@ -747,7 +766,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_exists(self): + def test_exists(self) -> None: """Check event, attribute and object existence""" event = self.create_simple_event() misp_object = MISPObject('domain-ip') @@ -784,7 +803,7 @@ class TestComprehensive(unittest.TestCase): self.assertFalse(self.user_misp_connector.object_exists(misp_object)) self.assertFalse(self.user_misp_connector.object_exists(misp_object.id)) - def test_simple_event(self): + def test_simple_event(self) -> None: '''Search a bunch of parameters: * Value not existing * only return metadata @@ -1003,7 +1022,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_event_add_update_metadata(self): + def test_event_add_update_metadata(self) -> None: event = self.create_simple_event() event.add_attribute('ip-src', '9.9.9.9') try: @@ -1017,7 +1036,7 @@ class TestComprehensive(unittest.TestCase): finally: # cleanup self.admin_misp_connector.delete_event(event) - def test_extend_event(self): + def test_extend_event(self) -> None: first = self.create_simple_event() first.info = 'parent event' first.add_tag('tlp:amber___test') @@ -1038,7 +1057,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_edit_attribute(self): + def test_edit_attribute(self) -> None: first = self.create_simple_event() try: first.attributes[0].comment = 'This is the original comment' @@ -1058,7 +1077,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_sightings(self): + def test_sightings(self) -> None: first = self.create_simple_event() second = self.create_simple_event() try: @@ -1133,7 +1152,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_search_csv(self): + def test_search_csv(self) -> None: first = self.create_simple_event() first.attributes[0].comment = 'This is the original comment' second = self.create_simple_event() @@ -1213,7 +1232,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_search_text(self): + def test_search_text(self) -> None: first = self.create_simple_event() first.add_attribute('ip-src', '8.8.8.8') first.publish() @@ -1227,7 +1246,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_search_stix(self): + def test_search_stix(self) -> None: first = self.create_simple_event() first.add_attribute('ip-src', '8.8.8.8') try: @@ -1242,7 +1261,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_update_object(self): + def test_update_object(self) -> None: first = self.create_simple_event() ip_dom = MISPObject('domain-ip') ip_dom.add_attribute('domain', value='google.fr') @@ -1346,7 +1365,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_custom_template(self): + def test_custom_template(self) -> None: first = self.create_simple_event() try: with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f: @@ -1388,7 +1407,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_unknown_template(self): + def test_unknown_template(self) -> None: first = self.create_simple_event() attributeAsDict = [{'MyCoolAttribute': {'value': 'critical thing', 'type': 'text'}}, {'MyCoolerAttribute': {'value': 'even worse', 'type': 'text', 'disable_correlation': True}}] @@ -1422,7 +1441,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_domain_ip_object(self): + def test_domain_ip_object(self) -> None: first = self.create_simple_event() try: dom_ip_obj = DomainIPObject({'ip': ['1.1.1.1', {'value': '2.2.2.2', 'to_ids': False}], @@ -1436,7 +1455,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_asn_object(self): + def test_asn_object(self) -> None: first = self.create_simple_event() try: dom_ip_obj = ASNObject({'asn': '12345', @@ -1449,7 +1468,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_object_template(self): + def test_object_template(self) -> None: r = self.admin_misp_connector.update_object_templates() self.assertEqual(type(r), list) object_templates = self.admin_misp_connector.object_templates(pythonify=True) @@ -1468,7 +1487,7 @@ class TestComprehensive(unittest.TestCase): mo.add_attribute('domain', 'google.fr') self.assertEqual(mo.template_uuid, '4') - def test_tags(self): + def test_tags(self) -> None: # Get list tags = self.admin_misp_connector.tags(pythonify=True) self.assertTrue(isinstance(tags, list)) @@ -1557,7 +1576,7 @@ class TestComprehensive(unittest.TestCase): response = self.admin_misp_connector.delete_tag(tag_org_restricted) response = self.admin_misp_connector.delete_tag(tag_user_restricted) - def test_add_event_with_attachment_object_controller(self): + def test_add_event_with_attachment_object_controller(self) -> None: first = self.create_simple_event() try: first = self.user_misp_connector.add_event(first) @@ -1597,7 +1616,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_add_event_with_attachment_object_controller__hard(self): + def test_add_event_with_attachment_object_controller__hard(self) -> None: first = self.create_simple_event() try: first = self.user_misp_connector.add_event(first) @@ -1638,7 +1657,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_lief_and_sign(self): + def test_lief_and_sign(self) -> None: first = self.create_simple_event() try: first = self.user_misp_connector.add_event(first) @@ -1682,7 +1701,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_add_event_with_attachment(self): + def test_add_event_with_attachment(self) -> None: first = self.create_simple_event() try: first = self.user_misp_connector.add_event(first) @@ -1700,7 +1719,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_taxonomies(self): + def test_taxonomies(self) -> None: # Make sure we're up-to-date r = self.admin_misp_connector.update_taxonomies() self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.') @@ -1740,7 +1759,7 @@ class TestComprehensive(unittest.TestCase): # Return back to default required status r = self.admin_misp_connector.set_taxonomy_required(tax, not tax.required) - def test_warninglists(self): + def test_warninglists(self) -> None: # Make sure we're up-to-date r = self.admin_misp_connector.update_warninglists() self.assertTrue('name' in r, msg=r) @@ -1769,7 +1788,7 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.disable_warninglist(testwl) self.assertEqual(r['success'], '1 warninglist(s) disabled') - def test_noticelists(self): + def test_noticelists(self) -> None: # Make sure we're up-to-date r = self.admin_misp_connector.update_noticelists() self.assertEqual(r['name'], 'All noticelists are up to date already.') @@ -1790,7 +1809,7 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.disable_noticelist(testnl) self.assertFalse(r['Noticelist']['enabled'], r) - def test_correlation_exclusions(self): + def test_correlation_exclusions(self) -> None: newce = MISPCorrelationExclusion() newce.value = "test-correlation-exclusion" r = self.admin_misp_connector.add_correlation_exclusion(newce, pythonify=True) @@ -1805,7 +1824,7 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.clean_correlation_exclusions() self.assertTrue(r['success']) - def test_galaxies(self): + def test_galaxies(self) -> None: # Make sure we're up-to-date r = self.admin_misp_connector.update_galaxies() self.assertEqual(r['name'], 'Galaxies updated.') @@ -1821,7 +1840,7 @@ class TestComprehensive(unittest.TestCase): # FIXME: Fails due to https://github.com/MISP/MISP/issues/4855 # self.assertTrue('GalaxyCluster' in r) - def test_zmq(self): + def test_zmq(self) -> None: first = self.create_simple_event() try: first = self.user_misp_connector.add_event(first) @@ -1831,7 +1850,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_csv_loader(self): + def test_csv_loader(self) -> None: csv1 = CSVLoader(template_name='file', csv_path=Path('tests/csv_testfiles/valid_fieldnames.csv')) event = MISPEvent() event.info = 'Test event from CSV loader' @@ -1849,7 +1868,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_user(self): + def test_user(self) -> None: # Get list users = self.admin_misp_connector.users(pythonify=True) self.assertTrue(isinstance(users, list)) @@ -1871,7 +1890,7 @@ class TestComprehensive(unittest.TestCase): key = self.user_misp_connector.get_new_authkey() self.assertTrue(isinstance(key, str)) - def test_organisation(self): + def test_organisation(self) -> None: # Get list orgs = self.admin_misp_connector.organisations(pythonify=True) self.assertTrue(isinstance(orgs, list)) @@ -1888,7 +1907,7 @@ class TestComprehensive(unittest.TestCase): organisation = self.admin_misp_connector.update_organisation(organisation, pythonify=True) self.assertEqual(organisation.name, 'blah', organisation) - def test_org_search(self): + def test_org_search(self) -> None: orgs = self.admin_misp_connector.organisations(pythonify=True) org_name = 'ORGNAME' # Search by the org name @@ -1899,7 +1918,7 @@ class TestComprehensive(unittest.TestCase): # This org should have the name ORGNAME self.assertEqual(orgs[0].name, org_name) - def test_user_search(self): + def test_user_search(self) -> None: users = self.admin_misp_connector.users(pythonify=True) emailAddr = users[0].email @@ -1915,7 +1934,7 @@ class TestComprehensive(unittest.TestCase): self.assertTrue(len(users) == 1) self.assertEqual(users[0].email, emailAddr) - def test_attribute(self): + def test_attribute(self) -> None: first = self.create_simple_event() second = self.create_simple_event() a = second.add_attribute('ip-src', '11.11.11.11') @@ -2084,7 +2103,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_search_type_event_csv(self): + def test_search_type_event_csv(self) -> None: try: first, second, third = self.environment() # Search as admin @@ -2103,7 +2122,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(third) @unittest.skip("Not very important, skip for now.") - def test_search_logs(self): + def test_search_logs(self) -> None: r = self.admin_misp_connector.update_user({'email': 'testusr-changed@user.local'}, self.test_usr) r = self.admin_misp_connector.search_logs(model='User', created=date.today(), pythonify=True) for entry in r[-1:]: @@ -2121,22 +2140,22 @@ class TestComprehensive(unittest.TestCase): else: raise Exception('Unable to find log entry after updating the user') - def test_db_schema(self): + def test_db_schema(self) -> None: diag = self.admin_misp_connector.db_schema_diagnostic() self.assertEqual(diag['actual_db_version'], diag['expected_db_version'], diag) - def test_live_acl(self): + def test_live_acl(self) -> None: missing_acls = self.admin_misp_connector.remote_acl() self.assertEqual(missing_acls, [], msg=missing_acls) - def test_roles(self): + def test_roles(self) -> None: role = self.admin_misp_connector.set_default_role(4) self.assertEqual(role['message'], 'Default role set.') self.admin_misp_connector.set_default_role(3) roles = self.admin_misp_connector.roles(pythonify=True) self.assertTrue(isinstance(roles, list)) - def test_describe_types(self): + def test_describe_types(self) -> None: remote = self.admin_misp_connector.describe_types_remote remote_types = remote.pop('types') remote_categories = remote.pop('categories') @@ -2155,12 +2174,12 @@ class TestComprehensive(unittest.TestCase): for typ in mapping: self.assertIn(typ, remote_types) - def test_versions(self): + def test_versions(self) -> None: self.assertEqual(self.user_misp_connector.version, self.user_misp_connector.pymisp_version_master) self.assertEqual(self.user_misp_connector.misp_instance_version['version'], self.user_misp_connector.misp_instance_version_master['version']) - def test_statistics(self): + def test_statistics(self) -> None: try: # Attributes first, second, third = self.environment() @@ -2209,7 +2228,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(second) self.admin_misp_connector.delete_event(third) - def test_direct(self): + def test_direct(self) -> None: try: r = self.user_misp_connector.direct_call('events/add', data={'info': 'foo'}) event = MISPEvent() @@ -2226,7 +2245,7 @@ class TestComprehensive(unittest.TestCase): finally: self.admin_misp_connector.delete_event(event) - def test_freetext(self): + def test_freetext(self) -> None: first = self.create_simple_event() try: self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%', force_enable=True) @@ -2262,7 +2281,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_sharing_groups(self): + def test_sharing_groups(self) -> None: # add sg = MISPSharingGroup() sg.name = 'Testcases SG' @@ -2338,7 +2357,7 @@ class TestComprehensive(unittest.TestCase): self.assertFalse(self.admin_misp_connector.sharing_group_exists(sharing_group.id)) self.assertFalse(self.admin_misp_connector.sharing_group_exists(sharing_group.uuid)) - def test_sharing_group(self): + def test_sharing_group(self) -> None: # add sg = MISPSharingGroup() sg.name = 'Testcases SG' @@ -2363,7 +2382,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_sharing_group(sharing_group.id) self.assertFalse(self.admin_misp_connector.sharing_group_exists(sharing_group)) - def test_sharing_group_search(self): + def test_sharing_group_search(self) -> None: # Add sharing group sg = MISPSharingGroup() sg.name = 'Testcases SG' @@ -2410,7 +2429,7 @@ class TestComprehensive(unittest.TestCase): self.assertFalse(self.admin_misp_connector.sharing_group_exists(sharing_group)) - def test_feeds(self): + def test_feeds(self) -> None: # Add feed = MISPFeed() feed.name = 'TestFeed' @@ -2493,7 +2512,7 @@ class TestComprehensive(unittest.TestCase): self.assertFalse(updated_feed.enabled) self.assertEqual(updated_feed.settings, e_thread_csv_feed.settings) - def test_servers(self): + def test_servers(self) -> None: # add server = MISPServer() server.name = 'Test Server' @@ -2513,7 +2532,7 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.delete_server(server) self.assertEqual(r['name'], 'Server deleted') - def test_roles_expanded(self): + def test_roles_expanded(self) -> None: '''Test all possible things regarding roles 1. Use existing roles (ID in test VM): * Read only (6): Can only connect via API and see events visible by its organisation @@ -2631,7 +2650,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_user(test_roles_user) self.admin_misp_connector.delete_tag(test_tag) - def test_expansion(self): + def test_expansion(self) -> None: first = self.create_simple_event() try: md5_disk = hashlib.md5() @@ -2668,7 +2687,7 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) - def test_user_settings(self): + def test_user_settings(self) -> None: first = self.create_simple_event() first.distribution = 3 first.add_tag('test_publish_filter') @@ -2729,7 +2748,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_communities(self): + def test_communities(self) -> None: communities = self.admin_misp_connector.communities(pythonify=True) self.assertEqual(communities[0].name, 'CIRCL Private Sector Information Sharing Community - aka MISPPRIV') community = self.admin_misp_connector.get_community(communities[1], pythonify=True) @@ -2743,7 +2762,7 @@ class TestComprehensive(unittest.TestCase): # if k == 'To': # self.assertEqual(v, 'info@circl.lu') - def test_upload_stix(self): + def test_upload_stix(self) -> None: # FIXME https://github.com/MISP/MISP/issues/4892 try: r1 = self.user_misp_connector.upload_stix('tests/stix1.xml-utf8', version='1') @@ -2774,7 +2793,7 @@ class TestComprehensive(unittest.TestCase): except Exception: pass - def test_toggle_global_pythonify(self): + def test_toggle_global_pythonify(self) -> None: first = self.create_simple_event() second = self.create_simple_event() try: @@ -2789,7 +2808,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - def test_first_last_seen(self): + def test_first_last_seen(self) -> None: event = MISPEvent() event.info = 'Test First Last seen' event.add_attribute('ip-dst', '8.8.8.8', first_seen='2020-01-03', last_seen='2020-01-04T12:30:34.323242+0800') @@ -2836,7 +2855,7 @@ class TestComprehensive(unittest.TestCase): finally: self.admin_misp_connector.delete_event(first) - def test_registrations(self): + def test_registrations(self) -> None: r = register_user(url, 'self_register@user.local', organisation=self.test_org, org_name=self.test_org.name, verify=verifycert) self.assertTrue(r['saved']) @@ -2866,7 +2885,7 @@ class TestComprehensive(unittest.TestCase): m = self.admin_misp_connector.discard_user_registration(registrations[1].id) self.assertEqual(m['name'], '1 registration(s) discarded.') - def test_search_workflow(self): + def test_search_workflow(self) -> None: first = self.create_simple_event() first.add_attribute('domain', 'google.com') tag = MISPTag() @@ -2915,7 +2934,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_tag(tag) - def test_search_workflow_ts(self): + def test_search_workflow_ts(self) -> None: first = self.create_simple_event() first.add_attribute('domain', 'google.com') tag = MISPTag() @@ -2964,14 +2983,14 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_tag(tag) - def test_blocklists(self): + def test_blocklists(self) -> None: first = self.create_simple_event() second = self.create_simple_event() second.Orgc = self.test_org - to_delete = {'bl_events': [], 'bl_organisations': []} + to_delete: dict[str, MISPOrganisationBlocklist | MISPEventBlocklist] = {'bl_events': [], 'bl_organisations': []} try: # test events BL - ebl = self.admin_misp_connector.add_event_blocklist(uuids=[first.uuid]) + ebl: MISPEventBlocklist = self.admin_misp_connector.add_event_blocklist(uuids=[first.uuid]) self.assertEqual(ebl['result']['successes'][0], first.uuid, ebl) bl_events = self.admin_misp_connector.event_blocklists(pythonify=True) for ble in bl_events: @@ -3005,7 +3024,7 @@ class TestComprehensive(unittest.TestCase): blo.comment = 'This is a test' blo.org_name = 'bar' - blo = self.admin_misp_connector.update_organisation_blocklist(blo, pythonify=True) + blo: MISPOrganisationBlocklist = self.admin_misp_connector.update_organisation_blocklist(blo, pythonify=True) self.assertEqual(blo.org_name, 'bar') r = self.admin_misp_connector.delete_organisation_blocklist(blo) self.assertTrue(r['success']) @@ -3016,15 +3035,15 @@ class TestComprehensive(unittest.TestCase): for blo in to_delete['bl_organisations']: self.admin_misp_connector.delete_organisation_blocklist(blo) - def test_event_report(self): + def test_event_report(self) -> None: event = self.create_simple_event() - new_event_report = MISPEventReport() + new_event_report: MISPEventReport = MISPEventReport() new_event_report.name = "Test Event Report" new_event_report.content = "# Example report markdown" new_event_report.distribution = 5 # Inherit try: event = self.user_misp_connector.add_event(event) - new_event_report = self.user_misp_connector.add_event_report(event.id, new_event_report) + new_event_report = self.user_misp_connector.add_event_report(event.id, new_event_report) # type: ignore[assignment] # The event report should be linked by Event ID self.assertEqual(event.id, new_event_report.event_id) @@ -3034,12 +3053,12 @@ class TestComprehensive(unittest.TestCase): new_event_report.name = "Updated Event Report" new_event_report.content = "Updated content" - new_event_report = self.user_misp_connector.update_event_report(new_event_report) + new_event_report = self.user_misp_connector.update_event_report(new_event_report) # type: ignore[assignment] # The event report should be updatable self.assertTrue(new_event_report.name == "Updated Event Report") self.assertTrue(new_event_report.content == "Updated content") - event_reports = self.user_misp_connector.get_event_reports(event.id) + event_reports: list[MISPEventReport] = self.user_misp_connector.get_event_reports(event.id) # type: ignore[assignment] # The event report should be requestable by the Event ID self.assertEqual(new_event_report.id, event_reports[0].id) @@ -3054,33 +3073,41 @@ class TestComprehensive(unittest.TestCase): self.user_misp_connector.delete_event(event) self.user_misp_connector.delete_event_report(new_event_report) - def test_search_galaxy(self): + def test_search_galaxy(self) -> None: self.admin_misp_connector.toggle_global_pythonify() - galaxy = self.admin_misp_connector.galaxies()[0] + galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies() # type: ignore[assignment] + galaxy: MISPGalaxy = galaxies[0] ret = self.admin_misp_connector.search_galaxy(value=galaxy.name) self.assertEqual(len(ret), 1) self.admin_misp_connector.toggle_global_pythonify() - def test_galaxy_cluster(self): + def test_galaxy_cluster(self) -> None: self.admin_misp_connector.toggle_global_pythonify() - galaxy = self.admin_misp_connector.galaxies()[0] - new_galaxy_cluster = MISPGalaxyCluster() + galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies() # type: ignore[assignment] + galaxy: MISPGalaxy = galaxies[0] + new_galaxy_cluster: MISPGalaxyCluster = MISPGalaxyCluster() new_galaxy_cluster.value = "Test Cluster" new_galaxy_cluster.authors = ["MISP"] new_galaxy_cluster.distribution = 1 new_galaxy_cluster.description = "Example test cluster" try: - galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True) + if gid := galaxy.id: + galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True) # type: ignore[assignment] + else: + raise Exception("No galaxy found") existing_galaxy_cluster = galaxy.clusters[0] - new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(galaxy.id, new_galaxy_cluster) + if gid := galaxy.id: + new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(gid, new_galaxy_cluster) # type: ignore[assignment] + else: + raise Exception("No galaxy found") # The new galaxy cluster should be under the selected galaxy self.assertEqual(galaxy.id, new_galaxy_cluster.galaxy_id) # The cluster should have the right value self.assertEqual(new_galaxy_cluster.value, "Test Cluster") new_galaxy_cluster.add_cluster_element("synonyms", "Test2") - new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] # The cluster should have one element that is a synonym self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1) @@ -3093,22 +3120,22 @@ class TestComprehensive(unittest.TestCase): # The cluster element should be updatable element.value = "Test3" - new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] element = new_galaxy_cluster.cluster_elements[0] self.assertEqual(element.value, "Test3") new_galaxy_cluster.add_cluster_element("synonyms", "ToDelete") - new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] # The cluster should have two elements self.assertEqual(len(new_galaxy_cluster.cluster_elements), 2) new_galaxy_cluster.cluster_elements = [e for e in new_galaxy_cluster.cluster_elements if e.value != "ToDelete"] - new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] # The cluster elements should be deletable self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1) new_galaxy_cluster.add_cluster_relation(existing_galaxy_cluster, "is-tested-by") - new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] # The cluster should have a relationship self.assertEqual(len(new_galaxy_cluster.cluster_relations), 1) relation = new_galaxy_cluster.cluster_relations[0] @@ -3116,7 +3143,7 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(relation.referenced_galaxy_cluster_uuid, existing_galaxy_cluster.uuid) relation.add_tag("tlp:amber") - new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] relation = new_galaxy_cluster.cluster_relations[0] # The relationship should have a tag of tlp:amber self.assertEqual(len(relation.tags), 1) @@ -3126,32 +3153,36 @@ class TestComprehensive(unittest.TestCase): resp = self.admin_misp_connector.delete_galaxy_cluster_relation(relation) self.assertTrue(resp['success']) # The cluster relation should no longer be present - new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] self.assertEqual(len(new_galaxy_cluster.cluster_relations), 0) resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster) # Galaxy clusters should be soft deletable self.assertTrue(resp['success']) - new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) + new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] self.assertTrue(isinstance(new_galaxy_cluster, MISPGalaxyCluster)) resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True) # Galaxy clusters should be hard deletable self.assertTrue(resp['success']) - resp = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) + resp = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster) # type: ignore[assignment] self.assertTrue("errors" in resp) finally: self.admin_misp_connector.delete_galaxy_cluster_relation(relation) self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True) self.admin_misp_connector.toggle_global_pythonify() - def test_event_galaxy(self): + def test_event_galaxy(self) -> None: self.admin_misp_connector.toggle_global_pythonify() event = self.create_simple_event() try: - galaxy = self.admin_misp_connector.galaxies()[0] - galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True) - galaxy_cluster = galaxy.clusters[0] + galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies() # type: ignore[assignment] + galaxy: MISPGalaxy = galaxies[0] + if gid := galaxy.id: + galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True) # type: ignore[assignment] + else: + raise Exception("No galaxy found") + galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[0] event.add_tag(galaxy_cluster.tag_name) event = self.admin_misp_connector.add_event(event) # The event should have a galaxy attached @@ -3166,7 +3197,7 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.toggle_global_pythonify() @unittest.skip("Internal use only") - def missing_methods(self): + def missing_methods(self) -> None: skip = [ "attributes/download", "attributes/add_attachment",