chg: Add more strict typing, not done yet.

pull/1144/head
Raphaël Vinot 2024-02-01 14:40:12 +01:00
parent 9853f23683
commit 1da0d5adc1
27 changed files with 442 additions and 393 deletions

View File

@ -3,7 +3,7 @@ strict = True
warn_return_any = False
show_error_context = True
pretty = True
exclude = feed-generator|examples|pymisp/tools|pymisp/data|tests|docs
exclude = tests/testlive_comprehensive.py|tests/testlive_sync.py|feed-generator|examples|pymisp/data|docs|pymisp/tools/openioc.py|pymisp/tools/reportlab_generator.py|tests/test_reportlab.py
# Stuff to remove gradually
# disallow_untyped_defs = False

View File

@ -42,7 +42,6 @@ try:
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa
from .tools import openioc # noqa
from .tools import ext_lookups # noqa
from .tools import update_objects # noqa

View File

@ -8,7 +8,7 @@ from deprecated import deprecated # type: ignore
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum
from enum import Enum, IntEnum
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache
@ -46,7 +46,7 @@ class MISPFileCache:
return data
class Distribution(Enum):
class Distribution(IntEnum):
your_organisation_only = 0
this_community_only = 1
connected_communities = 2
@ -55,14 +55,14 @@ class Distribution(Enum):
inherit = 5
class ThreatLevel(Enum):
class ThreatLevel(IntEnum):
high = 1
medium = 2
low = 3
undefined = 4
class Analysis(Enum):
class Analysis(IntEnum):
initial = 0
ongoing = 1
completed = 2

View File

@ -1012,7 +1012,7 @@ class PyMISP:
to_return.append(s)
return to_return
def add_sighting(self, sighting: MISPSighting,
def add_sighting(self, sighting: MISPSighting | dict[str, Any],
attribute: MISPAttribute | int | str | UUID | None = None,
pythonify: bool = False) -> dict[str, Any] | MISPSighting:
"""Add a new sighting (globally, or to a specific attribute): https://www.misp-project.org/openapi/#tag/Sightings/operation/addSighting and https://www.misp-project.org/openapi/#tag/Sightings/operation/getSightingsByEventId

View File

@ -13,7 +13,7 @@ from collections import defaultdict
import logging
import hashlib
from pathlib import Path
from typing import IO, Any
from typing import IO, Any, Sequence
import warnings
try:
@ -1010,15 +1010,17 @@ class MISPObject(AbstractMISP):
self.edited = True
return attribute
def add_attributes(self, object_relation: str, *attributes: list[dict[str, Any] | MISPAttribute]) -> list[MISPAttribute | None]:
def add_attributes(self, object_relation: str, *attributes: Sequence[str | dict[str, Any] | MISPAttribute]) -> list[MISPAttribute | None]:
'''Add multiple attributes with the same object_relation.
Helper for object_relation when multiple is True in the template.
It is the same as calling multiple times add_attribute with the same object_relation.
'''
to_return = []
for attribute in attributes:
if isinstance(attribute, dict):
a = self.add_attribute(object_relation, **attribute)
if isinstance(attribute, MISPAttribute):
a = self.add_attribute(object_relation, **attribute.to_dict())
elif isinstance(attribute, dict):
a = self.add_attribute(object_relation, **attribute) # type: ignore[misc]
else:
a = self.add_attribute(object_relation, value=attribute)
to_return.append(a)
@ -1256,6 +1258,10 @@ class MISPGalaxyCluster(AbstractMISP):
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
"""
id: int | str | None
tag_name: str
galaxy_id: str | None
def __init__(self) -> None:
super().__init__()
self.Galaxy: MISPGalaxy
@ -1405,6 +1411,8 @@ class MISPGalaxyCluster(AbstractMISP):
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters"""
id: str | None
def __init__(self) -> None:
super().__init__()
self.GalaxyCluster: list[MISPGalaxyCluster] = []
@ -2048,6 +2056,8 @@ class MISPObjectTemplate(AbstractMISP):
class MISPUser(AbstractMISP):
authkey: str
def __init__(self, **kwargs: dict[str, Any]) -> None:
super().__init__(**kwargs)
self.email: str

View File

@ -6,7 +6,7 @@ import ipaddress
import socket
import idna
from publicsuffixlist import PublicSuffixList # type: ignore
from urllib.parse import urlparse, urlunparse
from urllib.parse import urlparse, urlunparse, ParseResult
class UrlNotDecoded(Exception):
@ -18,20 +18,20 @@ class PSLFaup:
Fake Faup Python Library using PSL for Windows support
"""
def __init__(self):
def __init__(self) -> None:
self.decoded = False
self.psl = PublicSuffixList()
self._url = None
self._retval = {}
self.ip_as_host = False
self._url: ParseResult | None = None
self._retval: dict[str, str | int | None | bytes] = {}
self.ip_as_host = ''
def _clear(self):
def _clear(self) -> None:
self.decoded = False
self._url = None
self._retval = {}
self.ip_as_host = False
self.ip_as_host = ''
def decode(self, url) -> None:
def decode(self, url: str) -> None:
"""
This function creates a dict of all the url fields.
:param url: The URL to normalize
@ -43,10 +43,15 @@ class PSLFaup:
url = '//' + url
self._url = urlparse(url)
self.ip_as_host = False
if self._url is None:
raise UrlNotDecoded("Unable to parse URL")
self.ip_as_host = ''
if self._url.hostname is None:
raise UrlNotDecoded("Unable to parse URL")
hostname = _ensure_str(self._url.hostname)
try:
ipv4_bytes = socket.inet_aton(_ensure_str(hostname))
ipv4_bytes = socket.inet_aton(hostname)
ipv4 = ipaddress.IPv4Address(ipv4_bytes)
self.ip_as_host = ipv4.compressed
except (OSError, ValueError):
@ -61,61 +66,70 @@ class PSLFaup:
self._retval = {}
@property
def url(self):
if not self.decoded:
def url(self) -> bytes | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
netloc = self.get_host() + ('' if self.get_port() is None else f':{self.get_port()}')
return _ensure_bytes(
urlunparse(
(self.get_scheme(), netloc, self.get_resource_path(),
'', self.get_query_string(), self.get_fragment(),)
if host := self.get_host():
netloc = host + ('' if self.get_port() is None else f':{self.get_port()}')
return _ensure_bytes(
urlunparse(
(self.get_scheme(), netloc, self.get_resource_path(),
'', self.get_query_string(), self.get_fragment(),)
)
)
)
return None
def get_scheme(self):
def get_scheme(self) -> str:
"""
Get the scheme of the url given in the decode function
:returns: The URL scheme
"""
if not self.decoded:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.scheme)
return _ensure_str(self._url.scheme if self._url.scheme else '')
def get_credential(self):
if not self.decoded:
def get_credential(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if self._url.password:
if self._url.username and self._url.password:
return _ensure_str(self._url.username) + ':' + _ensure_str(self._url.password)
if self._url.username:
return _ensure_str(self._url.username)
return None
def get_subdomain(self):
if not self.decoded:
def get_subdomain(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_host() is not None and not self.ip_as_host:
if self.get_domain() in self.get_host():
return self.get_host().rsplit(self.get_domain(), 1)[0].rstrip('.') or None
domain = self.get_domain()
host = self.get_host()
if domain and host and domain in host:
return host.rsplit(domain, 1)[0].rstrip('.') or None
return None
def get_domain(self):
if not self.decoded:
def get_domain(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_host() is not None and not self.ip_as_host:
return self.psl.privatesuffix(self.get_host())
return None
def get_domain_without_tld(self):
if not self.decoded:
def get_domain_without_tld(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_tld() is not None and not self.ip_as_host:
return self.get_domain().rsplit(self.get_tld(), 1)[0].rstrip('.')
if domain := self.get_domain():
return domain.rsplit(self.get_tld(), 1)[0].rstrip('.')
return None
def get_host(self):
if not self.decoded:
def get_host(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if self._url.hostname is None:
@ -125,45 +139,48 @@ class PSLFaup:
else:
return _ensure_str(idna.encode(self._url.hostname, uts46=True))
def get_unicode_host(self):
if not self.decoded:
def get_unicode_host(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if not self.ip_as_host:
return idna.decode(self.get_host(), uts46=True)
if host := self.get_host():
return idna.decode(host, uts46=True)
return None
def get_tld(self):
if not self.decoded:
def get_tld(self) -> str | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_host() is not None and not self.ip_as_host:
return self.psl.publicsuffix(self.get_host())
return None
def get_port(self):
if not self.decoded:
def get_port(self) -> int | None:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
return self._url.port
def get_resource_path(self):
if not self.decoded:
def get_resource_path(self) -> str:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.path)
def get_query_string(self):
if not self.decoded:
def get_query_string(self) -> str:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.query)
def get_fragment(self):
if not self.decoded:
def get_fragment(self) -> str:
if not self.decoded or not self._url:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.fragment)
def get(self):
def get(self) -> dict[str, str | int | None | bytes]:
self._retval["scheme"] = self.get_scheme()
self._retval["tld"] = self.get_tld()
self._retval["domain"] = self.get_domain()
@ -178,14 +195,14 @@ class PSLFaup:
return self._retval
def _ensure_bytes(binary) -> bytes:
def _ensure_bytes(binary: str | bytes) -> bytes:
if isinstance(binary, bytes):
return binary
else:
return binary.encode('utf-8')
def _ensure_str(string) -> str:
def _ensure_str(string: str | bytes) -> str:
if isinstance(string, str):
return string
else:

View File

@ -31,7 +31,7 @@ class MISPMsgConverstionError(MISPObjectException):
class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None, # type: ignore[no-untyped-def]
def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | bytes | None=None, # type: ignore[no-untyped-def]
attach_original_email: bool = True, **kwargs) -> None:
super().__init__('email', **kwargs)
@ -79,11 +79,11 @@ class EMailObject(AbstractMISPObjectGenerator):
return eml
except UnicodeDecodeError:
pass
raise PyMISPNotImplementedYet("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.")
raise InvalidMISPObject("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.")
@staticmethod
def create_pseudofile(filepath: Path | str | None = None,
pseudofile: BytesIO | None = None) -> BytesIO:
pseudofile: BytesIO | bytes | None = None) -> BytesIO:
"""Creates a pseudofile using directly passed data or data loaded from file path.
"""
if filepath:
@ -91,6 +91,8 @@ class EMailObject(AbstractMISPObjectGenerator):
return BytesIO(f.read())
elif pseudofile and isinstance(pseudofile, BytesIO):
return pseudofile
elif pseudofile and isinstance(pseudofile, bytes):
return BytesIO(pseudofile)
else:
raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')

View File

@ -15,7 +15,7 @@ except ImportError:
has_pymispgalaxies = False
def revert_tag_from_galaxies(tag):
def revert_tag_from_galaxies(tag: str) -> list[str]:
clusters = Clusters()
try:
return clusters.revert_machinetag(tag)
@ -23,7 +23,7 @@ def revert_tag_from_galaxies(tag):
return []
def revert_tag_from_taxonomies(tag):
def revert_tag_from_taxonomies(tag: str) -> list[str]:
taxonomies = Taxonomies()
try:
return taxonomies.revert_machinetag(tag)
@ -31,7 +31,7 @@ def revert_tag_from_taxonomies(tag):
return []
def search_taxonomies(query):
def search_taxonomies(query: str) -> list[str]:
taxonomies = Taxonomies()
found = taxonomies.search(query)
if not found:
@ -39,6 +39,6 @@ def search_taxonomies(query):
return found
def search_galaxies(query):
def search_galaxies(query: str) -> list[str]:
clusters = Clusters()
return clusters.search(query)

View File

@ -2,20 +2,23 @@
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging
from typing import Any
from .abstractgenerator import AbstractMISPObjectGenerator
logger = logging.getLogger('pymisp')
class Fail2BanObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool = True, **kwargs):
def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs): # type: ignore[no-untyped-def]
super().__init__('fail2ban', strict=strict, **kwargs)
self._parameters = parameters
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
timestamp = self._sanitize_timestamp(self._parameters.pop('processing-timestamp', None))
self._parameters['processing-timestamp'] = timestamp
super().generate_attributes()

View File

@ -5,10 +5,9 @@ from __future__ import annotations
from pathlib import Path
from pymisp import MISPEvent
import json
from typing import List
def feed_meta_generator(path: Path):
def feed_meta_generator(path: Path) -> None:
manifests = {}
hashes: list[str] = []

View File

@ -30,7 +30,9 @@ except ImportError:
class FileObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Path | str | None = None, pseudofile: BytesIO | bytes | None = None, filename: str | None = None, **kwargs) -> None:
def __init__(self, filepath: Path | str | None = None, # type: ignore[no-untyped-def]
pseudofile: BytesIO | bytes | None = None,
filename: str | None = None, **kwargs) -> None:
super().__init__('file', **kwargs)
if not HAS_PYDEEP:
logger.warning("pydeep is missing, please install pymisp this way: pip install pymisp[fileobjects]")
@ -55,10 +57,10 @@ class FileObject(AbstractMISPObjectGenerator):
self.__data = self.__pseudofile.getvalue()
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
self.add_attribute('filename', value=self.__filename)
size = self.add_attribute('size-in-bytes', value=len(self.__data))
if int(size.value) > 0:
self.add_attribute('size-in-bytes', value=len(self.__data))
if len(self.__data) > 0:
self.add_attribute('entropy', value=self.__entropy_H(self.__data))
self.add_attribute('md5', value=md5(self.__data).hexdigest())
self.add_attribute('sha1', value=sha1(self.__data).hexdigest())

View File

@ -10,7 +10,7 @@ from .abstractgenerator import AbstractMISPObjectGenerator
class GenericObjectGenerator(AbstractMISPObjectGenerator):
# FIXME: this method is different from the master one, and that's probably not a good idea.
def generate_attributes(self, attributes: list[dict[str, Any]]) -> None:
def generate_attributes(self, attributes: list[dict[str, Any]]) -> None: # type: ignore[override]
"""Generates MISPObjectAttributes from a list of dictionaries.
Each entry if the list must be in one of the two following formats:
* {<object_relation>: <value>}

View File

@ -2,20 +2,23 @@
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging
from typing import Any
from .abstractgenerator import AbstractMISPObjectGenerator
logger = logging.getLogger('pymisp')
class GeolocationObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool = True, **kwargs):
def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('geolocation', strict=strict, **kwargs)
self._parameters = parameters
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
first = self._sanitize_timestamp(self._parameters.pop('first-seen', None))
self._parameters['first-seen'] = first
last = self._sanitize_timestamp(self._parameters.pop('last-seen', None))

View File

@ -2,20 +2,23 @@
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging
from typing import Any
from .abstractgenerator import AbstractMISPObjectGenerator
logger = logging.getLogger('pymisp')
class GitVulnFinderObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool = True, **kwargs):
def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('git-vuln-finder', strict=strict, **kwargs)
self._parameters = parameters
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
authored_date = self._sanitize_timestamp(self._parameters.pop('authored_date', None))
self._parameters['authored_date'] = authored_date
committed_date = self._sanitize_timestamp(self._parameters.pop('committed_date', None))

View File

@ -2,26 +2,29 @@
from __future__ import annotations
from ..api import PyMISP
try:
from pymispwarninglists import WarningLists # type: ignore
from pymispwarninglists import WarningLists, WarningList # type: ignore
has_pymispwarninglists = True
except ImportError:
has_pymispwarninglists = False
def from_instance(pymisp_instance, slow_search=False):
def from_instance(pymisp_instance: PyMISP, slow_search: bool=False) -> WarningLists:
"""Load the warnindlist from an existing MISP instance
:pymisp_instance: Already instantialized PyMISP instance."""
warninglists_index = pymisp_instance.get_warninglists()['Warninglists']
warninglists_index = pymisp_instance.warninglists(pythonify=True)
all_warningslists = []
for warninglist in warninglists_index:
wl = pymisp_instance.get_warninglist(warninglist['Warninglist']['id'])['Warninglist']
wl['list'] = wl.pop('WarninglistEntry')
all_warningslists.append(wl)
if isinstance(warninglist, WarningList):
wl = pymisp_instance.get_warninglist(warninglist['Warninglist']['id'])['Warninglist']
wl['list'] = wl.pop('WarninglistEntry')
all_warningslists.append(wl)
return WarningLists(slow_search, all_warningslists)
def from_package(slow_search=False):
def from_package(slow_search: bool=False) -> WarningLists:
return WarningLists(slow_search)

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import glob
import os
from .. import MISPEvent
try:
@ -13,23 +14,23 @@ except ImportError:
class Neo4j():
def __init__(self, host='localhost:7474', username='neo4j', password='neo4j'):
def __init__(self, host: str='localhost:7474', username: str='neo4j', password: str='neo4j') -> None:
if not has_py2neo:
raise Exception('py2neo is required, please install: pip install py2neo')
authenticate(host, username, password)
self.graph = Graph(f"http://{host}/db/data/")
def load_events_directory(self, directory):
self.events = []
def load_events_directory(self, directory: str) -> None:
self.events: list[MISPEvent] = []
for path in glob.glob(os.path.join(directory, '*.json')):
e = MISPEvent()
e.load(path)
self.import_event(e)
def del_all(self):
def del_all(self) -> None:
self.graph.delete_all()
def import_event(self, event):
def import_event(self, event: MISPEvent) -> None:
tx = self.graph.begin()
event_node = Node('Event', uuid=event.uuid, name=event.info)
# event_node['distribution'] = event.distribution

View File

@ -9,13 +9,13 @@ class SBSignatureObject(AbstractMISPObjectGenerator):
'''
Sandbox Analyzer
'''
def __init__(self, software: str, report: list, **kwargs):
def __init__(self, software: str, report: list[tuple[str, str]], **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('sb-signature', **kwargs)
self._software = software
self._report = report
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
''' Parse the report for relevant attributes '''
self.add_attribute("software", value=self._software)
for (signature_name, description) in self._report:

View File

@ -2,20 +2,21 @@
from __future__ import annotations
import logging
from io import StringIO
from pathlib import Path
from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
from io import StringIO
import logging
from typing import Optional, Union
from pathlib import Path
logger = logging.getLogger('pymisp')
class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
def __init__(self, authorized_keys_path: Path | str | None = None, authorized_keys_pseudofile: StringIO | None = None, **kwargs):
# PY3 way:
def __init__(self, authorized_keys_path: Path | str | None = None, # type: ignore[no-untyped-def]
authorized_keys_pseudofile: StringIO | None = None, **kwargs):
super().__init__('ssh-authorized-keys', **kwargs)
if authorized_keys_path:
with open(authorized_keys_path) as f:
@ -27,7 +28,7 @@ class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
self.__data = self.__pseudofile.getvalue()
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
for line in self.__pseudofile:
if line.startswith('ssh') or line.startswith('ecdsa'):
key = line.split(' ')[1]

View File

@ -1,35 +0,0 @@
from __future__ import annotations
try:
from misp_stix_converter.converters.buildMISPAttribute import buildEvent # type: ignore
from misp_stix_converter.converters import convert # type: ignore
from misp_stix_converter.converters.convert import MISPtoSTIX # type: ignore
has_misp_stix_converter = True
except ImportError:
has_misp_stix_converter = False
def load_stix(stix, distribution: int = 3, threat_level_id: int = 2, analysis: int = 0):
'''Returns a MISPEvent object from a STIX package'''
if not has_misp_stix_converter:
raise Exception('You need to install misp_stix_converter: pip install git+https://github.com/MISP/MISP-STIX-Converter.git')
stix = convert.load_stix(stix)
return buildEvent(stix, distribution=distribution,
threat_level_id=threat_level_id, analysis=analysis)
def make_stix_package(misp_event, to_json: bool = False, to_xml: bool = False):
'''Returns a STIXPackage from a MISPEvent.
Optionally can return the package in json or xml.
'''
if not has_misp_stix_converter:
raise Exception('You need to install misp_stix_converter: pip install git+https://github.com/MISP/MISP-STIX-Converter.git')
package = MISPtoSTIX(misp_event)
if to_json:
return package.to_json()
elif to_xml:
return package.to_xml()
else:
return package

View File

@ -13,7 +13,7 @@ from ..abstract import resources_path
static_repo = "https://github.com/MISP/misp-objects/archive/main.zip"
def update_objects():
def update_objects() -> None:
r = requests.get(static_repo)
zipped_repo = BytesIO(r.content)

View File

@ -2,10 +2,12 @@
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging
from urllib.parse import unquote_plus
from .abstractgenerator import AbstractMISPObjectGenerator
try:
from pyfaup.faup import Faup # type: ignore
except (OSError, ImportError):
@ -18,13 +20,13 @@ faup = Faup()
class URLObject(AbstractMISPObjectGenerator):
def __init__(self, url: str, generate_all=False, **kwargs):
def __init__(self, url: str, generate_all=False, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('url', **kwargs)
self._generate_all = True if generate_all is True else False
faup.decode(unquote_plus(url))
self.generate_attributes()
def generate_attributes(self):
def generate_attributes(self) -> None:
self.add_attribute('url', value=faup.url.decode())
if faup.get_host():
self.add_attribute('host', value=faup.get_host())

View File

@ -5,6 +5,8 @@ from __future__ import annotations
import requests
import json
from typing import Any
from .abstractgenerator import AbstractMISPObjectGenerator
# Original sourcecode: https://github.com/hayk57/MISP_registration_check
@ -13,14 +15,16 @@ from .abstractgenerator import AbstractMISPObjectGenerator
class VehicleObject(AbstractMISPObjectGenerator):
'''Vehicle object generator out of regcheck.org.uk'''
country_urls = {
country_urls: dict[str, str] = {
'fr': "http://www.regcheck.org.uk/api/reg.asmx/CheckFrance",
'es': "http://www.regcheck.org.uk/api/reg.asmx/CheckSpain",
'uk': "http://www.regcheck.org.uk/api/reg.asmx/Check"
}
def __init__(self, country: str, registration: str, username: str, **kwargs):
def __init__(self, country: str, registration: str, username: str, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('vehicle', **kwargs)
if country not in self.country_urls:
raise ValueError(f"Country {country} not supportet, must be one of {self.country_urls.keys()}")
self._country = country
self._registration = registration
self._username = username
@ -28,10 +32,10 @@ class VehicleObject(AbstractMISPObjectGenerator):
self.generate_attributes()
@property
def report(self):
def report(self) -> dict[str, Any]:
return self._report
def generate_attributes(self):
def generate_attributes(self) -> None:
carDescription = self._report["Description"]
carMake = self._report["CarMake"]["CurrentTextValue"]
carModel = self._report["CarModel"]["CurrentTextValue"]
@ -67,14 +71,14 @@ class VehicleObject(AbstractMISPObjectGenerator):
self.add_attribute('date-first-registration', type='text', value=firstRegistration)
self.add_attribute('image-url', type='text', value=ImageUrl)
def _query(self):
def _query(self) -> dict[str, Any]:
payload = f"RegistrationNumber={self._registration}&username={self._username}"
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'cache-control': "no-cache",
}
response = requests.request("POST", self.country_urls.get(self._country), data=payload, headers=headers)
response = requests.request("POST", self.country_urls[self._country], data=payload, headers=headers)
# FIXME: Clean that up.
for item in response.text.split("</vehicleJson>"):
if "<vehicleJson>" in item:

View File

@ -3,7 +3,7 @@
from __future__ import annotations
import re
from typing import Optional
from typing import Any
import requests
try:
@ -25,7 +25,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
:indicator: IOC to search VirusTotal for
'''
def __init__(self, apikey: str, indicator: str, vt_proxies: dict | None = None, **kwargs):
def __init__(self, apikey: str, indicator: str, vt_proxies: dict[str, str] | None = None, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('virustotal-report', **kwargs)
indicator = indicator.strip()
self._resource_type = self.__validate_resource(indicator)
@ -37,17 +37,17 @@ class VTReportObject(AbstractMISPObjectGenerator):
error_msg = f"A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{indicator}' instead"
raise InvalidMISPObject(error_msg)
def get_report(self):
def get_report(self) -> dict[str, Any]:
return self._report
def generate_attributes(self):
def generate_attributes(self) -> None:
''' Parse the VirusTotal report for relevant attributes '''
self.add_attribute("last-submission", value=self._report["scan_date"])
self.add_attribute("permalink", value=self._report["permalink"])
ratio = "{}/{}".format(self._report["positives"], self._report["total"])
self.add_attribute("detection-ratio", value=ratio)
def __validate_resource(self, ioc: str):
def __validate_resource(self, ioc: str) -> str | bool:
'''
Validate the data type of an indicator.
Domains and IP addresses aren't supported because
@ -63,7 +63,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
return "file"
return False
def __query_virustotal(self, apikey: str, resource: str):
def __query_virustotal(self, apikey: str, resource: str) -> dict[str, Any]:
'''
Query VirusTotal for information about an indicator

View File

@ -7,22 +7,26 @@ from email.message import EmailMessage
from io import BytesIO
from os import urandom
from pathlib import Path
from typing import List
from typing import TypeVar, Type
from zipfile import ZipFile
from pymisp.tools import EMailObject
from pymisp.exceptions import PyMISPNotImplementedYet, InvalidMISPObject
T = TypeVar('T', bound='TestEmailObject')
class TestEmailObject(unittest.TestCase):
eml_1: BytesIO
@classmethod
def setUpClass(cls):
def setUpClass(cls: type[T]) -> None:
with ZipFile(Path("tests/email_testfiles/mail_1.eml.zip"), 'r') as myzip:
with myzip.open('mail_1.eml', pwd=b'AVs are dumb') as myfile:
cls.eml_1 = BytesIO(myfile.read())
def test_mail_1(self):
def test_mail_1(self) -> None:
email_object = EMailObject(pseudofile=self.eml_1)
self.assertEqual(self._get_values(email_object, "subject")[0], "письмо уведом-е")
self.assertEqual(self._get_values(email_object, "to")[0], "kinney@noth.com")
@ -39,7 +43,7 @@ class TestEmailObject(unittest.TestCase):
self.assertIsInstance(file_name, str)
self.assertIsInstance(file_content, BytesIO)
def test_mail_1_headers_only(self):
def test_mail_1_headers_only(self) -> None:
email_object = EMailObject(Path("tests/email_testfiles/mail_1_headers_only.eml"))
self.assertEqual(self._get_values(email_object, "subject")[0], "письмо уведом-е")
self.assertEqual(self._get_values(email_object, "to")[0], "kinney@noth.com")
@ -50,7 +54,7 @@ class TestEmailObject(unittest.TestCase):
self.assertIsInstance(email_object.email, EmailMessage)
self.assertEqual(len(email_object.attachments), 0)
def test_mail_multiple_to(self):
def test_mail_multiple_to(self) -> None:
email_object = EMailObject(Path("tests/email_testfiles/mail_multiple_to.eml"))
to = self._get_values(email_object, "to")
@ -60,7 +64,7 @@ class TestEmailObject(unittest.TestCase):
self.assertEqual(to[1], "jan.marek@example.com")
self.assertEqual(to_display_name[1], "Marek, Jan")
def test_msg(self):
def test_msg(self) -> None:
# Test result of eml converted to msg is the same
eml_email_object = EMailObject(pseudofile=self.eml_1)
email_object = EMailObject(Path("tests/email_testfiles/mail_1.msg"))
@ -83,7 +87,7 @@ class TestEmailObject(unittest.TestCase):
self.assertEqual(self._get_values(email_object, "received-header-ip"),
self._get_values(eml_email_object, "received-header-ip"))
def test_bom_encoded(self):
def test_bom_encoded(self) -> None:
"""Test utf-8-sig encoded email"""
bom_email_object = EMailObject(Path("tests/email_testfiles/mail_1_bom.eml"))
eml_email_object = EMailObject(pseudofile=self.eml_1)
@ -106,7 +110,7 @@ class TestEmailObject(unittest.TestCase):
self.assertEqual(self._get_values(bom_email_object, "received-header-ip"),
self._get_values(eml_email_object, "received-header-ip"))
def test_handling_of_various_email_types(self):
def test_handling_of_various_email_types(self) -> None:
self._does_not_fail(Path("tests/email_testfiles/mail_2.eml"),
"ensuring all headers work")
self._does_not_fail(Path('tests/email_testfiles/mail_3.eml'),
@ -118,7 +122,7 @@ class TestEmailObject(unittest.TestCase):
self._does_not_fail(Path('tests/email_testfiles/mail_5.msg'),
"Check encapsulated HTML works")
def _does_not_fail(self, path, test_type="test"):
def _does_not_fail(self, path: Path, test_type: str="test") -> None:
found_error = None
try:
EMailObject(path)
@ -130,7 +134,7 @@ class TestEmailObject(unittest.TestCase):
path,
test_type))
def test_random_binary_blob(self):
def test_random_binary_blob(self) -> None:
"""Email parser fails correctly on random binary blob."""
random_data = urandom(1024)
random_blob = BytesIO(random_data)
@ -145,8 +149,8 @@ class TestEmailObject(unittest.TestCase):
broken_obj = EMailObject(pseudofile=random_blob)
except Exception as _e:
found_error = _e
if not isinstance(found_error, PyMISPNotImplementedYet):
self.fail("Expected PyMISPNotImplementedYet when EmailObject receives completely unknown binary input data in a pseudofile. But, did not get that exception.")
if not isinstance(found_error, InvalidMISPObject):
self.fail("Expected InvalidMISPObject when EmailObject receives completely unknown binary input data in a pseudofile. But, did not get that exception.")
@staticmethod
def _get_values(obj: EMailObject, relation: str) -> list[str]:

View File

@ -9,7 +9,7 @@ import pathlib
class TestFileObject(unittest.TestCase):
def test_mimeType(self):
def test_mimeType(self) -> None:
file_object = FileObject(filepath=pathlib.Path(__file__))
attributes = json.loads(file_object.to_json())['Attribute']
mime = next(attr for attr in attributes if attr['object_relation'] == 'mimetype')

View File

@ -17,24 +17,24 @@ from pymisp.tools import GitVulnFinderObject
class TestMISPEvent(unittest.TestCase):
def setUp(self):
def setUp(self) -> None:
self.maxDiff = None
self.mispevent = MISPEvent()
def init_event(self):
def init_event(self) -> None:
self.mispevent.info = 'This is a test'
self.mispevent.distribution = 1
self.mispevent.threat_level_id = 1
self.mispevent.analysis = 1
self.mispevent.set_date("2017-12-31") # test the set date method
def test_simple(self):
def test_simple(self) -> None:
with open('tests/mispevent_testfiles/simple.json') as f:
ref_json = json.load(f)
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event(self):
def test_event(self) -> None:
self.init_event()
self.mispevent.publish()
with open('tests/mispevent_testfiles/event.json') as f:
@ -42,22 +42,22 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_loadfile(self):
def test_loadfile(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/event.json')
with open('tests/mispevent_testfiles/event.json') as f:
ref_json = json.load(f)
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_loadfile_validate(self):
def test_loadfile_validate(self) -> None:
misp_event = MISPEvent()
misp_event.load_file('tests/mispevent_testfiles/event.json', validate=True)
def test_loadfile_validate_strict(self):
def test_loadfile_validate_strict(self) -> None:
misp_event = MISPEvent(strict_validation=True)
misp_event.load_file('tests/mispevent_testfiles/event.json', validate=True)
def test_event_tag(self):
def test_event_tag(self) -> None:
self.init_event()
self.mispevent.add_tag('bar')
self.mispevent.add_tag(name='baz')
@ -69,7 +69,7 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event_galaxy(self):
def test_event_galaxy(self) -> None:
self.init_event()
with open('tests/mispevent_testfiles/galaxy.json') as f:
galaxy = json.load(f)
@ -78,11 +78,11 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.add_galaxy(misp_galaxy)
self.assertEqual(self.mispevent.galaxies[0].to_json(sort_keys=True, indent=2), json.dumps(galaxy, sort_keys=True, indent=2))
def test_attribute(self):
def test_attribute(self) -> None:
self.init_event()
a = self.mispevent.add_attribute('filename', 'bar.exe')
a: MISPAttribute = self.mispevent.add_attribute('filename', 'bar.exe') # type: ignore[assignment]
del a.uuid
a = self.mispevent.add_attribute_tag('osint', 'bar.exe')
a = self.mispevent.add_attribute_tag('osint', 'bar.exe') # type: ignore[assignment]
attr_tags = self.mispevent.get_attribute_tag('bar.exe')
self.assertEqual(self.mispevent.attributes[0].tags[0].name, 'osint')
self.assertEqual(attr_tags[0].name, 'osint')
@ -97,7 +97,7 @@ class TestMISPEvent(unittest.TestCase):
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_attribute_galaxy(self):
def test_attribute_galaxy(self) -> None:
self.init_event()
with open('tests/mispevent_testfiles/galaxy.json') as f:
galaxy = json.load(f)
@ -112,7 +112,7 @@ class TestMISPEvent(unittest.TestCase):
json.dumps(galaxy, sort_keys=True, indent=2)
)
def test_to_dict_json_format(self):
def test_to_dict_json_format(self) -> None:
misp_event = MISPEvent()
av_signature_object = MISPObject("av-signature")
av_signature_object.add_attribute("signature", "EICAR")
@ -121,19 +121,19 @@ class TestMISPEvent(unittest.TestCase):
self.assertEqual(json.loads(misp_event.to_json()), misp_event.to_dict(json_format=True))
def test_object_tag(self):
def test_object_tag(self) -> None:
self.mispevent.add_object(name='file', strict=True)
a = self.mispevent.objects[0].add_attribute('filename', value='')
a: MISPAttribute = self.mispevent.objects[0].add_attribute('filename', value='') # type: ignore[assignment]
self.assertEqual(a, None)
a = self.mispevent.objects[0].add_attribute('filename', value=None)
a = self.mispevent.objects[0].add_attribute('filename', value=None) # type: ignore[assignment]
self.assertEqual(a, None)
a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}])
a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) # type: ignore[assignment]
del a.uuid
self.assertEqual(self.mispevent.objects[0].attributes[0].tags[0].name, 'blah')
self.assertTrue(self.mispevent.objects[0].has_attributes_by_relation(['filename']))
self.assertEqual(len(self.mispevent.objects[0].get_attributes_by_relation('filename')), 1)
self.mispevent.add_object(name='url', strict=True)
a = self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu')
a = self.mispevent.objects[1].add_attribute('url', value='https://www.circl.lu') # type: ignore[assignment]
del a.uuid
self.mispevent.objects[0].uuid = 'a'
self.mispevent.objects[1].uuid = 'b'
@ -146,16 +146,16 @@ class TestMISPEvent(unittest.TestCase):
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
@unittest.skip("Not supported on MISP: https://github.com/MISP/MISP/issues/2638 - https://github.com/MISP/PyMISP/issues/168")
def test_object_level_tag(self):
def test_object_level_tag(self) -> None:
self.mispevent.add_object(name='file', strict=True)
self.mispevent.objects[0].add_attribute('filename', value='bar')
self.mispevent.objects[0].add_tag('osint')
self.mispevent.objects[0].add_tag('osint') # type: ignore[attr-defined]
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/event_obj_tag.json') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_object_galaxy(self):
def test_object_galaxy(self) -> None:
self.init_event()
misp_object = MISPObject('github-user')
misp_object.add_attribute('username', 'adulau')
@ -171,11 +171,11 @@ class TestMISPEvent(unittest.TestCase):
json.dumps(galaxy, sort_keys=True, indent=2)
)
def test_malware(self):
def test_malware(self) -> None:
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
pseudofile = BytesIO(f.read())
self.init_event()
a = self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile)
a: MISPAttribute = self.mispevent.add_attribute('malware-sample', 'bar.exe', data=pseudofile) # type: ignore[assignment]
del a.uuid
attribute = self.mispevent.attributes[0]
self.assertEqual(attribute.malware_binary, pseudofile)
@ -184,40 +184,40 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_malware(self):
def test_existing_malware(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/malware_exist.json')
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
pseudofile = BytesIO(f.read())
self.assertEqual(
self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary.read(),
pseudofile.read())
self.assertTrue(self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary)
if _mb := self.mispevent.objects[0].get_attributes_by_relation('malware-sample')[0].malware_binary:
self.assertEqual(_mb.read(), pseudofile.read())
def test_sighting(self):
def test_sighting(self) -> None:
sighting = MISPSighting()
sighting.from_dict(value='1', type='bar', timestamp=11111111)
with open('tests/mispevent_testfiles/sighting.json') as f:
ref_json = json.load(f)
self.assertEqual(sighting.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_existing_event(self):
def test_existing_event(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
with open('tests/mispevent_testfiles/existing_event.json') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_shadow_attributes_existing(self):
def test_shadow_attributes_existing(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/shadow.json')
with open('tests/mispevent_testfiles/shadow.json') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
@unittest.skip("Not supported on MISP.")
def test_shadow_attributes(self):
def test_shadow_attributes(self) -> None:
self.init_event()
p = self.mispevent.add_proposal(type='filename', value='baz.jpg')
del p.uuid
a = self.mispevent.add_attribute('filename', 'bar.exe')
a: MISPAttribute = self.mispevent.add_attribute('filename', 'bar.exe') # type: ignore[assignment]
del a.uuid
p = self.mispevent.attributes[0].add_proposal(type='filename', value='bar.pdf')
del p.uuid
@ -225,15 +225,15 @@ class TestMISPEvent(unittest.TestCase):
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_default_attributes(self):
def test_default_attributes(self) -> None:
self.mispevent.add_object(name='file', strict=True)
a = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}])
a: MISPAttribute = self.mispevent.objects[0].add_attribute('filename', value='bar', Tag=[{'name': 'blah'}]) # type: ignore[assignment]
del a.uuid
a = self.mispevent.objects[0].add_attribute('pattern-in-file', value='baz')
a = self.mispevent.objects[0].add_attribute('pattern-in-file', value='baz') # type: ignore[assignment]
self.assertEqual(a.category, 'Artifacts dropped')
del a.uuid
self.mispevent.add_object(name='file', strict=False, default_attributes_parameters=self.mispevent.objects[0].attributes[0])
a = self.mispevent.objects[1].add_attribute('filename', value='baz')
a = self.mispevent.objects[1].add_attribute('filename', value='baz') # type: ignore[assignment]
del a.uuid
self.mispevent.objects[0].uuid = 'a'
self.mispevent.objects[1].uuid = 'b'
@ -242,16 +242,16 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_default_values(self):
def test_obj_default_values(self) -> None:
self.init_event()
self.mispevent.add_object(name='whois', strict=True)
a = self.mispevent.objects[0].add_attribute('registrar', value='registar.example.com')
a: MISPAttribute = self.mispevent.objects[0].add_attribute('registrar', value='registar.example.com') # type: ignore[assignment]
del a.uuid
a = self.mispevent.objects[0].add_attribute('domain', value='domain.example.com')
a = self.mispevent.objects[0].add_attribute('domain', value='domain.example.com') # type: ignore[assignment]
del a.uuid
a = self.mispevent.objects[0].add_attribute('nameserver', value='ns1.example.com')
a = self.mispevent.objects[0].add_attribute('nameserver', value='ns1.example.com') # type: ignore[assignment]
del a.uuid
a = self.mispevent.objects[0].add_attribute('nameserver', value='ns2.example.com', disable_correlation=False, to_ids=True, category='External analysis')
a = self.mispevent.objects[0].add_attribute('nameserver', value='ns2.example.com', disable_correlation=False, to_ids=True, category='External analysis') # type: ignore[assignment]
del a.uuid
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/def_param.json') as f:
@ -259,7 +259,7 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_references_export(self):
def test_obj_references_export(self) -> None:
self.init_event()
obj1 = MISPObject(name="file")
obj2 = MISPObject(name="url", standalone=False)
@ -272,29 +272,29 @@ class TestMISPEvent(unittest.TestCase):
self.assertTrue("ObjectReference" in obj1.jsonable())
self.assertFalse("ObjectReference" in obj2.jsonable())
def test_event_not_edited(self):
def test_event_not_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
def test_event_edited(self):
def test_event_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.mispevent.info = 'blah'
self.assertTrue(self.mispevent.edited)
def test_event_tag_edited(self):
def test_event_tag_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.add_tag('foo')
self.assertTrue(self.mispevent.edited)
def test_event_attribute_edited(self):
def test_event_attribute_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.mispevent.attributes[0].value = 'blah'
self.assertTrue(self.mispevent.attributes[0].edited)
self.assertFalse(self.mispevent.attributes[1].edited)
self.assertTrue(self.mispevent.edited)
def test_event_attribute_tag_edited(self):
def test_event_attribute_tag_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.attributes[0].tags[0].name = 'blah'
@ -303,14 +303,14 @@ class TestMISPEvent(unittest.TestCase):
self.assertTrue(self.mispevent.attributes[0].edited)
self.assertTrue(self.mispevent.edited)
def test_event_attribute_tag_edited_second(self):
def test_event_attribute_tag_edited_second(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.attributes[0].add_tag(name='blah')
self.assertTrue(self.mispevent.attributes[0].edited)
self.assertTrue(self.mispevent.edited)
def test_event_object_edited(self):
def test_event_object_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.objects[0].comment = 'blah'
@ -318,7 +318,7 @@ class TestMISPEvent(unittest.TestCase):
self.assertFalse(self.mispevent.objects[1].edited)
self.assertTrue(self.mispevent.edited)
def test_event_object_attribute_edited(self):
def test_event_object_attribute_edited(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.objects[0].attributes[0].comment = 'blah'
@ -326,7 +326,7 @@ class TestMISPEvent(unittest.TestCase):
self.assertTrue(self.mispevent.objects[0].edited)
self.assertTrue(self.mispevent.edited)
def test_event_object_attribute_edited_tag(self):
def test_event_object_attribute_edited_tag(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited)
self.mispevent.objects[0].attributes[0].add_tag('blah')
@ -337,12 +337,12 @@ class TestMISPEvent(unittest.TestCase):
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_by_id(self):
def test_obj_by_id(self) -> None:
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
misp_obj = self.mispevent.get_object_by_id(1556)
self.assertEqual(misp_obj.uuid, '5a3cd604-e11c-4de5-bbbf-c170950d210f')
def test_userdefined_object_custom_template(self):
def test_userdefined_object_custom_template(self) -> None:
self.init_event()
with open('tests/mispevent_testfiles/test_object_template/definition.json') as f:
template = json.load(f)
@ -353,16 +353,16 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, '{\'member3\'} are required.')
a = self.mispevent.objects[0].add_attribute('member3', value='foo')
a: MISPAttribute = self.mispevent.objects[0].add_attribute('member3', value='foo') # type: ignore[assignment]
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# Fail on requiredOneOf
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
a = self.mispevent.objects[0].add_attribute('member1', value='bar')
a = self.mispevent.objects[0].add_attribute('member1', value='bar') # type: ignore[assignment]
del a.uuid
a = self.mispevent.objects[0].add_attribute('member1', value='baz')
a = self.mispevent.objects[0].add_attribute('member1', value='baz') # type: ignore[assignment]
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# member1 is not a multiple
@ -376,7 +376,7 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_userdefined_object_custom_dir(self):
def test_userdefined_object_custom_dir(self) -> None:
self.init_event()
self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles')
with self.assertRaises(InvalidMISPObject) as e:
@ -384,16 +384,16 @@ class TestMISPEvent(unittest.TestCase):
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, '{\'member3\'} are required.')
a = self.mispevent.objects[0].add_attribute('member3', value='foo')
a: MISPAttribute = self.mispevent.objects[0].add_attribute('member3', value='foo') # type: ignore[assignment]
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# Fail on requiredOneOf
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
a = self.mispevent.objects[0].add_attribute('member1', value='bar')
a = self.mispevent.objects[0].add_attribute('member1', value='bar') # type: ignore[assignment]
del a.uuid
a = self.mispevent.objects[0].add_attribute('member1', value='baz')
a = self.mispevent.objects[0].add_attribute('member1', value='baz') # type: ignore[assignment]
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# member1 is not a multiple
@ -407,10 +407,10 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_first_last_seen(self):
def test_first_last_seen(self) -> None:
me = MISPEvent()
me.info = 'Test First and Last Seen'
me.date = '2020.01.12'
me.date = '2020.01.12' # type: ignore[assignment]
self.assertEqual(me.date.day, 12)
me.add_attribute('ip-dst', '8.8.8.8', first_seen='06-21-1998', last_seen=1580213607.469571)
self.assertEqual(me.attributes[0].first_seen.year, 1998)
@ -418,11 +418,11 @@ class TestMISPEvent(unittest.TestCase):
now = datetime.now().astimezone()
me.attributes[0].last_seen = now
today = date.today()
me.attributes[0].first_seen = today
me.attributes[0].first_seen = today # type: ignore[assignment]
self.assertEqual(me.attributes[0].first_seen.year, today.year)
self.assertEqual(me.attributes[0].last_seen, now)
def test_feed(self):
def test_feed(self) -> None:
me = MISPEvent()
me.info = 'Test feed'
org = MISPOrganisation()
@ -440,7 +440,7 @@ class TestMISPEvent(unittest.TestCase):
self.assertEqual(feed['Event']['_manifest'][me.uuid]['info'], 'Test feed')
self.assertEqual(len(feed['Event']['Object'][0]['Attribute']), 2)
def test_object_templates(self):
def test_object_templates(self) -> None:
me = MISPEvent()
for template in glob.glob(str(me.misp_objects_path / '*' / 'definition.json')):
with open(template) as f:
@ -459,7 +459,7 @@ class TestMISPEvent(unittest.TestCase):
subset = set(entry['categories']).issubset(me.describe_types['categories'])
self.assertTrue(subset, f'{t_json["name"]} - {obj_relation}')
def test_git_vuln_finder(self):
def test_git_vuln_finder(self) -> None:
with open('tests/git-vuln-finder-quagga.json') as f:
dump = json.load(f)

File diff suppressed because it is too large Load Diff