chg: Initial changes to use new annotations

pull/1135/head
Raphaël Vinot 2024-01-17 13:13:14 +01:00
parent ac0421a7b1
commit 0562c63cec
38 changed files with 731 additions and 723 deletions

View File

@ -1,6 +1,16 @@
[mypy]
ignore_errors = False
strict = True
warn_return_any = False
show_error_context = True
pretty = True
exclude = pymisp/data|example|docs
exclude = feed-generator|examples
# Stuff to remove gradually
disallow_untyped_defs = False
disallow_untyped_calls = False
check_untyped_defs = False
disable_error_code = attr-defined,type-arg,no-untyped-def
[mypy-docs.source.*]
ignore_errors = True

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import logging
import sys
import warnings
@ -59,4 +61,4 @@ try:
pass
logger.debug('pymisp loaded properly')
except ImportError as e:
logger.warning('Unable to load pymisp properly: {}'.format(e))
logger.warning(f'Unable to load pymisp properly: {e}')

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3
from __future__ import annotations
import logging
from datetime import date, datetime
from deprecated import deprecated # type: ignore
@ -6,14 +9,14 @@ from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum
from typing import Union, Optional, Any, Dict, List, Set, Mapping
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache
from pathlib import Path
try:
import orjson # type: ignore
from orjson import loads, dumps # type: ignore
from orjson import loads, dumps
HAS_ORJSON = True
except ImportError:
from json import loads, dumps
@ -30,12 +33,12 @@ with (resources_path / 'describeTypes.json').open('rb') as f:
describe_types = loads(f.read())['result']
class MISPFileCache(object):
class MISPFileCache:
# cache up to 150 JSON structures in class attribute
@staticmethod
@lru_cache(maxsize=150)
def _load_json(path: Path) -> Optional[dict]:
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
with path.open('rb') as f:
@ -65,7 +68,7 @@ class Analysis(Enum):
completed = 2
def _int_to_str(d: Dict[str, Any]) -> Dict[str, Any]:
def _int_to_str(d: dict[str, Any]) -> dict[str, Any]:
# transform all integer back to string
for k, v in d.items():
if isinstance(v, dict):
@ -94,7 +97,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
__misp_objects_path = misp_objects_path
__describe_types = describe_types
def __init__(self, **kwargs: Dict):
def __init__(self, **kwargs: dict):
"""Abstract class for all the MISP objects.
NOTE: Every method in every classes inheriting this one are doing
changes in memory and do not modify data on a remote MISP instance.
@ -103,9 +106,9 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
"""
super().__init__()
self.__edited: bool = True # As we create a new object, we assume it is edited
self.__not_jsonable: List[str] = []
self._fields_for_feed: Set
self.__self_defined_describe_types: Optional[Dict] = None
self.__not_jsonable: list[str] = []
self._fields_for_feed: set
self.__self_defined_describe_types: dict | None = None
self.uuid: str
if kwargs.get('force_timestamps') is not None:
@ -115,13 +118,13 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
self.__force_timestamps = False
@property
def describe_types(self) -> Dict:
def describe_types(self) -> dict:
if self.__self_defined_describe_types:
return self.__self_defined_describe_types
return self.__describe_types
@describe_types.setter
def describe_types(self, describe_types: Dict):
def describe_types(self, describe_types: dict):
self.__self_defined_describe_types = describe_types
@property
@ -133,7 +136,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
return self.__misp_objects_path
@misp_objects_path.setter
def misp_objects_path(self, misp_objects_path: Union[str, Path]):
def misp_objects_path(self, misp_objects_path: str | Path):
if isinstance(misp_objects_path, str):
misp_objects_path = Path(misp_objects_path)
self.__misp_objects_path = misp_objects_path
@ -155,7 +158,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
"""Add entries to the __not_jsonable list"""
self.__not_jsonable += args
def set_not_jsonable(self, args: List[str]) -> None:
def set_not_jsonable(self, args: list[str]) -> None:
"""Set __not_jsonable to a new list"""
self.__not_jsonable = args
@ -171,7 +174,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
"""Load a JSON string"""
self.from_dict(**loads(json_string))
def to_dict(self, json_format: bool = False) -> Dict:
def to_dict(self, json_format: bool = False) -> dict:
"""Dump the class to a dictionary.
This method automatically removes the timestamp recursively in every object
that has been edited is order to let MISP update the event accordingly."""
@ -213,15 +216,15 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
to_return = _int_to_str(to_return)
return to_return
def jsonable(self) -> Dict:
def jsonable(self) -> dict:
"""This method is used by the JSON encoder"""
return self.to_dict()
def _to_feed(self) -> Dict:
def _to_feed(self) -> dict:
if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed:
raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.')
if hasattr(self, '_set_default') and callable(self._set_default): # type: ignore
self._set_default() # type: ignore
if hasattr(self, '_set_default') and callable(self._set_default):
self._set_default()
to_return = {}
for field in sorted(self._fields_for_feed):
if getattr(self, field, None) is not None:
@ -235,11 +238,11 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
if field in ['data', 'first_seen', 'last_seen', 'deleted']:
# special fields
continue
raise PyMISPError('The field {} is required in {} when generating a feed.'.format(field, self.__class__.__name__))
raise PyMISPError(f'The field {field} is required in {self.__class__.__name__} when generating a feed.')
to_return = _int_to_str(to_return)
return to_return
def to_json(self, sort_keys: bool = False, indent: Optional[int] = None) -> str:
def to_json(self, sort_keys: bool = False, indent: int | None = None) -> str:
"""Dump recursively any class of type MISPAbstract to a json string"""
if HAS_ORJSON:
option = 0
@ -320,14 +323,14 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
self.__edited = True
super().__setattr__(name, value)
def _datetime_to_timestamp(self, d: Union[int, float, str, datetime]) -> int:
def _datetime_to_timestamp(self, d: int | float | str | datetime) -> int:
"""Convert a datetime object to a timestamp (int)"""
if isinstance(d, (int, float, str)):
# Assume we already have a timestamp
return int(d)
return int(d.timestamp())
def _add_tag(self, tag: Optional[Union[str, 'MISPTag', Mapping]] = None, **kwargs):
def _add_tag(self, tag: str | MISPTag | Mapping | None = None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
if isinstance(tag, str):
misp_tag = MISPTag()
@ -347,7 +350,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
self.edited = True
return misp_tag
def _set_tags(self, tags: List['MISPTag']):
def _set_tags(self, tags: list[MISPTag]):
"""Set a list of prepared MISPTag."""
if all(isinstance(x, MISPTag) for x in tags):
self.Tag = tags
@ -363,19 +366,19 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
return False
def __repr__(self) -> str:
return '<{self.__class__.__name__} - please define me>'.format(self=self)
return f'<{self.__class__.__name__} - please define me>'
class MISPTag(AbstractMISP):
_fields_for_feed: set = {'name', 'colour', 'relationship_type', 'local'}
def __init__(self, **kwargs: Dict):
def __init__(self, **kwargs: dict):
super().__init__(**kwargs)
self.name: str
self.exportable: bool
self.local: bool
self.relationship_type: Optional[str]
self.relationship_type: str | None
def from_dict(self, **kwargs):
if kwargs.get('Tag'):
@ -390,7 +393,7 @@ class MISPTag(AbstractMISP):
if not hasattr(self, 'local'):
self.local = False
def _to_feed(self, with_local: bool = True) -> Dict:
def _to_feed(self, with_local: bool = True) -> dict:
if hasattr(self, 'exportable') and not self.exportable:
return {}
if with_local is False and hasattr(self, 'local') and self.local:
@ -404,11 +407,11 @@ class MISPTag(AbstractMISP):
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})>'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)>'.format(self=self)
return f'<{self.__class__.__name__}(NotInitialized)>'
# UUID, datetime, date and Enum is serialized by ORJSON by default
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[Dict, str]:
def pymisp_json_default(obj: AbstractMISP | datetime | date | Enum | UUID) -> dict | str:
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime, date)):

File diff suppressed because it is too large Load Diff

View File

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
class PyMISPError(Exception):
def __init__(self, message):
super(PyMISPError, self).__init__(message)
super().__init__(message)
self.message = message

File diff suppressed because it is too large Load Diff

View File

@ -1,3 +1,5 @@
from __future__ import annotations
from .vtreportobject import VTReportObject # noqa
from .neo4j import Neo4j # noqa
from .fileobject import FileObject # noqa

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
import ipaddress
import socket
@ -12,7 +13,7 @@ class UrlNotDecoded(Exception):
pass
class PSLFaup(object):
class PSLFaup:
"""
Fake Faup Python Library using PSL for Windows support
"""
@ -64,7 +65,7 @@ class PSLFaup(object):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
netloc = self.get_host() + ('' if self.get_port() is None else ':{}'.format(self.get_port()))
netloc = self.get_host() + ('' if self.get_port() is None else f':{self.get_port()}')
return _ensure_bytes(
urlunparse(
(self.get_scheme(), netloc, self.get_resource_path(),

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .. import MISPObject
from ..exceptions import InvalidMISPObject
@ -10,7 +11,7 @@ from typing import Union, Optional
class AbstractMISPObjectGenerator(MISPObject):
def _detect_epoch(self, timestamp: Union[str, int, float]) -> bool:
def _detect_epoch(self, timestamp: str | int | float) -> bool:
try:
tmp = float(timestamp)
if tmp < 30000000:
@ -21,7 +22,7 @@ class AbstractMISPObjectGenerator(MISPObject):
except ValueError:
return False
def _sanitize_timestamp(self, timestamp: Optional[Union[datetime, date, dict, str, int, float]] = None) -> datetime:
def _sanitize_timestamp(self, timestamp: datetime | date | dict | str | int | float | None = None) -> datetime:
if not timestamp:
return datetime.now()

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from io import BytesIO
@ -31,7 +32,7 @@ class FileTypeNotImplemented(MISPObjectException):
pass
def make_binary_objects(filepath: Optional[str] = None, pseudofile: Optional[BytesIO] = None, filename: Optional[str] = None, standalone: bool = True, default_attributes_parameters: dict = {}):
def make_binary_objects(filepath: str | None = None, pseudofile: BytesIO | None = None, filename: str | None = None, standalone: bool = True, default_attributes_parameters: dict = {}):
misp_file = FileObject(filepath=filepath, pseudofile=pseudofile, filename=filename,
standalone=standalone, default_attributes_parameters=default_attributes_parameters)
if HAS_LIEF and (filepath or (pseudofile and filename)):

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from pathlib import Path
from typing import List, Optional
@ -10,7 +11,7 @@ from pymisp import MISPObject
class CSVLoader():
def __init__(self, template_name: str, csv_path: Path, fieldnames: Optional[List[str]] = None, has_fieldnames=False,
def __init__(self, template_name: str, csv_path: Path, fieldnames: list[str] | None = None, has_fieldnames=False,
delimiter: str = ',', quotechar: str = '"'):
self.template_name = template_name
self.delimiter = delimiter

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
from ..exceptions import InvalidMISPObject
@ -32,7 +33,7 @@ def make_elf_objects(lief_parsed: lief.ELF.Binary, misp_file: FileObject, standa
class ELFObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: Optional[lief.ELF.Binary] = None, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, **kwargs):
def __init__(self, parsed: lief.ELF.Binary | None = None, filepath: Path | str | None = None, pseudofile: BytesIO | None = None, **kwargs):
"""Creates an ELF object, with lief"""
super().__init__('elf', **kwargs)
if not HAS_PYDEEP:
@ -43,7 +44,7 @@ class ELFObject(AbstractMISPObjectGenerator):
elif isinstance(pseudofile, bytes):
self.__elf = lief.ELF.parse(raw=pseudofile)
else:
raise InvalidMISPObject('Pseudo file can be BytesIO or bytes got {}'.format(type(pseudofile)))
raise InvalidMISPObject(f'Pseudo file can be BytesIO or bytes got {type(pseudofile)}')
elif filepath:
self.__elf = lief.ELF.parse(filepath)
elif parsed:
@ -51,7 +52,7 @@ class ELFObject(AbstractMISPObjectGenerator):
if isinstance(parsed, lief.ELF.Binary):
self.__elf = parsed
else:
raise InvalidMISPObject('Not a lief.ELF.Binary: {}'.format(type(parsed)))
raise InvalidMISPObject(f'Not a lief.ELF.Binary: {type(parsed)}')
self.generate_attributes()
def generate_attributes(self):
@ -68,7 +69,7 @@ class ELFObject(AbstractMISPObjectGenerator):
if not section.name:
continue
s = ELFSectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'includes', 'Section {} of ELF'.format(pos))
self.add_reference(s.uuid, 'includes', f'Section {pos} of ELF')
pos += 1
self.sections.append(s)
self.add_attribute('number-sections', value=len(self.sections))
@ -80,7 +81,7 @@ class ELFSectionObject(AbstractMISPObjectGenerator):
"""Creates an ELF Section object. Object generated by ELFObject."""
# Python3 way
# super().__init__('pe-section')
super(ELFSectionObject, self).__init__('elf-section', **kwargs)
super().__init__('elf-section', **kwargs)
self.__section = section
self.__data = bytes(self.__section.content)
self.generate_attributes()

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
import logging
@ -29,14 +30,14 @@ class MISPMsgConverstionError(MISPObjectException):
class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None,
def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None,
attach_original_email: bool = True, **kwargs):
super().__init__('email', **kwargs)
self.attach_original_email = attach_original_email
self.encapsulated_body: Union[str, None] = None
self.eml_from_msg: Union[bool, None] = None
self.raw_emails: Dict[str, Union[BytesIO, None]] = {'msg': None,
self.encapsulated_body: str | None = None
self.eml_from_msg: bool | None = None
self.raw_emails: dict[str, BytesIO | None] = {'msg': None,
'eml': None}
self.__pseudofile = self.create_pseudofile(filepath, pseudofile)
@ -66,7 +67,7 @@ class EMailObject(AbstractMISPObjectGenerator):
return message
except ValueError as _e: # Exception
logger.debug("Email not in .msg format or is a corrupted .msg. Attempting to decode email from other formats.")
logger.debug("Error: {} ".format(_e))
logger.debug(f"Error: {_e} ")
try:
if content_in_bytes[:3] == b'\xef\xbb\xbf': # utf-8-sig byte-order mark (BOM)
eml_bytes = content_in_bytes.decode("utf_8_sig").encode("utf-8")
@ -81,8 +82,8 @@ class EMailObject(AbstractMISPObjectGenerator):
raise PyMISPNotImplementedYet("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.")
@staticmethod
def create_pseudofile(filepath: Optional[Union[Path, str]] = None,
pseudofile: Optional[BytesIO] = None) -> BytesIO:
def create_pseudofile(filepath: Path | str | None = None,
pseudofile: BytesIO | None = None) -> BytesIO:
"""Creates a pseudofile using directly passed data or data loaded from file path.
"""
if filepath:
@ -102,7 +103,7 @@ class EMailObject(AbstractMISPObjectGenerator):
eml = self._build_eml(message, body, attachments)
return eml
def _extract_msg_objects(self, msg_obj: MessageBase) -> Tuple[EmailMessage, Dict, List[Any]]:
def _extract_msg_objects(self, msg_obj: MessageBase) -> tuple[EmailMessage, dict, list[Any]]:
"""Extracts email objects needed to construct an eml from a msg."""
message: EmailMessage = email.message_from_string(msg_obj.header.as_string(), policy=policy.default) # type: ignore
body = {}
@ -153,14 +154,14 @@ class EMailObject(AbstractMISPObjectGenerator):
def _build_eml(self, message: EmailMessage, body: dict, attachments: list) -> EmailMessage:
"""Constructs an eml file from objects extracted from a msg."""
# Order the body objects by increasing complexity and toss any missing objects
body_objects: List[dict] = [body.get('text', {}),
body_objects: list[dict] = [body.get('text', {}),
body.get('html', {}),
body.get('rtf', {})]
body_objects = [i for i in body_objects if i != {}]
# If this a non-multipart email then we only need to attach the payload
if message.get_content_maintype() != 'multipart':
for _body in body_objects:
if "text/{0}".format(_body['subtype']) == message.get_content_type():
if "text/{}".format(_body['subtype']) == message.get_content_type():
message.set_content(**_body)
return message
raise MISPMsgConverstionError("Unable to find appropriate eml payload in message body.")
@ -172,7 +173,7 @@ class EMailObject(AbstractMISPObjectGenerator):
if isinstance(body.get('html', None), dict):
_html = body.get('html', {}).get('obj')
for attch in attachments:
if _html.find("cid:{0}".format(attch.cid)) != -1:
if _html.find(f"cid:{attch.cid}") != -1:
_content_type = attch.getStringStream('__substg1.0_370E')
maintype, subtype = _content_type.split("/", 1)
related_content[attch.cid] = (attch,
@ -241,7 +242,7 @@ class EMailObject(AbstractMISPObjectGenerator):
pass
@property
def attachments(self) -> List[Tuple[Optional[str], BytesIO]]:
def attachments(self) -> list[tuple[str | None, BytesIO]]:
to_return = []
try:
for attachment in self.email.iter_attachments():
@ -269,14 +270,14 @@ class EMailObject(AbstractMISPObjectGenerator):
message = self.email
for _pref, body in message._find_body(message, preferencelist=['plain', 'html']):
comment = "{0} body".format(body.get_content_type())
comment = f"{body.get_content_type()} body"
if self.encapsulated_body == body.get_content_type():
comment += " De-Encapsulated from RTF in original msg."
self.add_attribute("email-body",
body.get_content(),
comment=comment)
headers = ["{}: {}".format(k, v) for k, v in message.items()]
headers = [f"{k}: {v}" for k, v in message.items()]
if headers:
self.add_attribute("header", "\n".join(headers))
@ -331,20 +332,20 @@ class EMailObject(AbstractMISPObjectGenerator):
for realname, address in email.utils.getaddresses([data]):
if address and realname:
addresses.append({"value": address, "comment": "{} <{}>".format(realname, address)})
addresses.append({"value": address, "comment": f"{realname} <{address}>"})
elif address:
addresses.append({"value": address})
else: # parsing failed, skip
continue
if realname:
display_names.append({"value": realname, "comment": "{} <{}>".format(realname, address)})
display_names.append({"value": realname, "comment": f"{realname} <{address}>"})
if addresses:
self.add_attributes(typ, *addresses)
if insert_display_names and display_names:
try:
self.add_attributes("{}-display-name".format(typ), *display_names)
self.add_attributes(f"{typ}-display-name", *display_names)
except NewAttributeError:
# email object doesn't support display name for all email addrs
pass

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
try:
from pymispgalaxies import Clusters # type: ignore

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from pathlib import Path
from pymisp import MISPEvent
@ -9,7 +10,7 @@ from typing import List
def feed_meta_generator(path: Path):
manifests = {}
hashes: List[str] = []
hashes: list[str] = []
for f_name in path.glob('*.json'):
if str(f_name.name) == 'manifest.json':

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
@ -22,7 +23,7 @@ except ImportError:
HAS_PYDEEP = False
try:
import magic # type: ignore
import magic
HAS_MAGIC = True
except ImportError:
HAS_MAGIC = False
@ -30,7 +31,7 @@ except ImportError:
class FileObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, filename: Optional[str] = None, **kwargs) -> None:
def __init__(self, filepath: Path | str | None = None, pseudofile: BytesIO | None = None, filename: str | None = None, **kwargs) -> None:
super().__init__('file', **kwargs)
if not HAS_PYDEEP:
logger.warning("pydeep is missing, please install pymisp this way: pip install pymisp[fileobjects]")

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
from typing import List
@ -8,7 +9,7 @@ from typing import List
class GenericObjectGenerator(AbstractMISPObjectGenerator):
# FIXME: this method is different from the master one, and that's probably not a good idea.
def generate_attributes(self, attributes: List[dict]): # type: ignore
def generate_attributes(self, attributes: list[dict]): # type: ignore
"""Generates MISPObjectAttributes from a list of dictionaries.
Each entry if the list must be in one of the two following formats:
* {<object_relation>: <value>}

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
try:
from pymispwarninglists import WarningLists # type: ignore

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
@ -32,7 +33,7 @@ def make_macho_objects(lief_parsed: lief.MachO.Binary, misp_file: FileObject, st
class MachOObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: Optional[lief.MachO.Binary] = None, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, **kwargs):
def __init__(self, parsed: lief.MachO.Binary | None = None, filepath: Path | str | None = None, pseudofile: BytesIO | None = None, **kwargs):
"""Creates an MachO object, with lief"""
super().__init__('macho', **kwargs)
if not HAS_PYDEEP:
@ -43,7 +44,7 @@ class MachOObject(AbstractMISPObjectGenerator):
elif isinstance(pseudofile, bytes):
self.__macho = lief.MachO.parse(raw=pseudofile)
else:
raise InvalidMISPObject('Pseudo file can be BytesIO or bytes got {}'.format(type(pseudofile)))
raise InvalidMISPObject(f'Pseudo file can be BytesIO or bytes got {type(pseudofile)}')
elif filepath:
self.__macho = lief.MachO.parse(filepath)
elif parsed:
@ -51,7 +52,7 @@ class MachOObject(AbstractMISPObjectGenerator):
if isinstance(parsed, lief.MachO.Binary):
self.__macho = parsed
else:
raise InvalidMISPObject('Not a lief.MachO.Binary: {}'.format(type(parsed)))
raise InvalidMISPObject(f'Not a lief.MachO.Binary: {type(parsed)}')
self.generate_attributes()
def generate_attributes(self):
@ -66,7 +67,7 @@ class MachOObject(AbstractMISPObjectGenerator):
pos = 0
for section in self.__macho.sections:
s = MachOSectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'includes', 'Section {} of MachO'.format(pos))
self.add_reference(s.uuid, 'includes', f'Section {pos} of MachO')
pos += 1
self.sections.append(s)
self.add_attribute('number-sections', value=len(self.sections))
@ -78,7 +79,7 @@ class MachOSectionObject(AbstractMISPObjectGenerator):
"""Creates an MachO Section object. Object generated by MachOObject."""
# Python3 way
# super().__init__('pe-section')
super(MachOSectionObject, self).__init__('macho-section', **kwargs)
super().__init__('macho-section', **kwargs)
self.__section = section
self.__data = bytes(self.__section.content)
self.generate_attributes()

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
# NOTE: Reference on how this module is used: https://vvx7.io/posts/2020/05/misp-slack-bot/

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import glob
import os
@ -17,7 +17,7 @@ class Neo4j():
if not has_py2neo:
raise Exception('py2neo is required, please install: pip install py2neo')
authenticate(host, username, password)
self.graph = Graph("http://{}/db/data/".format(host))
self.graph = Graph(f"http://{host}/db/data/")
def load_events_directory(self, directory):
self.events = []

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
@ -156,7 +155,7 @@ def extract_field(report, field_name):
def load_openioc_file(openioc_path):
if not os.path.exists(openioc_path):
raise Exception("Path doesn't exists.")
with open(openioc_path, 'r') as f:
with open(openioc_path) as f:
return load_openioc(f)

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
@ -35,7 +36,7 @@ def make_pe_objects(lief_parsed: lief.PE.Binary, misp_file: FileObject, standalo
class PEObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: Optional[lief.PE.Binary] = None, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, **kwargs):
def __init__(self, parsed: lief.PE.Binary | None = None, filepath: Path | str | None = None, pseudofile: BytesIO | None = None, **kwargs):
"""Creates an PE object, with lief"""
super().__init__('pe', **kwargs)
if not HAS_PYDEEP:
@ -46,7 +47,7 @@ class PEObject(AbstractMISPObjectGenerator):
elif isinstance(pseudofile, bytes):
self.__pe = lief.PE.parse(raw=pseudofile)
else:
raise InvalidMISPObject('Pseudo file can be BytesIO or bytes got {}'.format(type(pseudofile)))
raise InvalidMISPObject(f'Pseudo file can be BytesIO or bytes got {type(pseudofile)}')
elif filepath:
self.__pe = lief.PE.parse(filepath)
elif parsed:
@ -54,7 +55,7 @@ class PEObject(AbstractMISPObjectGenerator):
if isinstance(parsed, lief.PE.Binary):
self.__pe = parsed
else:
raise InvalidMISPObject('Not a lief.PE.Binary: {}'.format(type(parsed)))
raise InvalidMISPObject(f'Not a lief.PE.Binary: {type(parsed)}')
self.generate_attributes()
def _is_exe(self):
@ -67,7 +68,7 @@ class PEObject(AbstractMISPObjectGenerator):
def _is_driver(self):
# List from pefile
system_DLLs = set(('ntoskrnl.exe', 'hal.dll', 'ndis.sys', 'bootvid.dll', 'kdcom.dll'))
system_DLLs = {'ntoskrnl.exe', 'hal.dll', 'ndis.sys', 'bootvid.dll', 'kdcom.dll'}
if system_DLLs.intersection([imp.lower() for imp in self.__pe.libraries]):
return True
return False
@ -116,10 +117,10 @@ class PEObject(AbstractMISPObjectGenerator):
# Skip section if name is none AND size is 0.
continue
s = PESectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'includes', 'Section {} of PE'.format(pos))
self.add_reference(s.uuid, 'includes', f'Section {pos} of PE')
if ((self.__pe.entrypoint >= section.virtual_address)
and (self.__pe.entrypoint < (section.virtual_address + section.virtual_size))):
self.add_attribute('entrypoint-section-at-position', value='{}|{}'.format(section.name, pos))
self.add_attribute('entrypoint-section-at-position', value=f'{section.name}|{pos}')
pos += 1
self.sections.append(s)
self.add_attribute('number-sections', value=len(self.sections))

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
# Standard imports
import base64
@ -49,7 +50,7 @@ def create_flowable_tag(misp_tag):
return [Flowable_Tag(text=misp_tag.name, color=misp_tag.colour, custom_style=col1_style)]
class Flowable_Tag(Flowable):
class Flowable_Tag(Flowable): # type: ignore[misc]
"""
Custom flowable to handle tags. Draw one Tag with the webview formatting
Modified from : http://two.pairlist.net/pipermail/reportlab-users/2005-February/003695.html
@ -108,7 +109,7 @@ class Flowable_Tag(Flowable):
LEFT_INTERNAL_PADDING = 2
ELONGATION = LEFT_INTERNAL_PADDING * 2
p = Paragraph("<font color='{}'>{}</font>".format(self.choose_good_text_color(), self.text), style=self.custom_style)
p = Paragraph(f"<font color='{self.choose_good_text_color()}'>{self.text}</font>", style=self.custom_style)
string_width = stringWidth(self.text, self.custom_style.fontName, self.custom_style.fontSize)
self.width = string_width + ELONGATION
@ -615,7 +616,7 @@ class Value_Formatter():
curr_uuid = str(is_safe_value(uuid))
curr_baseurl = self.config[moduleconfig[0]]
curr_url = uuid_to_url(curr_baseurl, curr_uuid)
html_url = "<a href={}>{}</a>".format(curr_url, safe_string(text))
html_url = f"<a href={curr_url}>{safe_string(text)}</a>"
if color:
# They want fancy colors
@ -744,7 +745,7 @@ class Value_Formatter():
answer = YES_ANSWER
if is_safe_value(published_timestamp):
# Published and have published date
answer += '({})'.format(published_timestamp.strftime(EXPORT_DATE_FORMAT))
answer += f'({published_timestamp.strftime(EXPORT_DATE_FORMAT)})'
else:
# Published without published date
answer += "(no date)"

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
@ -13,11 +14,11 @@ logger = logging.getLogger('pymisp')
class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
def __init__(self, authorized_keys_path: Optional[Union[Path, str]] = None, authorized_keys_pseudofile: Optional[StringIO] = None, **kwargs):
def __init__(self, authorized_keys_path: Path | str | None = None, authorized_keys_pseudofile: StringIO | None = None, **kwargs):
# PY3 way:
super().__init__('ssh-authorized-keys', **kwargs)
if authorized_keys_path:
with open(authorized_keys_path, 'r') as f:
with open(authorized_keys_path) as f:
self.__pseudofile = StringIO(f.read())
elif authorized_keys_pseudofile and isinstance(authorized_keys_pseudofile, StringIO):
self.__pseudofile = authorized_keys_pseudofile

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
try:
from misp_stix_converter.converters.buildMISPAttribute import buildEvent # type: ignore

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import zipfile
from io import BytesIO

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging

View File

@ -1,5 +1,7 @@
#!/usr/bin/python3
from __future__ import annotations
import requests
import json
@ -66,7 +68,7 @@ class VehicleObject(AbstractMISPObjectGenerator):
self.add_attribute('image-url', type='text', value=ImageUrl)
def _query(self):
payload = "RegistrationNumber={}&username={}".format(self._registration, self._username)
payload = f"RegistrationNumber={self._registration}&username={self._username}"
headers = {
'Content-Type': "application/x-www-form-urlencoded",
'cache-control': "no-cache",

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
from typing import Optional
@ -24,7 +25,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
:indicator: IOC to search VirusTotal for
'''
def __init__(self, apikey: str, indicator: str, vt_proxies: Optional[dict] = None, **kwargs):
def __init__(self, apikey: str, indicator: str, vt_proxies: dict | None = None, **kwargs):
super().__init__('virustotal-report', **kwargs)
indicator = indicator.strip()
self._resource_type = self.__validate_resource(indicator)
@ -33,7 +34,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
self._report = self.__query_virustotal(apikey, indicator)
self.generate_attributes()
else:
error_msg = "A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{}' instead".format(indicator)
error_msg = f"A valid indicator is required. (One of type url, md5, sha1, sha256). Received '{indicator}' instead"
raise InvalidMISPObject(error_msg)
def get_report(self):
@ -70,7 +71,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
:resource: Indicator to search in VirusTotal
'''
url = "https://www.virustotal.com/vtapi/v2/{}/report".format(self._resource_type)
url = f"https://www.virustotal.com/vtapi/v2/{self._resource_type}/report"
params = {"apikey": apikey, "resource": resource}
# for now assume we're using a public API key - we'll figure out private keys later
if self._proxies:

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
@ -13,7 +12,7 @@ import json
from pathlib import Path
import hashlib
import urllib3 # type: ignore
import urllib3
import time
from uuid import uuid4
@ -31,11 +30,7 @@ try:
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
from pymisp.exceptions import MISPServerError
except ImportError:
if sys.version_info < (3, 6):
print('This test suite requires Python 3.6+, breaking.')
sys.exit(0)
else:
raise
raise
try:
from keys import url, key # type: ignore

View File

@ -1,23 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
import unittest
import subprocess
import urllib3 # type: ignore
import urllib3
import logging
logging.disable(logging.CRITICAL)
try:
from pymisp import ExpandedPyMISP, MISPOrganisation, MISPUser, MISPEvent, MISPObject, MISPSharingGroup, Distribution
except ImportError:
if sys.version_info < (3, 6):
print('This test suite requires Python 3.6+, breaking.')
sys.exit(0)
else:
raise
raise
key = 'eYQdGTEWZJ8C2lm9EpnMqxQGwGiPNyoR75JvLdlE'
verifycert = False