From 9853f23683cbdf86ec60f9f55cec17b50346ec98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 31 Jan 2024 15:20:31 +0100 Subject: [PATCH] chg: Add a bunch more typing. --- pymisp/tools/abstractgenerator.py | 13 ++++--- pymisp/tools/asnobject.py | 4 +-- pymisp/tools/create_misp_object.py | 6 ++-- pymisp/tools/csvloader.py | 10 +++--- pymisp/tools/domainipobject.py | 9 +++-- pymisp/tools/emailobject.py | 58 +++++++++++++++++------------- pymisp/tools/genericgenerator.py | 5 +-- pymisp/tools/microblogobject.py | 25 ++++++------- 8 files changed, 74 insertions(+), 56 deletions(-) diff --git a/pymisp/tools/abstractgenerator.py b/pymisp/tools/abstractgenerator.py index 2703c46..a3ca26f 100644 --- a/pymisp/tools/abstractgenerator.py +++ b/pymisp/tools/abstractgenerator.py @@ -2,11 +2,14 @@ from __future__ import annotations -from .. import MISPObject -from ..exceptions import InvalidMISPObject from datetime import datetime, date from dateutil.parser import parse +from typing import Any + +from .. import MISPObject +from ..exceptions import InvalidMISPObject + class AbstractMISPObjectGenerator(MISPObject): @@ -21,7 +24,7 @@ class AbstractMISPObjectGenerator(MISPObject): except ValueError: return False - def _sanitize_timestamp(self, timestamp: datetime | date | dict | str | int | float | None = None) -> datetime: + def _sanitize_timestamp(self, timestamp: datetime | date | dict[str, Any] | str | int | float | None = None) -> datetime: if not timestamp: return datetime.now() @@ -42,9 +45,9 @@ class AbstractMISPObjectGenerator(MISPObject): else: raise Exception(f'Unable to convert {timestamp} to a datetime.') - def generate_attributes(self): + def generate_attributes(self) -> None: """Contains the logic where all the values of the object are gathered""" - if hasattr(self, '_parameters'): + if hasattr(self, '_parameters') and self._definition is not None: for object_relation in self._definition['attributes']: value = self._parameters.pop(object_relation, None) if not value: diff --git a/pymisp/tools/asnobject.py b/pymisp/tools/asnobject.py index a51c0b3..685da7a 100644 --- a/pymisp/tools/asnobject.py +++ b/pymisp/tools/asnobject.py @@ -13,12 +13,12 @@ logger = logging.getLogger('pymisp') class ASNObject(AbstractMISPObjectGenerator): - def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: + def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('asn', strict=strict, **kwargs) self._parameters = parameters self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: first = self._sanitize_timestamp(self._parameters.pop('first-seen', None)) self._parameters['first-seen'] = first last = self._sanitize_timestamp(self._parameters.pop('last-seen', None)) diff --git a/pymisp/tools/create_misp_object.py b/pymisp/tools/create_misp_object.py index 31e7048..2efcb90 100644 --- a/pymisp/tools/create_misp_object.py +++ b/pymisp/tools/create_misp_object.py @@ -5,9 +5,8 @@ from __future__ import annotations import logging from io import BytesIO -from typing import Any +from typing import Any, TYPE_CHECKING -from . import FileObject, PEObject, ELFObject, MachOObject, PESectionObject, ELFSectionObject, MachOSectionObject from ..exceptions import MISPObjectException logger = logging.getLogger('pymisp') @@ -28,6 +27,9 @@ except AttributeError: except ImportError: HAS_LIEF = False +if TYPE_CHECKING: + from . import FileObject, PEObject, ELFObject, MachOObject, PESectionObject, ELFSectionObject, MachOSectionObject + class FileTypeNotImplemented(MISPObjectException): pass diff --git a/pymisp/tools/csvloader.py b/pymisp/tools/csvloader.py index 7d68f88..e0452ec 100644 --- a/pymisp/tools/csvloader.py +++ b/pymisp/tools/csvloader.py @@ -3,7 +3,6 @@ from __future__ import annotations from pathlib import Path -from typing import List, Optional import csv from pymisp import MISPObject @@ -11,8 +10,9 @@ from pymisp import MISPObject class CSVLoader(): - def __init__(self, template_name: str, csv_path: Path, fieldnames: list[str] | None = None, has_fieldnames=False, - delimiter: str = ',', quotechar: str = '"'): + def __init__(self, template_name: str, csv_path: Path, + fieldnames: list[str] | None = None, has_fieldnames: bool=False, + delimiter: str = ',', quotechar: str = '"') -> None: self.template_name = template_name self.delimiter = delimiter self.quotechar = quotechar @@ -26,7 +26,7 @@ class CSVLoader(): else: self.has_fieldnames = has_fieldnames - def load(self): + def load(self) -> list[MISPObject]: objects = [] @@ -44,7 +44,7 @@ class CSVLoader(): # Check if the CSV file has a header, and if it matches with the object template tmp_object = MISPObject(self.template_name) - if not tmp_object._definition['attributes']: + if not tmp_object._definition or not tmp_object._definition['attributes']: raise Exception(f'Unable to find the object template ({self.template_name}), impossible to create objects.') allowed_fieldnames = list(tmp_object._definition['attributes'].keys()) for fieldname in self.fieldnames: diff --git a/pymisp/tools/domainipobject.py b/pymisp/tools/domainipobject.py index 1bed317..2269342 100644 --- a/pymisp/tools/domainipobject.py +++ b/pymisp/tools/domainipobject.py @@ -2,20 +2,23 @@ from __future__ import annotations -from .abstractgenerator import AbstractMISPObjectGenerator import logging +from typing import Any + +from .abstractgenerator import AbstractMISPObjectGenerator + logger = logging.getLogger('pymisp') class DomainIPObject(AbstractMISPObjectGenerator): - def __init__(self, parameters: dict, strict: bool = True, **kwargs): + def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__('domain-ip', strict=strict, **kwargs) self._parameters = parameters self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: first = self._sanitize_timestamp(self._parameters.pop('first-seen', None)) self._parameters['first-seen'] = first last = self._sanitize_timestamp(self._parameters.pop('last-seen', None)) diff --git a/pymisp/tools/emailobject.py b/pymisp/tools/emailobject.py index 21e2478..25a85a3 100644 --- a/pymisp/tools/emailobject.py +++ b/pymisp/tools/emailobject.py @@ -10,10 +10,11 @@ from email import policy, message_from_bytes from email.message import EmailMessage from io import BytesIO from pathlib import Path -from typing import Union, List, Tuple, Dict, cast, Any, Optional +from typing import cast, Any from extract_msg import openMsg from extract_msg.msg_classes import MessageBase +from extract_msg.attachments import AttachmentBase, SignedAttachment from extract_msg.properties import FixedLengthProp from RTFDE.exceptions import MalformedEncapsulatedRtf, NotEncapsulatedRtf # type: ignore from RTFDE.deencapsulate import DeEncapsulator # type: ignore @@ -30,15 +31,14 @@ class MISPMsgConverstionError(MISPObjectException): class EMailObject(AbstractMISPObjectGenerator): - def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None, - attach_original_email: bool = True, **kwargs): + def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None, # type: ignore[no-untyped-def] + attach_original_email: bool = True, **kwargs) -> None: super().__init__('email', **kwargs) self.attach_original_email = attach_original_email self.encapsulated_body: str | None = None self.eml_from_msg: bool | None = None - self.raw_emails: dict[str, BytesIO | None] = {'msg': None, - 'eml': None} + self.raw_emails: dict[str, BytesIO | None] = {'msg': None, 'eml': None} self.__pseudofile = self.create_pseudofile(filepath, pseudofile) self.email = self.parse_email() @@ -103,7 +103,7 @@ class EMailObject(AbstractMISPObjectGenerator): eml = self._build_eml(message, body, attachments) return eml - def _extract_msg_objects(self, msg_obj: MessageBase) -> tuple[EmailMessage, dict, list[Any]]: + def _extract_msg_objects(self, msg_obj: MessageBase) -> tuple[EmailMessage, dict[str, Any], list[AttachmentBase] | list[SignedAttachment]]: """Extracts email objects needed to construct an eml from a msg.""" message: EmailMessage = email.message_from_string(msg_obj.header.as_string(), policy=policy.default) # type: ignore body = {} @@ -151,13 +151,12 @@ class EMailObject(AbstractMISPObjectGenerator): attachments = msg_obj.attachments return message, body, attachments - def _build_eml(self, message: EmailMessage, body: dict, attachments: list) -> EmailMessage: + def _build_eml(self, message: EmailMessage, body: dict[str, Any], attachments: list[Any]) -> EmailMessage: """Constructs an eml file from objects extracted from a msg.""" # Order the body objects by increasing complexity and toss any missing objects - body_objects: list[dict] = [body.get('text', {}), - body.get('html', {}), - body.get('rtf', {})] - body_objects = [i for i in body_objects if i != {}] + body_objects: list[dict[str, Any]] = [i for i in [body.get('text'), + body.get('html'), + body.get('rtf')] if i is not None] # If this a non-multipart email then we only need to attach the payload if message.get_content_maintype() != 'multipart': for _body in body_objects: @@ -225,7 +224,7 @@ class EMailObject(AbstractMISPObjectGenerator): return message @staticmethod - def _update_content_disp_properties(msg_attch, eml_attch): + def _update_content_disp_properties(msg_attch: AttachmentBase, eml_attch: EmailMessage) -> None: """Set Content-Disposition params on binary eml objects You currently have to set non-filename content-disp params by hand in python. @@ -235,7 +234,7 @@ class EMailObject(AbstractMISPObjectGenerator): for num, name in attch_cont_disp_props.items(): try: eml_attch.set_param(name, - email.utils.format_datetime(msg_attch.props[num].value), + email.utils.format_datetime(msg_attch.props.getValue(num)), header='Content-Disposition') except KeyError: # It's fine if they don't have those values @@ -256,7 +255,7 @@ class EMailObject(AbstractMISPObjectGenerator): pass return to_return - def generate_attributes(self): + def generate_attributes(self) -> None: # Attach original & Converted if self.attach_original_email is not None: @@ -269,20 +268,28 @@ class EMailObject(AbstractMISPObjectGenerator): message = self.email - for _pref, body in message._find_body(message, preferencelist=['plain', 'html']): + if body := message.get_body(preferencelist=['plain']): comment = f"{body.get_content_type()} body" if self.encapsulated_body == body.get_content_type(): comment += " De-Encapsulated from RTF in original msg." self.add_attribute("email-body", - body.get_content(), + body.as_string(), + comment=comment) + + if body := message.get_body(preferencelist=['html']): + comment = f"{body.get_content_type()} body" + if self.encapsulated_body == body.get_content_type(): + comment += " De-Encapsulated from RTF in original msg." + self.add_attribute("email-body", + body.as_string(), comment=comment) headers = [f"{k}: {v}" for k, v in message.items()] if headers: self.add_attribute("header", "\n".join(headers)) - if "Date" in message and message.get('date').datetime is not None: - self.add_attribute("send-date", message.get('date').datetime) + if "Date" in message and message['date'].datetime is not None: + self.add_attribute("send-date", message['date'].datetime) if "To" in message: self.__add_emails("to", message["To"]) @@ -326,9 +333,9 @@ class EMailObject(AbstractMISPObjectGenerator): self.__generate_received() - def __add_emails(self, typ: str, data: str, insert_display_names: bool = True): - addresses = [] - display_names = [] + def __add_emails(self, typ: str, data: str, insert_display_names: bool = True) -> None: + addresses: list[dict[str, str]] = [] + display_names: list[dict[str, str]] = [] for realname, address in email.utils.getaddresses([data]): if address and realname: @@ -341,16 +348,17 @@ class EMailObject(AbstractMISPObjectGenerator): if realname: display_names.append({"value": realname, "comment": f"{realname} <{address}>"}) - if addresses: - self.add_attributes(typ, *addresses) + for a in addresses: + self.add_attribute(typ, **a) if insert_display_names and display_names: try: - self.add_attributes(f"{typ}-display-name", *display_names) + for d in display_names: + self.add_attribute(f"{typ}-display-name", **d) except NewAttributeError: # email object doesn't support display name for all email addrs pass - def __generate_received(self): + def __generate_received(self) -> None: """ Extract IP addresses from received headers that are not private. Also extract hostnames or domains. """ diff --git a/pymisp/tools/genericgenerator.py b/pymisp/tools/genericgenerator.py index dbe6d50..7279ca3 100644 --- a/pymisp/tools/genericgenerator.py +++ b/pymisp/tools/genericgenerator.py @@ -2,14 +2,15 @@ from __future__ import annotations +from typing import Any + from .abstractgenerator import AbstractMISPObjectGenerator -from typing import List class GenericObjectGenerator(AbstractMISPObjectGenerator): # FIXME: this method is different from the master one, and that's probably not a good idea. - def generate_attributes(self, attributes: list[dict]): # type: ignore + def generate_attributes(self, attributes: list[dict[str, Any]]) -> None: """Generates MISPObjectAttributes from a list of dictionaries. Each entry if the list must be in one of the two following formats: * {: } diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py index 089877c..63d20a1 100644 --- a/pymisp/tools/microblogobject.py +++ b/pymisp/tools/microblogobject.py @@ -2,22 +2,23 @@ from __future__ import annotations +import logging +from typing import Any # NOTE: Reference on how this module is used: https://vvx7.io/posts/2020/05/misp-slack-bot/ from .abstractgenerator import AbstractMISPObjectGenerator -import logging logger = logging.getLogger('pymisp') class MicroblogObject(AbstractMISPObjectGenerator): - def __init__(self, parameters: dict, strict: bool = True, **kwargs): + def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs): # type: ignore[no-untyped-def] super().__init__('microblog', strict=strict, **kwargs) self._parameters = parameters self.generate_attributes() - def generate_attributes(self): + def generate_attributes(self) -> None: # Raw post. if 'post' in self._parameters: self.add_attribute('post', value=self._parameters['post']) @@ -33,7 +34,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Original URL location of the microblog post (potentially malicious. if 'url' in self._parameters: if isinstance(self._parameters.get('url'), list): - for i in self._parameters.get('url'): + for i in self._parameters['url']: self.add_attribute('url', value=i) else: self.add_attribute('url', value=self._parameters['url']) @@ -41,7 +42,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Archive of the original document (Internet Archive, Archive.is, etc). if 'archive' in self._parameters: if isinstance(self._parameters.get('archive'), list): - for i in self._parameters.get('archive'): + for i in self._parameters['archive']: self.add_attribute('archive', value=i) else: self.add_attribute('archive', value=self._parameters['archive']) @@ -75,7 +76,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): "Instagram", "Forum", "Other"] if 'type' in self._parameters: if isinstance(self._parameters.get('type'), list): - for i in self._parameters.get('type'): + for i in self._parameters['type']: if i in type_allowed_values: self.add_attribute('type', value=i) else: @@ -86,7 +87,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"] if 'state' in self._parameters: if isinstance(self._parameters.get('state'), list): - for i in self._parameters.get('state'): + for i in self._parameters['state']: if i in type_allowed_values: self.add_attribute('state', value=i) else: @@ -101,7 +102,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): type_allowed_values = ["Verified", "Unverified", "Unknown"] if 'verified-username' in self._parameters: if isinstance(self._parameters.get('verified-username'), list): - for i in self._parameters.get('verified-username'): + for i in self._parameters['verified-username']: if i in type_allowed_values: self.add_attribute('verified-username', value=i) else: @@ -111,7 +112,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # embedded-link. if 'embedded-link' in self._parameters: if isinstance(self._parameters.get('embedded-link'), list): - for i in self._parameters.get('embedded-link'): + for i in self._parameters['embedded-link']: self.add_attribute('embedded-link', value=i) else: self.add_attribute('embedded-link', value=self._parameters['embedded-link']) @@ -119,7 +120,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # embedded-safe-link if 'embedded-safe-link' in self._parameters: if isinstance(self._parameters.get('embedded-safe-link'), list): - for i in self._parameters.get('embedded-safe-link'): + for i in self._parameters['embedded-safe-link']: self.add_attribute('embedded-safe-link', value=i) else: self.add_attribute('embedded-safe-link', value=self._parameters['embedded-safe-link']) @@ -127,7 +128,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Hashtag into the microblog post. if 'hashtag' in self._parameters: if isinstance(self._parameters.get('hashtag'), list): - for i in self._parameters.get('hashtag'): + for i in self._parameters['hashtag']: self.add_attribute('hashtag', value=i) else: self.add_attribute('hashtag', value=self._parameters['hashtag']) @@ -135,7 +136,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # username quoted if 'username-quoted' in self._parameters: if isinstance(self._parameters.get('username-quoted'), list): - for i in self._parameters.get('username-quoted'): + for i in self._parameters['username-quoted']: self.add_attribute('username-quoted', value=i) else: self.add_attribute('username-quoted', value=self._parameters['username-quoted'])