mirror of https://github.com/MISP/PyMISP
				
				
				
			chg: First batch of changes for strict typing
							parent
							
								
									3a74ca8704
								
							
						
					
					
						commit
						298e2f9035
					
				
							
								
								
									
										6
									
								
								mypy.ini
								
								
								
								
							
							
						
						
									
										6
									
								
								mypy.ini
								
								
								
								
							|  | @ -3,13 +3,11 @@ strict = True | |||
| warn_return_any = False | ||||
| show_error_context = True | ||||
| pretty = True | ||||
| exclude = feed-generator|examples | ||||
| exclude = feed-generator|examples|pymisp/tools|pymisp/data|tests | ||||
| 
 | ||||
| # Stuff to remove gradually | ||||
| disallow_untyped_defs = False | ||||
| # disallow_untyped_defs = False | ||||
| disallow_untyped_calls = False | ||||
| check_untyped_defs = False | ||||
| disable_error_code = attr-defined,type-arg,no-untyped-def | ||||
| 
 | ||||
| 
 | ||||
| [mypy-docs.source.*] | ||||
|  |  | |||
|  | @ -11,7 +11,7 @@ logger = logging.getLogger(__name__) | |||
| __version__ = importlib.metadata.version("pymisp") | ||||
| 
 | ||||
| 
 | ||||
| def warning_2024(): | ||||
| def warning_2024() -> None: | ||||
|     if sys.version_info < (3, 10): | ||||
|         warnings.warn(""" | ||||
| As our baseline system is the latest Ubuntu LTS, and Ubuntu LTS 22.04 has Python 3.10 available, | ||||
|  | @ -62,3 +62,17 @@ try: | |||
|     logger.debug('pymisp loaded properly') | ||||
| except ImportError as e: | ||||
|     logger.warning(f'Unable to load pymisp properly: {e}') | ||||
| 
 | ||||
| __all__ = ['PyMISP', 'register_user', 'AbstractMISP', 'MISPTag', | ||||
|            'MISPEvent', 'MISPAttribute', 'MISPObjectReference', 'MISPObjectAttribute', | ||||
|            'MISPObject', 'MISPUser', 'MISPOrganisation', 'MISPSighting', 'MISPLog', | ||||
|            'MISPShadowAttribute', 'MISPWarninglist', 'MISPTaxonomy', 'MISPNoticelist', | ||||
|            'MISPObjectTemplate', 'MISPSharingGroup', 'MISPRole', 'MISPServer', 'MISPFeed', | ||||
|            'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist', | ||||
|            'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion', | ||||
|            'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement', | ||||
|            'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError', | ||||
|            'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat', | ||||
|            'EmailObject', 'FileObject', 'IPObject', 'DomainObject', 'URIObject', 'ASNObject', | ||||
|            'Distribution', 'ThreatLevel', 'Analysis' | ||||
|            ] | ||||
|  |  | |||
|  | @ -30,7 +30,7 @@ logger = logging.getLogger('pymisp') | |||
| resources_path = Path(__file__).parent / 'data' | ||||
| misp_objects_path = resources_path / 'misp-objects' / 'objects' | ||||
| with (resources_path / 'describeTypes.json').open('rb') as f: | ||||
|     describe_types = loads(f.read())['result'] | ||||
|     describe_types: dict[str, Any] = loads(f.read())['result'] | ||||
| 
 | ||||
| 
 | ||||
| class MISPFileCache: | ||||
|  | @ -38,7 +38,7 @@ class MISPFileCache: | |||
| 
 | ||||
|     @staticmethod | ||||
|     @lru_cache(maxsize=150) | ||||
|     def _load_json(path: Path) -> dict | None: | ||||
|     def _load_json(path: Path) -> dict[str, Any] | None: | ||||
|         if not path.exists(): | ||||
|             return None | ||||
|         with path.open('rb') as f: | ||||
|  | @ -80,7 +80,7 @@ def _int_to_str(d: dict[str, Any]) -> dict[str, Any]: | |||
| 
 | ||||
| @deprecated(reason=" Use method default=pymisp_json_default instead of cls=MISPEncode", version='2.4.117', action='default') | ||||
| class MISPEncode(JSONEncoder): | ||||
|     def default(self, obj): | ||||
|     def default(self, obj: Any) -> dict[str, Any] | str: | ||||
|         if isinstance(obj, AbstractMISP): | ||||
|             return obj.jsonable() | ||||
|         elif isinstance(obj, (datetime, date)): | ||||
|  | @ -92,12 +92,12 @@ class MISPEncode(JSONEncoder): | |||
|         return JSONEncoder.default(self, obj) | ||||
| 
 | ||||
| 
 | ||||
| class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | ||||
| class AbstractMISP(MutableMapping[str, Any], MISPFileCache, metaclass=ABCMeta): | ||||
|     __resources_path = resources_path | ||||
|     __misp_objects_path = misp_objects_path | ||||
|     __describe_types = describe_types | ||||
| 
 | ||||
|     def __init__(self, **kwargs: dict): | ||||
|     def __init__(self, **kwargs) -> None:  # type: ignore[no-untyped-def] | ||||
|         """Abstract class for all the MISP objects. | ||||
|         NOTE: Every method in every classes inheriting this one are doing | ||||
|               changes in memory and  do not modify data on a remote MISP instance. | ||||
|  | @ -107,8 +107,8 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         super().__init__() | ||||
|         self.__edited: bool = True  # As we create a new object, we assume it is edited | ||||
|         self.__not_jsonable: list[str] = [] | ||||
|         self._fields_for_feed: set | ||||
|         self.__self_defined_describe_types: dict | None = None | ||||
|         self._fields_for_feed: set[str] | ||||
|         self.__self_defined_describe_types: dict[str, Any] | None = None | ||||
|         self.uuid: str | ||||
| 
 | ||||
|         if kwargs.get('force_timestamps') is not None: | ||||
|  | @ -118,13 +118,13 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|             self.__force_timestamps = False | ||||
| 
 | ||||
|     @property | ||||
|     def describe_types(self) -> dict: | ||||
|     def describe_types(self) -> dict[str, Any]: | ||||
|         if self.__self_defined_describe_types: | ||||
|             return self.__self_defined_describe_types | ||||
|         return self.__describe_types | ||||
| 
 | ||||
|     @describe_types.setter | ||||
|     def describe_types(self, describe_types: dict): | ||||
|     def describe_types(self, describe_types: dict[str, Any]) -> None: | ||||
|         self.__self_defined_describe_types = describe_types | ||||
| 
 | ||||
|     @property | ||||
|  | @ -136,12 +136,12 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         return self.__misp_objects_path | ||||
| 
 | ||||
|     @misp_objects_path.setter | ||||
|     def misp_objects_path(self, misp_objects_path: str | Path): | ||||
|     def misp_objects_path(self, misp_objects_path: str | Path) -> None: | ||||
|         if isinstance(misp_objects_path, str): | ||||
|             misp_objects_path = Path(misp_objects_path) | ||||
|         self.__misp_objects_path = misp_objects_path | ||||
| 
 | ||||
|     def from_dict(self, **kwargs) -> None: | ||||
|     def from_dict(self, **kwargs) -> None:  # type: ignore[no-untyped-def] | ||||
|         """Loading all the parameters as class properties, if they aren't `None`. | ||||
|         This method aims to be called when all the properties requiring a special | ||||
|         treatment are processed. | ||||
|  | @ -154,7 +154,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         # We load an existing dictionary, marking it an not-edited | ||||
|         self.__edited = False | ||||
| 
 | ||||
|     def update_not_jsonable(self, *args) -> None: | ||||
|     def update_not_jsonable(self, *args) -> None:  # type: ignore[no-untyped-def] | ||||
|         """Add entries to the __not_jsonable list""" | ||||
|         self.__not_jsonable += args | ||||
| 
 | ||||
|  | @ -162,7 +162,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         """Set __not_jsonable to a new list""" | ||||
|         self.__not_jsonable = args | ||||
| 
 | ||||
|     def _remove_from_not_jsonable(self, *args) -> None: | ||||
|     def _remove_from_not_jsonable(self, *args) -> None:  # type: ignore[no-untyped-def] | ||||
|         """Remove the entries that are in the __not_jsonable list""" | ||||
|         for entry in args: | ||||
|             try: | ||||
|  | @ -174,7 +174,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         """Load a JSON string""" | ||||
|         self.from_dict(**loads(json_string)) | ||||
| 
 | ||||
|     def to_dict(self, json_format: bool = False) -> dict: | ||||
|     def to_dict(self, json_format: bool = False) -> dict[str, Any]: | ||||
|         """Dump the class to a dictionary. | ||||
|         This method automatically removes the timestamp recursively in every object | ||||
|         that has been edited is order to let MISP update the event accordingly.""" | ||||
|  | @ -216,11 +216,11 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         to_return = _int_to_str(to_return) | ||||
|         return to_return | ||||
| 
 | ||||
|     def jsonable(self) -> dict: | ||||
|     def jsonable(self) -> dict[str, Any]: | ||||
|         """This method is used by the JSON encoder""" | ||||
|         return self.to_dict() | ||||
| 
 | ||||
|     def _to_feed(self) -> dict: | ||||
|     def _to_feed(self) -> dict[str, Any]: | ||||
|         if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed: | ||||
|             raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.') | ||||
|         if hasattr(self, '_set_default') and callable(self._set_default): | ||||
|  | @ -256,7 +256,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
| 
 | ||||
|         return dumps(self, default=pymisp_json_default, sort_keys=sort_keys, indent=indent) | ||||
| 
 | ||||
|     def __getitem__(self, key): | ||||
|     def __getitem__(self, key: str) -> Any: | ||||
|         try: | ||||
|             if key[0] != '_' and key not in self.__not_jsonable: | ||||
|                 return self.__dict__[key] | ||||
|  | @ -265,13 +265,13 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|             # Expected by pop and other dict-related methods | ||||
|             raise KeyError | ||||
| 
 | ||||
|     def __setitem__(self, key, value): | ||||
|     def __setitem__(self, key: str, value: Any) -> None: | ||||
|         setattr(self, key, value) | ||||
| 
 | ||||
|     def __delitem__(self, key): | ||||
|     def __delitem__(self, key: str) -> None: | ||||
|         delattr(self, key) | ||||
| 
 | ||||
|     def __iter__(self): | ||||
|     def __iter__(self) -> Any: | ||||
|         '''When we call **self, skip keys: | ||||
|             * starting with _ | ||||
|             * in __not_jsonable | ||||
|  | @ -290,7 +290,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         return self.__force_timestamps | ||||
| 
 | ||||
|     @force_timestamp.setter | ||||
|     def force_timestamp(self, force: bool): | ||||
|     def force_timestamp(self, force: bool) -> None: | ||||
|         self.__force_timestamps = force | ||||
| 
 | ||||
|     @property | ||||
|  | @ -310,14 +310,14 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|         return self.__edited | ||||
| 
 | ||||
|     @edited.setter | ||||
|     def edited(self, val: bool): | ||||
|     def edited(self, val: bool) -> None: | ||||
|         """Set the edit flag""" | ||||
|         if isinstance(val, bool): | ||||
|             self.__edited = val | ||||
|         else: | ||||
|             raise PyMISPError('edited can only be True or False') | ||||
| 
 | ||||
|     def __setattr__(self, name: str, value: Any): | ||||
|     def __setattr__(self, name: str, value: Any) -> None: | ||||
|         if name[0] != '_' and not self.__edited and name in self: | ||||
|             # The private members don't matter | ||||
|             # If we already have a key with that name, we're modifying it. | ||||
|  | @ -331,7 +331,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|             return int(d) | ||||
|         return int(d.timestamp()) | ||||
| 
 | ||||
|     def _add_tag(self, tag: str | MISPTag | Mapping | None = None, **kwargs): | ||||
|     def _add_tag(self, tag: str | MISPTag | Mapping[str, Any] | None = None, **kwargs):  # type: ignore[no-untyped-def] | ||||
|         """Add a tag to the attribute (by name or a MISPTag object)""" | ||||
|         if isinstance(tag, str): | ||||
|             misp_tag = MISPTag() | ||||
|  | @ -351,14 +351,14 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
|             self.edited = True | ||||
|         return misp_tag | ||||
| 
 | ||||
|     def _set_tags(self, tags: list[MISPTag]): | ||||
|     def _set_tags(self, tags: list[MISPTag]) -> None: | ||||
|         """Set a list of prepared MISPTag.""" | ||||
|         if all(isinstance(x, MISPTag) for x in tags): | ||||
|             self.Tag = tags | ||||
|         else: | ||||
|             raise PyMISPInvalidFormat('All the attributes have to be of type MISPTag.') | ||||
| 
 | ||||
|     def __eq__(self, other) -> bool: | ||||
|     def __eq__(self, other: object) -> bool: | ||||
|         if isinstance(other, AbstractMISP): | ||||
|             return self.to_dict() == other.to_dict() | ||||
|         elif isinstance(other, dict): | ||||
|  | @ -372,21 +372,21 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): | |||
| 
 | ||||
| class MISPTag(AbstractMISP): | ||||
| 
 | ||||
|     _fields_for_feed: set = {'name', 'colour', 'relationship_type', 'local'} | ||||
|     _fields_for_feed: set[str] = {'name', 'colour', 'relationship_type', 'local'} | ||||
| 
 | ||||
|     def __init__(self, **kwargs: dict): | ||||
|     def __init__(self, **kwargs) -> None:  # type: ignore[no-untyped-def] | ||||
|         super().__init__(**kwargs) | ||||
|         self.name: str | ||||
|         self.exportable: bool | ||||
|         self.local: bool | ||||
|         self.relationship_type: str | None | ||||
| 
 | ||||
|     def from_dict(self, **kwargs): | ||||
|     def from_dict(self, **kwargs) -> None:  # type: ignore[no-untyped-def] | ||||
|         if kwargs.get('Tag'): | ||||
|             kwargs = kwargs.get('Tag') | ||||
|             kwargs = kwargs.get('Tag')  # type: ignore[assignment] | ||||
|         super().from_dict(**kwargs) | ||||
| 
 | ||||
|     def _set_default(self): | ||||
|     def _set_default(self) -> None: | ||||
|         if not hasattr(self, 'relationship_type'): | ||||
|             self.relationship_type = '' | ||||
|         if not hasattr(self, 'colour'): | ||||
|  | @ -394,14 +394,14 @@ class MISPTag(AbstractMISP): | |||
|         if not hasattr(self, 'local'): | ||||
|             self.local = False | ||||
| 
 | ||||
|     def _to_feed(self, with_local: bool = True) -> dict: | ||||
|     def _to_feed(self, with_local: bool = True) -> dict[str, Any]: | ||||
|         if hasattr(self, 'exportable') and not self.exportable: | ||||
|             return {} | ||||
|         if with_local is False and hasattr(self, 'local') and self.local: | ||||
|             return {} | ||||
|         return super()._to_feed() | ||||
| 
 | ||||
|     def delete(self): | ||||
|     def delete(self) -> None: | ||||
|         self.deleted = True | ||||
|         self.edited = True | ||||
| 
 | ||||
|  | @ -412,7 +412,7 @@ class MISPTag(AbstractMISP): | |||
| 
 | ||||
| 
 | ||||
| # UUID, datetime, date and Enum is serialized by ORJSON by default | ||||
| def pymisp_json_default(obj: AbstractMISP | datetime | date | Enum | UUID) -> dict | str: | ||||
| def pymisp_json_default(obj: AbstractMISP | datetime | date | Enum | UUID) -> dict[str, Any] | str: | ||||
|     if isinstance(obj, AbstractMISP): | ||||
|         return obj.jsonable() | ||||
|     elif isinstance(obj, (datetime, date)): | ||||
|  |  | |||
							
								
								
									
										574
									
								
								pymisp/api.py
								
								
								
								
							
							
						
						
									
										574
									
								
								pymisp/api.py
								
								
								
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							|  | @ -2,7 +2,7 @@ from __future__ import annotations | |||
| 
 | ||||
| 
 | ||||
| class PyMISPError(Exception): | ||||
|     def __init__(self, message): | ||||
|     def __init__(self, message: str) -> None: | ||||
|         super().__init__(message) | ||||
|         self.message = message | ||||
| 
 | ||||
|  |  | |||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							|  | @ -43,3 +43,13 @@ try: | |||
| except ImportError: | ||||
|     # Requires lief, optional [fileobjects] | ||||
|     pass | ||||
| 
 | ||||
| __all__ = ['VTReportObject', 'Neo4j', 'FileObject', 'make_binary_objects', | ||||
|            'AbstractMISPObjectGenerator', 'GenericObjectGenerator', | ||||
|            'load_openioc', 'load_openioc_file', 'SBSignatureObject', | ||||
|            'Fail2BanObject', 'DomainIPObject', 'ASNObject', 'GeolocationObject', | ||||
|            'GitVulnFinderObject', 'VehicleObject', 'CSVLoader', | ||||
|            'SSHAuthorizedKeysObject', 'feed_meta_generator', 'update_objects', | ||||
|            'EMailObject', 'URLObject', 'PEObject', 'PESectionObject', 'ELFObject', | ||||
|            'ELFSectionObject', 'MachOObject', 'MachOSectionObject' | ||||
|            ] | ||||
|  |  | |||
|  | @ -6,7 +6,6 @@ from .. import MISPObject | |||
| from ..exceptions import InvalidMISPObject | ||||
| from datetime import datetime, date | ||||
| from dateutil.parser import parse | ||||
| from typing import Union, Optional | ||||
| 
 | ||||
| 
 | ||||
| class AbstractMISPObjectGenerator(MISPObject): | ||||
|  |  | |||
|  | @ -2,15 +2,18 @@ | |||
| 
 | ||||
| from __future__ import annotations | ||||
| 
 | ||||
| from .abstractgenerator import AbstractMISPObjectGenerator | ||||
| import logging | ||||
| 
 | ||||
| from typing import Any | ||||
| 
 | ||||
| from .abstractgenerator import AbstractMISPObjectGenerator | ||||
| 
 | ||||
| logger = logging.getLogger('pymisp') | ||||
| 
 | ||||
| 
 | ||||
| class ASNObject(AbstractMISPObjectGenerator): | ||||
| 
 | ||||
|     def __init__(self, parameters: dict, strict: bool = True, **kwargs): | ||||
|     def __init__(self, parameters: dict[str, Any], strict: bool = True, **kwargs) -> None: | ||||
|         super().__init__('asn', strict=strict, **kwargs) | ||||
|         self._parameters = parameters | ||||
|         self.generate_attributes() | ||||
|  |  | |||
|  | @ -3,7 +3,6 @@ | |||
| from __future__ import annotations | ||||
| 
 | ||||
| import time | ||||
| import sys | ||||
| import unittest | ||||
| import subprocess | ||||
| 
 | ||||
|  | @ -12,7 +11,7 @@ import logging | |||
| logging.disable(logging.CRITICAL) | ||||
| 
 | ||||
| try: | ||||
|     from pymisp import ExpandedPyMISP, MISPOrganisation, MISPUser, MISPEvent, MISPObject, MISPSharingGroup, Distribution | ||||
|     from pymisp import PyMISP, MISPOrganisation, MISPUser, MISPEvent, MISPObject, MISPSharingGroup, Distribution | ||||
| except ImportError: | ||||
|     raise | ||||
| 
 | ||||
|  | @ -70,7 +69,7 @@ fast_mode = True | |||
| class MISPInstance(): | ||||
| 
 | ||||
|     def __init__(self, params): | ||||
|         self.initial_user_connector = ExpandedPyMISP(params['url'], params['key'], ssl=False, debug=False) | ||||
|         self.initial_user_connector = PyMISP(params['url'], params['key'], ssl=False, debug=False) | ||||
|         # Git pull | ||||
|         self.initial_user_connector.update_misp() | ||||
|         # Set the default role (id 3 on the VM is normal user) | ||||
|  | @ -98,7 +97,7 @@ class MISPInstance(): | |||
|         user.org_id = self.test_org.id | ||||
|         user.role_id = 1  # Site admin | ||||
|         self.test_site_admin = self.initial_user_connector.add_user(user) | ||||
|         self.site_admin_connector = ExpandedPyMISP(params['url'], self.test_site_admin.authkey, ssl=False, debug=False) | ||||
|         self.site_admin_connector = PyMISP(params['url'], self.test_site_admin.authkey, ssl=False, debug=False) | ||||
|         self.site_admin_connector.toggle_global_pythonify() | ||||
|         # Create org admin | ||||
|         user = MISPUser() | ||||
|  | @ -106,14 +105,14 @@ class MISPInstance(): | |||
|         user.org_id = self.test_org.id | ||||
|         user.role_id = 2  # Org admin | ||||
|         self.test_org_admin = self.site_admin_connector.add_user(user) | ||||
|         self.org_admin_connector = ExpandedPyMISP(params['url'], self.test_org_admin.authkey, ssl=False, debug=False) | ||||
|         self.org_admin_connector = PyMISP(params['url'], self.test_org_admin.authkey, ssl=False, debug=False) | ||||
|         self.org_admin_connector.toggle_global_pythonify() | ||||
|         # Create user | ||||
|         user = MISPUser() | ||||
|         user.email = params['email_user'] | ||||
|         user.org_id = self.test_org.id | ||||
|         self.test_usr = self.org_admin_connector.add_user(user) | ||||
|         self.user_connector = ExpandedPyMISP(params['url'], self.test_usr.authkey, ssl=False, debug=False) | ||||
|         self.user_connector = PyMISP(params['url'], self.test_usr.authkey, ssl=False, debug=False) | ||||
|         self.user_connector.toggle_global_pythonify() | ||||
| 
 | ||||
|         # Setup external_baseurl | ||||
|  | @ -138,7 +137,7 @@ class MISPInstance(): | |||
|         user.org_id = sync_org.id | ||||
|         user.role_id = 5  # Org admin | ||||
|         sync_user = self.site_admin_connector.add_user(user) | ||||
|         sync_user_connector = ExpandedPyMISP(self.site_admin_connector.root_url, sync_user.authkey, ssl=False, debug=False) | ||||
|         sync_user_connector = PyMISP(self.site_admin_connector.root_url, sync_user.authkey, ssl=False, debug=False) | ||||
|         sync_server_config = sync_user_connector.get_sync_config(pythonify=True) | ||||
|         self.sync.append((sync_org, sync_user, sync_server_config)) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Raphaël Vinot
						Raphaël Vinot