chg: Add typing markup

pull/511/head
Raphaël Vinot 2020-01-02 15:55:00 +01:00
parent 6427ce3c84
commit 2e064563c3
2 changed files with 118 additions and 118 deletions

View File

@ -1,27 +1,27 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from deprecated import deprecated
from deprecated import deprecated # type: ignore
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
try:
from rapidjson import load
from rapidjson import loads
from rapidjson import dumps
import rapidjson
from rapidjson import load # type: ignore
from rapidjson import loads # type: ignore
from rapidjson import dumps # type: ignore
HAS_RAPIDJSON = True
except ImportError:
from json import load
from json import loads
from json import dumps
import json
HAS_RAPIDJSON = False
import logging
from enum import Enum
from typing import Union, Optional
from .exceptions import PyMISPInvalidFormat, PyMISPError
@ -43,7 +43,7 @@ class MISPFileCache(object):
@staticmethod
@lru_cache(maxsize=150)
def _load_json(path):
def _load_json(path: Path) -> Union[dict, None]:
if not path.exists():
return None
with path.open('r') as f:
@ -73,7 +73,7 @@ class Analysis(Enum):
completed = 2
def _int_to_str(d):
def _int_to_str(d: dict) -> dict:
# transform all integer back to string
for k, v in d.items():
if isinstance(v, (int, float)) and not isinstance(v, bool):
@ -95,31 +95,7 @@ class MISPEncode(JSONEncoder):
return JSONEncoder.default(self, obj)
if HAS_RAPIDJSON:
def pymisp_json_default(obj):
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, UUID):
return str(obj)
return rapidjson.default(obj)
else:
def pymisp_json_default(obj):
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, UUID):
return str(obj)
return json.default(obj)
class AbstractMISP(MutableMapping, MISPFileCache):
class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
__resources_path = resources_path
__misp_objects_path = misp_objects_path
__describe_types = describe_types
@ -132,15 +108,16 @@ class AbstractMISP(MutableMapping, MISPFileCache):
methods in ExpandedPyMISP/PyMISP.
"""
super().__init__()
self.__edited = True # As we create a new object, we assume it is edited
self.__not_jsonable = []
self.__self_defined_describe_types = None
self.__edited: bool = True # As we create a new object, we assume it is edited
self.__not_jsonable: list = []
self._fields_for_feed: set = {}
self.__self_defined_describe_types: Union[dict, None] = None
if kwargs.get('force_timestamps') is not None:
# Ignore the edited objects and keep the timestamps.
self.__force_timestamps = True
self.__force_timestamps: bool = True
else:
self.__force_timestamps = False
self.__force_timestamps: bool = False
# List of classes having tags
from .mispevent import MISPAttribute, MISPEvent
@ -151,30 +128,30 @@ class AbstractMISP(MutableMapping, MISPFileCache):
setattr(AbstractMISP, 'tags', property(AbstractMISP.__get_tags, AbstractMISP.__set_tags))
@property
def describe_types(self):
def describe_types(self) -> dict:
if self.__self_defined_describe_types:
return self.__self_defined_describe_types
return self.__describe_types
@describe_types.setter
def describe_types(self, describe_types):
def describe_types(self, describe_types: dict):
self.__self_defined_describe_types = describe_types
@property
def resources_path(self):
def resources_path(self) -> Path:
return self.__resources_path
@property
def misp_objects_path(self):
def misp_objects_path(self) -> Path:
return self.__misp_objects_path
@misp_objects_path.setter
def misp_objects_path(self, misp_objects_path):
def misp_objects_path(self, misp_objects_path: Union[str, Path]):
if isinstance(misp_objects_path, str):
misp_objects_path = Path(misp_objects_path)
self.__misp_objects_path = misp_objects_path
def from_dict(self, **kwargs):
def from_dict(self, **kwargs) -> None:
"""Loading all the parameters as class properties, if they aren't `None`.
This method aims to be called when all the properties requiring a special
treatment are processed.
@ -187,19 +164,19 @@ class AbstractMISP(MutableMapping, MISPFileCache):
# We load an existing dictionary, marking it an not-edited
self.__edited = False
def update_not_jsonable(self, *args):
def update_not_jsonable(self, *args) -> None:
"""Add entries to the __not_jsonable list"""
self.__not_jsonable += args
def set_not_jsonable(self, *args):
def set_not_jsonable(self, args: list) -> None:
"""Set __not_jsonable to a new list"""
self.__not_jsonable = args
def from_json(self, json_string):
def from_json(self, json_string: str) -> None:
"""Load a JSON string"""
self.from_dict(**loads(json_string))
def to_dict(self):
def to_dict(self) -> dict:
"""Dump the class to a dictionary.
This method automatically removes the timestamp recursively in every object
that has been edited is order to let MISP update the event accordingly."""
@ -223,15 +200,15 @@ class AbstractMISP(MutableMapping, MISPFileCache):
to_return = _int_to_str(to_return)
return to_return
def jsonable(self):
def jsonable(self) -> dict:
"""This method is used by the JSON encoder"""
return self.to_dict()
def _to_feed(self):
if not hasattr(self, '_fields_for_feed'):
def _to_feed(self) -> dict:
if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed:
raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.')
if hasattr(self, '_set_default') and callable(self._set_default):
self._set_default()
if hasattr(self, '_set_default') and callable(self._set_default): # type: ignore
self._set_default() # type: ignore
to_return = {}
for field in self._fields_for_feed:
if getattr(self, field, None) is not None:
@ -248,7 +225,7 @@ class AbstractMISP(MutableMapping, MISPFileCache):
raise PyMISPError('The field {} is required in {} when generating a feed.'.format(field, self.__class__.__name__))
return to_return
def to_json(self, sort_keys=False, indent=None):
def to_json(self, sort_keys: bool=False, indent: Optional[int]=None):
"""Dump recursively any class of type MISPAbstract to a json string"""
return dumps(self, default=pymisp_json_default, sort_keys=sort_keys, indent=indent)
@ -274,7 +251,7 @@ class AbstractMISP(MutableMapping, MISPFileCache):
return len([k for k in self.__dict__.keys() if not (k[0] == '_' or k in self.__not_jsonable)])
@property
def edited(self):
def edited(self) -> bool:
"""Recursively check if an object has been edited and update the flag accordingly
to the parent objects"""
if self.__edited:
@ -304,7 +281,7 @@ class AbstractMISP(MutableMapping, MISPFileCache):
self.__edited = True
super().__setattr__(name, value)
def _datetime_to_timestamp(self, d):
def _datetime_to_timestamp(self, d: Union[int, float, str, datetime.datetime]) -> int:
"""Convert a datetime.datetime object to a timestamp (int)"""
if isinstance(d, (int, float, str)):
# Assume we already have a timestamp
@ -325,10 +302,11 @@ class AbstractMISP(MutableMapping, MISPFileCache):
misp_tag = MISPTag()
misp_tag.from_dict(**kwargs)
else:
raise PyMISPInvalidFormat("The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {}".format(tag))
raise PyMISPInvalidFormat(f"The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {tag}")
if misp_tag not in self.tags:
self.Tag.append(misp_tag)
self.edited = True
return misp_tag
def __get_tags(self):
"""Returns a lost of tags associated to this Attribute"""
@ -357,7 +335,7 @@ class AbstractMISP(MutableMapping, MISPFileCache):
class MISPTag(AbstractMISP):
_fields_for_feed = {'name', 'colour'}
_fields_for_feed: set = {'name', 'colour'}
def from_dict(self, **kwargs):
if kwargs.get('Tag'):
@ -372,3 +350,25 @@ class MISPTag(AbstractMISP):
if hasattr(self, 'exportable') and not self.exportable:
return False
return super()._to_feed()
if HAS_RAPIDJSON:
def pymisp_json_default(obj: Union[AbstractMISP, datetime.datetime, datetime.date, Enum, UUID]) -> Union[dict, str]:
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, UUID):
return str(obj)
else:
def pymisp_json_default(obj: Union[AbstractMISP, datetime.datetime, datetime.date, Enum, UUID]) -> Union[dict, str]:
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, UUID):
return str(obj)

View File

@ -25,26 +25,26 @@ except ImportError:
logger.exception("Cannot import dateutil")
try:
import jsonschema
import jsonschema # type: ignore
except ImportError:
logger.exception("Cannot import jsonschema")
try:
# pyme renamed to gpg the 2016-10-28
import gpg
from gpg.constants.sig import mode
import gpg # type: ignore
from gpg.constants.sig import mode # type: ignore
has_pyme = True
except ImportError:
try:
# pyme renamed to gpg the 2016-10-28
import pyme as gpg
from pyme.constants.sig import mode
import pyme as gpg # type: ignore
from pyme.constants.sig import mode # type: ignore
has_pyme = True
except ImportError:
has_pyme = False
def _int_to_str(d):
def _int_to_str(d: dict):
# transform all integer back to string
for k, v in d.items():
if isinstance(v, (int, float)) and not isinstance(v, bool):
@ -52,7 +52,7 @@ def _int_to_str(d):
return d
def make_bool(value):
def make_bool(value: Union[bool, int, str, dict, list, None]) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, int):
@ -70,7 +70,7 @@ def make_bool(value):
class MISPOrganisation(AbstractMISP):
_fields_for_feed = {'name', 'uuid'}
_fields_for_feed: set = {'name', 'uuid'}
def from_dict(self, **kwargs):
if 'Organisation' in kwargs:
@ -87,8 +87,8 @@ class MISPShadowAttribute(AbstractMISP):
def __repr__(self) -> str:
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(type={self.type}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
return f'<{self.__class__.__name__}(type={self.type}, value={self.value})' # type: ignore
return f'<{self.__class__.__name__}(NotInitialized)'
class MISPSighting(AbstractMISP):
@ -109,17 +109,17 @@ class MISPSighting(AbstractMISP):
def __repr__(self) -> str:
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(value={self.value})'.format(self=self) # type: ignore
if hasattr(self, 'id'):
return '<{self.__class__.__name__}(id={self.id})'.format(self=self)
return '<{self.__class__.__name__}(id={self.id})'.format(self=self) # type: ignore
if hasattr(self, 'uuid'):
return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self)
return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self) # type: ignore
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPAttribute(AbstractMISP):
_fields_for_feed = {'uuid', 'value', 'category', 'type', 'comment', 'data',
'timestamp', 'to_ids', 'disable_correlation'}
_fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data',
'timestamp', 'to_ids', 'disable_correlation'}
def __init__(self, describe_types: Optional[dict]=None, strict: bool=False):
"""Represents an Attribute
@ -128,15 +128,15 @@ class MISPAttribute(AbstractMISP):
"""
super().__init__()
if describe_types:
self.describe_types = describe_types
self.__categories = self.describe_types['categories']
self.__category_type_mapping = self.describe_types['category_type_mappings']
self.__sane_default = self.describe_types['sane_defaults']
self.__strict = strict
self.describe_types: dict = describe_types
self.__categories: List[str] = self.describe_types['categories']
self.__category_type_mapping: dict = self.describe_types['category_type_mappings']
self.__sane_default: dict = self.describe_types['sane_defaults']
self.__strict: bool = strict
self._data = None
self.uuid = str(uuid.uuid4())
self.ShadowAttribute = []
self.Sighting = []
self.uuid: str = str(uuid.uuid4())
self.ShadowAttribute: List[MISPShadowAttribute] = []
self.Sighting: List[MISPSighting] = []
def hash_values(self, algorithm: str='sha512') -> List[str]:
"""Compute the hash of every values for fast lookups"""
@ -167,8 +167,8 @@ class MISPAttribute(AbstractMISP):
to_return = super()._to_feed()
if self.data:
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
if self.tags:
to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags]))
if self.tags: # type: ignore
to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags])) # type: ignore
return to_return
@property
@ -177,7 +177,7 @@ class MISPAttribute(AbstractMISP):
return self.describe_types['types']
@property
def malware_binary(self) -> BytesIO:
def malware_binary(self) -> Union[BytesIO, None]:
"""Returns a BytesIO of the malware (if the attribute has one, obvs)."""
if hasattr(self, '_malware_binary'):
return self._malware_binary
@ -445,8 +445,8 @@ class MISPAttribute(AbstractMISP):
class MISPObjectReference(AbstractMISP):
_fields_for_feed = {'uuid', 'timestamp', 'relationship_type', 'comment',
'object_uuid', 'referenced_uuid'}
_fields_for_feed: set = {'uuid', 'timestamp', 'relationship_type', 'comment',
'object_uuid', 'referenced_uuid'}
def __init__(self):
super().__init__()
@ -471,9 +471,9 @@ class MISPObjectReference(AbstractMISP):
class MISPObject(AbstractMISP):
_fields_for_feed = {'name', 'meta-category', 'description', 'template_uuid',
'template_version', 'uuid', 'timestamp', 'distribution',
'sharing_group_id', 'comment'}
_fields_for_feed: set = {'name', 'meta-category', 'description', 'template_uuid',
'template_version', 'uuid', 'timestamp', 'distribution',
'sharing_group_id', 'comment'}
def __init__(self, name: str, strict: bool=False, standalone: bool=False, default_attributes_parameters: dict={}, **kwargs):
''' Master class representing a generic MISP object
@ -489,21 +489,21 @@ class MISPObject(AbstractMISP):
:misp_objects_path_custom: Path to custom object templates
'''
super().__init__(**kwargs)
self._strict = strict
self.name = name
self._known_template = False
self._strict: bool = strict
self.name: str = name
self._known_template: bool = False
self._set_template(kwargs.get('misp_objects_path_custom'))
self.uuid = str(uuid.uuid4())
self.__fast_attribute_access = defaultdict(list) # Hashtable object_relation: [attributes]
self.ObjectReference = []
self.Attribute = []
self.uuid: str = str(uuid.uuid4())
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
self.ObjectReference: List[MISPObjectReference] = []
self.Attribute: List[MISPAttribute] = []
if isinstance(default_attributes_parameters, MISPAttribute):
# Just make sure we're not modifying an existing MISPAttribute
self._default_attributes_parameters = default_attributes_parameters.to_dict()
self._default_attributes_parameters: dict = default_attributes_parameters.to_dict()
else:
self._default_attributes_parameters = default_attributes_parameters
self._default_attributes_parameters: dict = default_attributes_parameters
if self._default_attributes_parameters:
# Let's clean that up
self._default_attributes_parameters.pop('value', None) # duh
@ -529,7 +529,7 @@ class MISPObject(AbstractMISP):
self.update_not_jsonable('ObjectReference')
def _load_template_path(self, template_path: Union[Path, str]) -> bool:
self._definition = self._load_json(template_path)
self._definition: Union[dict, None] = self._load_json(template_path)
if not self._definition:
return False
setattr(self, 'meta-category', self._definition['meta-category'])
@ -555,7 +555,7 @@ class MISPObject(AbstractMISP):
self.name = object_name
self._set_template(misp_objects_path_custom)
def _set_template(self, misp_objects_path_custom: Union[Path, str]=None):
def _set_template(self, misp_objects_path_custom: Optional[Union[Path, str]]=None):
if misp_objects_path_custom:
# If misp_objects_path_custom is given, and an object with the given name exists, use that.
self.misp_objects_path = misp_objects_path_custom
@ -668,7 +668,7 @@ class MISPObject(AbstractMISP):
'''True if all the relations in the list are defined in the object'''
return all(relation in self._fast_attribute_access for relation in list_of_relations)
def add_attribute(self, object_relation: str, simple_value: Union[str, int, float]=None, **value) -> MISPAttribute:
def add_attribute(self, object_relation: str, simple_value: Union[str, int, float]=None, **value) -> Union[MISPAttribute, None]:
"""Add an attribute. object_relation is required and the value key is a
dictionary with all the keys supported by MISPAttribute"""
if simple_value is not None: # /!\ The value *can* be 0
@ -676,7 +676,7 @@ class MISPObject(AbstractMISP):
if value.get('value') in [None, '']:
logger.warning("The value of the attribute you're trying to add is None or empty string, skipping it. Object relation: {}".format(object_relation))
return None
if self._known_template:
if self._known_template and self._definition:
if object_relation in self._definition['attributes']:
attribute = MISPObjectAttribute(self._definition['attributes'][object_relation])
else:
@ -694,7 +694,7 @@ class MISPObject(AbstractMISP):
self.edited = True
return attribute
def add_attributes(self, object_relation: str, *attributes) -> List[MISPAttribute]:
def add_attributes(self, object_relation: str, *attributes) -> List[Optional[MISPAttribute]]:
'''Add multiple attributes with the same object_relation.
Helper for object_relation when multiple is True in the template.
It is the same as calling multiple times add_attribute with the same object_relation.
@ -713,7 +713,7 @@ class MISPObject(AbstractMISP):
self._validate()
return super(MISPObject, self).to_dict()
def to_json(self, strict: bool=False, sort_keys: bool=False, indent: Optional[int]=None):
def to_json(self, sort_keys: bool=False, indent: Optional[int]=None, strict: bool=False):
if strict or self._strict and self._known_template:
self._validate()
return super(MISPObject, self).to_json(sort_keys=sort_keys, indent=indent)
@ -745,8 +745,8 @@ class MISPObject(AbstractMISP):
class MISPEvent(AbstractMISP):
_fields_for_feed = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
'publish_timestamp', 'published', 'date', 'extends_uuid'}
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
'publish_timestamp', 'published', 'date', 'extends_uuid'}
def __init__(self, describe_types: dict=None, strict_validation: bool=False, **kwargs):
super().__init__(**kwargs)
@ -759,10 +759,10 @@ class MISPEvent(AbstractMISP):
# This variable is used in add_attribute in order to avoid duplicating the structure
self.describe_types = describe_types
self.Attribute = []
self.Object = []
self.RelatedEvent = []
self.ShadowAttribute = []
self.Attribute: List[MISPAttribute] = []
self.Object: List[MISPObject] = []
self.RelatedEvent: List[MISPEvent] = []
self.ShadowAttribute: List[MISPShadowAttribute] = []
def _set_default(self):
"""There are a few keys that could, or need to be set by default for the feed generator"""
@ -807,7 +807,7 @@ class MISPEvent(AbstractMISP):
}
def attributes_hashes(self, algorithm: str='sha512') -> List[str]:
to_return = []
to_return: List[str] = []
for attribute in self.attributes:
to_return += attribute.hash_values(algorithm)
for obj in self.objects:
@ -828,7 +828,7 @@ class MISPEvent(AbstractMISP):
if (hasattr(self, 'distribution')
and self.distribution is not None
and int(self.distribution) not in valid_distributions):
return
return {}
to_return = super()._to_feed()
if with_meta:
@ -1069,7 +1069,7 @@ class MISPEvent(AbstractMISP):
"""Return the tags associated to an attribute or an object attribute.
:attribute_identifier: can be an ID, UUID, or the value.
"""
tags = []
tags: List[MISPTag] = []
for a in self.attributes + [attribute for o in self.objects for attribute in o.attributes]:
if ((hasattr(a, 'id') and a.id == attribute_identifier)
or (hasattr(a, 'uuid') and a.uuid == attribute_identifier)
@ -1117,10 +1117,10 @@ class MISPEvent(AbstractMISP):
if not found:
raise PyMISPError('No attribute with UUID/ID {} found.'.format(attribute_id))
def add_attribute(self, type: str, value: Union[str, int, float], **kwargs) -> MISPAttribute:
def add_attribute(self, type: str, value: Union[str, int, float], **kwargs) -> Union[MISPAttribute, List[MISPAttribute]]:
"""Add an attribute. type and value are required but you can pass all
other parameters supported by MISPAttribute"""
attr_list = []
attr_list: List[MISPAttribute] = []
if isinstance(value, list):
attr_list = [self.add_attribute(type=type, value=a, **kwargs) for a in value]
else:
@ -1377,8 +1377,8 @@ class MISPEventDelegation(AbstractMISP):
class MISPObjectAttribute(MISPAttribute):
_fields_for_feed = {'uuid', 'object_relation', 'value', 'category', 'type',
'comment', 'data', 'timestamp', 'to_ids', 'disable_correlation'}
_fields_for_feed: set = {'uuid', 'object_relation', 'value', 'category', 'type',
'comment', 'data', 'timestamp', 'to_ids', 'disable_correlation'}
def __init__(self, definition):
super().__init__()