chg: Add a bunch more typing.

pull/1144/head
Raphaël Vinot 2024-01-31 15:20:31 +01:00
parent ca0fb8dc99
commit 9853f23683
8 changed files with 74 additions and 56 deletions

View File

@ -2,11 +2,14 @@
from __future__ import annotations from __future__ import annotations
from .. import MISPObject
from ..exceptions import InvalidMISPObject
from datetime import datetime, date from datetime import datetime, date
from dateutil.parser import parse from dateutil.parser import parse
from typing import Any
from .. import MISPObject
from ..exceptions import InvalidMISPObject
class AbstractMISPObjectGenerator(MISPObject): class AbstractMISPObjectGenerator(MISPObject):
@ -21,7 +24,7 @@ class AbstractMISPObjectGenerator(MISPObject):
except ValueError: except ValueError:
return False return False
def _sanitize_timestamp(self, timestamp: datetime | date | dict | str | int | float | None = None) -> datetime: def _sanitize_timestamp(self, timestamp: datetime | date | dict[str, Any] | str | int | float | None = None) -> datetime:
if not timestamp: if not timestamp:
return datetime.now() return datetime.now()
@ -42,9 +45,9 @@ class AbstractMISPObjectGenerator(MISPObject):
else: else:
raise Exception(f'Unable to convert {timestamp} to a datetime.') raise Exception(f'Unable to convert {timestamp} to a datetime.')
def generate_attributes(self): def generate_attributes(self) -> None:
"""Contains the logic where all the values of the object are gathered""" """Contains the logic where all the values of the object are gathered"""
if hasattr(self, '_parameters'): if hasattr(self, '_parameters') and self._definition is not None:
for object_relation in self._definition['attributes']: for object_relation in self._definition['attributes']:
value = self._parameters.pop(object_relation, None) value = self._parameters.pop(object_relation, None)
if not value: if not value:

View File

@ -13,12 +13,12 @@ logger = logging.getLogger('pymisp')
class ASNObject(AbstractMISPObjectGenerator): class ASNObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('asn', strict=strict, **kwargs) super().__init__('asn', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()
def generate_attributes(self): def generate_attributes(self) -> None:
first = self._sanitize_timestamp(self._parameters.pop('first-seen', None)) first = self._sanitize_timestamp(self._parameters.pop('first-seen', None))
self._parameters['first-seen'] = first self._parameters['first-seen'] = first
last = self._sanitize_timestamp(self._parameters.pop('last-seen', None)) last = self._sanitize_timestamp(self._parameters.pop('last-seen', None))

View File

@ -5,9 +5,8 @@ from __future__ import annotations
import logging import logging
from io import BytesIO from io import BytesIO
from typing import Any from typing import Any, TYPE_CHECKING
from . import FileObject, PEObject, ELFObject, MachOObject, PESectionObject, ELFSectionObject, MachOSectionObject
from ..exceptions import MISPObjectException from ..exceptions import MISPObjectException
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
@ -28,6 +27,9 @@ except AttributeError:
except ImportError: except ImportError:
HAS_LIEF = False HAS_LIEF = False
if TYPE_CHECKING:
from . import FileObject, PEObject, ELFObject, MachOObject, PESectionObject, ELFSectionObject, MachOSectionObject
class FileTypeNotImplemented(MISPObjectException): class FileTypeNotImplemented(MISPObjectException):
pass pass

View File

@ -3,7 +3,6 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import List, Optional
import csv import csv
from pymisp import MISPObject from pymisp import MISPObject
@ -11,8 +10,9 @@ from pymisp import MISPObject
class CSVLoader(): class CSVLoader():
def __init__(self, template_name: str, csv_path: Path, fieldnames: list[str] | None = None, has_fieldnames=False, def __init__(self, template_name: str, csv_path: Path,
delimiter: str = ',', quotechar: str = '"'): fieldnames: list[str] | None = None, has_fieldnames: bool=False,
delimiter: str = ',', quotechar: str = '"') -> None:
self.template_name = template_name self.template_name = template_name
self.delimiter = delimiter self.delimiter = delimiter
self.quotechar = quotechar self.quotechar = quotechar
@ -26,7 +26,7 @@ class CSVLoader():
else: else:
self.has_fieldnames = has_fieldnames self.has_fieldnames = has_fieldnames
def load(self): def load(self) -> list[MISPObject]:
objects = [] objects = []
@ -44,7 +44,7 @@ class CSVLoader():
# Check if the CSV file has a header, and if it matches with the object template # Check if the CSV file has a header, and if it matches with the object template
tmp_object = MISPObject(self.template_name) tmp_object = MISPObject(self.template_name)
if not tmp_object._definition['attributes']: if not tmp_object._definition or not tmp_object._definition['attributes']:
raise Exception(f'Unable to find the object template ({self.template_name}), impossible to create objects.') raise Exception(f'Unable to find the object template ({self.template_name}), impossible to create objects.')
allowed_fieldnames = list(tmp_object._definition['attributes'].keys()) allowed_fieldnames = list(tmp_object._definition['attributes'].keys())
for fieldname in self.fieldnames: for fieldname in self.fieldnames:

View File

@ -2,20 +2,23 @@
from __future__ import annotations from __future__ import annotations
from .abstractgenerator import AbstractMISPObjectGenerator
import logging import logging
from typing import Any
from .abstractgenerator import AbstractMISPObjectGenerator
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
class DomainIPObject(AbstractMISPObjectGenerator): class DomainIPObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool = True, **kwargs): def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__('domain-ip', strict=strict, **kwargs) super().__init__('domain-ip', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()
def generate_attributes(self): def generate_attributes(self) -> None:
first = self._sanitize_timestamp(self._parameters.pop('first-seen', None)) first = self._sanitize_timestamp(self._parameters.pop('first-seen', None))
self._parameters['first-seen'] = first self._parameters['first-seen'] = first
last = self._sanitize_timestamp(self._parameters.pop('last-seen', None)) last = self._sanitize_timestamp(self._parameters.pop('last-seen', None))

View File

@ -10,10 +10,11 @@ from email import policy, message_from_bytes
from email.message import EmailMessage from email.message import EmailMessage
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import Union, List, Tuple, Dict, cast, Any, Optional from typing import cast, Any
from extract_msg import openMsg from extract_msg import openMsg
from extract_msg.msg_classes import MessageBase from extract_msg.msg_classes import MessageBase
from extract_msg.attachments import AttachmentBase, SignedAttachment
from extract_msg.properties import FixedLengthProp from extract_msg.properties import FixedLengthProp
from RTFDE.exceptions import MalformedEncapsulatedRtf, NotEncapsulatedRtf # type: ignore from RTFDE.exceptions import MalformedEncapsulatedRtf, NotEncapsulatedRtf # type: ignore
from RTFDE.deencapsulate import DeEncapsulator # type: ignore from RTFDE.deencapsulate import DeEncapsulator # type: ignore
@ -30,15 +31,14 @@ class MISPMsgConverstionError(MISPObjectException):
class EMailObject(AbstractMISPObjectGenerator): class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None, def __init__(self, filepath: Path | str | None=None, pseudofile: BytesIO | None=None, # type: ignore[no-untyped-def]
attach_original_email: bool = True, **kwargs): attach_original_email: bool = True, **kwargs) -> None:
super().__init__('email', **kwargs) super().__init__('email', **kwargs)
self.attach_original_email = attach_original_email self.attach_original_email = attach_original_email
self.encapsulated_body: str | None = None self.encapsulated_body: str | None = None
self.eml_from_msg: bool | None = None self.eml_from_msg: bool | None = None
self.raw_emails: dict[str, BytesIO | None] = {'msg': None, self.raw_emails: dict[str, BytesIO | None] = {'msg': None, 'eml': None}
'eml': None}
self.__pseudofile = self.create_pseudofile(filepath, pseudofile) self.__pseudofile = self.create_pseudofile(filepath, pseudofile)
self.email = self.parse_email() self.email = self.parse_email()
@ -103,7 +103,7 @@ class EMailObject(AbstractMISPObjectGenerator):
eml = self._build_eml(message, body, attachments) eml = self._build_eml(message, body, attachments)
return eml return eml
def _extract_msg_objects(self, msg_obj: MessageBase) -> tuple[EmailMessage, dict, list[Any]]: def _extract_msg_objects(self, msg_obj: MessageBase) -> tuple[EmailMessage, dict[str, Any], list[AttachmentBase] | list[SignedAttachment]]:
"""Extracts email objects needed to construct an eml from a msg.""" """Extracts email objects needed to construct an eml from a msg."""
message: EmailMessage = email.message_from_string(msg_obj.header.as_string(), policy=policy.default) # type: ignore message: EmailMessage = email.message_from_string(msg_obj.header.as_string(), policy=policy.default) # type: ignore
body = {} body = {}
@ -151,13 +151,12 @@ class EMailObject(AbstractMISPObjectGenerator):
attachments = msg_obj.attachments attachments = msg_obj.attachments
return message, body, attachments return message, body, attachments
def _build_eml(self, message: EmailMessage, body: dict, attachments: list) -> EmailMessage: def _build_eml(self, message: EmailMessage, body: dict[str, Any], attachments: list[Any]) -> EmailMessage:
"""Constructs an eml file from objects extracted from a msg.""" """Constructs an eml file from objects extracted from a msg."""
# Order the body objects by increasing complexity and toss any missing objects # Order the body objects by increasing complexity and toss any missing objects
body_objects: list[dict] = [body.get('text', {}), body_objects: list[dict[str, Any]] = [i for i in [body.get('text'),
body.get('html', {}), body.get('html'),
body.get('rtf', {})] body.get('rtf')] if i is not None]
body_objects = [i for i in body_objects if i != {}]
# If this a non-multipart email then we only need to attach the payload # If this a non-multipart email then we only need to attach the payload
if message.get_content_maintype() != 'multipart': if message.get_content_maintype() != 'multipart':
for _body in body_objects: for _body in body_objects:
@ -225,7 +224,7 @@ class EMailObject(AbstractMISPObjectGenerator):
return message return message
@staticmethod @staticmethod
def _update_content_disp_properties(msg_attch, eml_attch): def _update_content_disp_properties(msg_attch: AttachmentBase, eml_attch: EmailMessage) -> None:
"""Set Content-Disposition params on binary eml objects """Set Content-Disposition params on binary eml objects
You currently have to set non-filename content-disp params by hand in python. You currently have to set non-filename content-disp params by hand in python.
@ -235,7 +234,7 @@ class EMailObject(AbstractMISPObjectGenerator):
for num, name in attch_cont_disp_props.items(): for num, name in attch_cont_disp_props.items():
try: try:
eml_attch.set_param(name, eml_attch.set_param(name,
email.utils.format_datetime(msg_attch.props[num].value), email.utils.format_datetime(msg_attch.props.getValue(num)),
header='Content-Disposition') header='Content-Disposition')
except KeyError: except KeyError:
# It's fine if they don't have those values # It's fine if they don't have those values
@ -256,7 +255,7 @@ class EMailObject(AbstractMISPObjectGenerator):
pass pass
return to_return return to_return
def generate_attributes(self): def generate_attributes(self) -> None:
# Attach original & Converted # Attach original & Converted
if self.attach_original_email is not None: if self.attach_original_email is not None:
@ -269,20 +268,28 @@ class EMailObject(AbstractMISPObjectGenerator):
message = self.email message = self.email
for _pref, body in message._find_body(message, preferencelist=['plain', 'html']): if body := message.get_body(preferencelist=['plain']):
comment = f"{body.get_content_type()} body" comment = f"{body.get_content_type()} body"
if self.encapsulated_body == body.get_content_type(): if self.encapsulated_body == body.get_content_type():
comment += " De-Encapsulated from RTF in original msg." comment += " De-Encapsulated from RTF in original msg."
self.add_attribute("email-body", self.add_attribute("email-body",
body.get_content(), body.as_string(),
comment=comment)
if body := message.get_body(preferencelist=['html']):
comment = f"{body.get_content_type()} body"
if self.encapsulated_body == body.get_content_type():
comment += " De-Encapsulated from RTF in original msg."
self.add_attribute("email-body",
body.as_string(),
comment=comment) comment=comment)
headers = [f"{k}: {v}" for k, v in message.items()] headers = [f"{k}: {v}" for k, v in message.items()]
if headers: if headers:
self.add_attribute("header", "\n".join(headers)) self.add_attribute("header", "\n".join(headers))
if "Date" in message and message.get('date').datetime is not None: if "Date" in message and message['date'].datetime is not None:
self.add_attribute("send-date", message.get('date').datetime) self.add_attribute("send-date", message['date'].datetime)
if "To" in message: if "To" in message:
self.__add_emails("to", message["To"]) self.__add_emails("to", message["To"])
@ -326,9 +333,9 @@ class EMailObject(AbstractMISPObjectGenerator):
self.__generate_received() self.__generate_received()
def __add_emails(self, typ: str, data: str, insert_display_names: bool = True): def __add_emails(self, typ: str, data: str, insert_display_names: bool = True) -> None:
addresses = [] addresses: list[dict[str, str]] = []
display_names = [] display_names: list[dict[str, str]] = []
for realname, address in email.utils.getaddresses([data]): for realname, address in email.utils.getaddresses([data]):
if address and realname: if address and realname:
@ -341,16 +348,17 @@ class EMailObject(AbstractMISPObjectGenerator):
if realname: if realname:
display_names.append({"value": realname, "comment": f"{realname} <{address}>"}) display_names.append({"value": realname, "comment": f"{realname} <{address}>"})
if addresses: for a in addresses:
self.add_attributes(typ, *addresses) self.add_attribute(typ, **a)
if insert_display_names and display_names: if insert_display_names and display_names:
try: try:
self.add_attributes(f"{typ}-display-name", *display_names) for d in display_names:
self.add_attribute(f"{typ}-display-name", **d)
except NewAttributeError: except NewAttributeError:
# email object doesn't support display name for all email addrs # email object doesn't support display name for all email addrs
pass pass
def __generate_received(self): def __generate_received(self) -> None:
""" """
Extract IP addresses from received headers that are not private. Also extract hostnames or domains. Extract IP addresses from received headers that are not private. Also extract hostnames or domains.
""" """

View File

@ -2,14 +2,15 @@
from __future__ import annotations from __future__ import annotations
from typing import Any
from .abstractgenerator import AbstractMISPObjectGenerator from .abstractgenerator import AbstractMISPObjectGenerator
from typing import List
class GenericObjectGenerator(AbstractMISPObjectGenerator): class GenericObjectGenerator(AbstractMISPObjectGenerator):
# FIXME: this method is different from the master one, and that's probably not a good idea. # FIXME: this method is different from the master one, and that's probably not a good idea.
def generate_attributes(self, attributes: list[dict]): # type: ignore def generate_attributes(self, attributes: list[dict[str, Any]]) -> None:
"""Generates MISPObjectAttributes from a list of dictionaries. """Generates MISPObjectAttributes from a list of dictionaries.
Each entry if the list must be in one of the two following formats: Each entry if the list must be in one of the two following formats:
* {<object_relation>: <value>} * {<object_relation>: <value>}

View File

@ -2,22 +2,23 @@
from __future__ import annotations from __future__ import annotations
import logging
from typing import Any
# NOTE: Reference on how this module is used: https://vvx7.io/posts/2020/05/misp-slack-bot/ # NOTE: Reference on how this module is used: https://vvx7.io/posts/2020/05/misp-slack-bot/
from .abstractgenerator import AbstractMISPObjectGenerator from .abstractgenerator import AbstractMISPObjectGenerator
import logging
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
class MicroblogObject(AbstractMISPObjectGenerator): class MicroblogObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool = True, **kwargs): def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs): # type: ignore[no-untyped-def]
super().__init__('microblog', strict=strict, **kwargs) super().__init__('microblog', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()
def generate_attributes(self): def generate_attributes(self) -> None:
# Raw post. # Raw post.
if 'post' in self._parameters: if 'post' in self._parameters:
self.add_attribute('post', value=self._parameters['post']) self.add_attribute('post', value=self._parameters['post'])
@ -33,7 +34,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
# Original URL location of the microblog post (potentially malicious. # Original URL location of the microblog post (potentially malicious.
if 'url' in self._parameters: if 'url' in self._parameters:
if isinstance(self._parameters.get('url'), list): if isinstance(self._parameters.get('url'), list):
for i in self._parameters.get('url'): for i in self._parameters['url']:
self.add_attribute('url', value=i) self.add_attribute('url', value=i)
else: else:
self.add_attribute('url', value=self._parameters['url']) self.add_attribute('url', value=self._parameters['url'])
@ -41,7 +42,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
# Archive of the original document (Internet Archive, Archive.is, etc). # Archive of the original document (Internet Archive, Archive.is, etc).
if 'archive' in self._parameters: if 'archive' in self._parameters:
if isinstance(self._parameters.get('archive'), list): if isinstance(self._parameters.get('archive'), list):
for i in self._parameters.get('archive'): for i in self._parameters['archive']:
self.add_attribute('archive', value=i) self.add_attribute('archive', value=i)
else: else:
self.add_attribute('archive', value=self._parameters['archive']) self.add_attribute('archive', value=self._parameters['archive'])
@ -75,7 +76,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
"Instagram", "Forum", "Other"] "Instagram", "Forum", "Other"]
if 'type' in self._parameters: if 'type' in self._parameters:
if isinstance(self._parameters.get('type'), list): if isinstance(self._parameters.get('type'), list):
for i in self._parameters.get('type'): for i in self._parameters['type']:
if i in type_allowed_values: if i in type_allowed_values:
self.add_attribute('type', value=i) self.add_attribute('type', value=i)
else: else:
@ -86,7 +87,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"] type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"]
if 'state' in self._parameters: if 'state' in self._parameters:
if isinstance(self._parameters.get('state'), list): if isinstance(self._parameters.get('state'), list):
for i in self._parameters.get('state'): for i in self._parameters['state']:
if i in type_allowed_values: if i in type_allowed_values:
self.add_attribute('state', value=i) self.add_attribute('state', value=i)
else: else:
@ -101,7 +102,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
type_allowed_values = ["Verified", "Unverified", "Unknown"] type_allowed_values = ["Verified", "Unverified", "Unknown"]
if 'verified-username' in self._parameters: if 'verified-username' in self._parameters:
if isinstance(self._parameters.get('verified-username'), list): if isinstance(self._parameters.get('verified-username'), list):
for i in self._parameters.get('verified-username'): for i in self._parameters['verified-username']:
if i in type_allowed_values: if i in type_allowed_values:
self.add_attribute('verified-username', value=i) self.add_attribute('verified-username', value=i)
else: else:
@ -111,7 +112,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
# embedded-link. # embedded-link.
if 'embedded-link' in self._parameters: if 'embedded-link' in self._parameters:
if isinstance(self._parameters.get('embedded-link'), list): if isinstance(self._parameters.get('embedded-link'), list):
for i in self._parameters.get('embedded-link'): for i in self._parameters['embedded-link']:
self.add_attribute('embedded-link', value=i) self.add_attribute('embedded-link', value=i)
else: else:
self.add_attribute('embedded-link', value=self._parameters['embedded-link']) self.add_attribute('embedded-link', value=self._parameters['embedded-link'])
@ -119,7 +120,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
# embedded-safe-link # embedded-safe-link
if 'embedded-safe-link' in self._parameters: if 'embedded-safe-link' in self._parameters:
if isinstance(self._parameters.get('embedded-safe-link'), list): if isinstance(self._parameters.get('embedded-safe-link'), list):
for i in self._parameters.get('embedded-safe-link'): for i in self._parameters['embedded-safe-link']:
self.add_attribute('embedded-safe-link', value=i) self.add_attribute('embedded-safe-link', value=i)
else: else:
self.add_attribute('embedded-safe-link', value=self._parameters['embedded-safe-link']) self.add_attribute('embedded-safe-link', value=self._parameters['embedded-safe-link'])
@ -127,7 +128,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
# Hashtag into the microblog post. # Hashtag into the microblog post.
if 'hashtag' in self._parameters: if 'hashtag' in self._parameters:
if isinstance(self._parameters.get('hashtag'), list): if isinstance(self._parameters.get('hashtag'), list):
for i in self._parameters.get('hashtag'): for i in self._parameters['hashtag']:
self.add_attribute('hashtag', value=i) self.add_attribute('hashtag', value=i)
else: else:
self.add_attribute('hashtag', value=self._parameters['hashtag']) self.add_attribute('hashtag', value=self._parameters['hashtag'])
@ -135,7 +136,7 @@ class MicroblogObject(AbstractMISPObjectGenerator):
# username quoted # username quoted
if 'username-quoted' in self._parameters: if 'username-quoted' in self._parameters:
if isinstance(self._parameters.get('username-quoted'), list): if isinstance(self._parameters.get('username-quoted'), list):
for i in self._parameters.get('username-quoted'): for i in self._parameters['username-quoted']:
self.add_attribute('username-quoted', value=i) self.add_attribute('username-quoted', value=i)
else: else:
self.add_attribute('username-quoted', value=self._parameters['username-quoted']) self.add_attribute('username-quoted', value=self._parameters['username-quoted'])