diff --git a/pymisp/api.py b/pymisp/api.py index 76e6880..d7046eb 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -243,13 +243,13 @@ class PyMISP: self.category_type_mapping = self.describe_types['category_type_mappings'] self.sane_default = self.describe_types['sane_defaults'] - def remote_acl(self, debug_type: str = 'findMissingFunctionNames') -> dict[str, Any]: + def remote_acl(self, debug_type: str = 'findMissingFunctionNames') -> dict[str, Any] | list[dict[str, Any]]: """This should return an empty list, unless the ACL is outdated. :param debug_type: printAllFunctionNames, findMissingFunctionNames, or printRoleAccess """ response = self._prepare_request('GET', f'events/queryACL/{debug_type}') - return self._check_json_response(response) + return self._check_json_response_list(response) @property def describe_types_local(self) -> dict[str, Any]: @@ -1210,16 +1210,19 @@ class PyMISP: taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) self.disable_taxonomy_tags(taxonomy_id) response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}') - return self._check_json_response(response) + try: + return self._check_json_response(response) + except PyMISPError: + return self._check_json_response_list(response) - def disable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]: + def disable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Disable all the tags of a taxonomy :param taxonomy: taxonomy with tags to disable """ taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}') - return self._check_json_response(response) + return self._check_json_response_list(response) def enable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]: """Enable all the tags of a taxonomy. NOTE: this is automatically done when you call enable_taxonomy @@ -1238,10 +1241,10 @@ class PyMISP: response = self._prepare_request('POST', url) return self._check_json_response(response) - def update_taxonomies(self) -> dict[str, Any] | list[dict[str, Any]]: + def update_taxonomies(self) -> dict[str, Any]: """Update all the taxonomies: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/updateTaxonomies""" response = self._prepare_request('POST', 'taxonomies/update') - return self._check_json_response_list(response) + return self._check_json_response(response) def set_taxonomy_required(self, taxonomy: MISPTaxonomy | int | str, required: bool = False) -> dict[str, Any]: taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) @@ -1330,18 +1333,18 @@ class PyMISP: warninglist_id = get_uuid_or_id_from_abstract_misp(warninglist) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False) - def values_in_warninglist(self, value: Iterable[str]) -> dict[str, Any] | list[dict[str, Any]]: + def values_in_warninglist(self, value: Iterable[str]) -> dict[str, Any]: """Check if IOC values are in warninglist :param value: iterator with values to check """ response = self._prepare_request('POST', 'warninglists/checkValue', data=value) - return self._check_json_response_list(response) + return self._check_json_response(response) - def update_warninglists(self) -> dict[str, Any] | list[dict[str, Any]]: + def update_warninglists(self) -> dict[str, Any]: """Update all the warninglists: https://www.misp-project.org/openapi/#tag/Warninglists/operation/updateWarninglists""" response = self._prepare_request('POST', 'warninglists/update') - return self._check_json_response_list(response) + return self._check_json_response(response) # ## END Warninglists ### @@ -1400,10 +1403,10 @@ class PyMISP: response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}') return self._check_json_response(response) - def update_noticelists(self) -> dict[str, Any] | list[dict[str, Any]]: + def update_noticelists(self) -> dict[str, Any]: """Update all the noticelists: https://www.misp-project.org/openapi/#tag/Noticelists/operation/updateNoticelists""" response = self._prepare_request('POST', 'noticelists/update') - return self._check_json_response_list(response) + return self._check_json_response(response) # ## END Noticelist ### @@ -1553,10 +1556,10 @@ class PyMISP: response.append(c) return response - def update_galaxies(self) -> dict[str, Any] | list[dict[str, Any]]: + def update_galaxies(self) -> dict[str, Any]: """Update all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/updateGalaxies""" response = self._prepare_request('POST', 'galaxies/update') - return self._check_json_response_list(response) + return self._check_json_response(response) def get_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPGalaxyCluster: """Gets a specific galaxy cluster @@ -1867,10 +1870,10 @@ class PyMISP: response = self._prepare_request('GET', 'feeds/cacheFeeds/misp') return self._check_json_response(response) - def compare_feeds(self) -> dict[str, Any]: + def compare_feeds(self) -> dict[str, Any] | list[dict[str, Any]]: """Generate the comparison matrix for all the MISP feeds""" response = self._prepare_request('GET', 'feeds/compareFeeds') - return self._check_json_response(response) + return self._check_json_response_list(response) def load_default_feeds(self) -> dict[str, Any]: """Load all the default feeds.""" @@ -2475,10 +2478,10 @@ class PyMISP: # ## BEGIN Decaying Models ### - def update_decaying_models(self) -> dict[str, Any] | list[dict[str, Any]]: + def update_decaying_models(self) -> dict[str, Any]: """Update all the Decaying models""" response = self._prepare_request('POST', 'decayingModel/update') - return self._check_json_response_list(response) + return self._check_json_response(response) def decaying_models(self, pythonify: bool = False) -> dict[str, Any] | list[MISPDecayingModel] | list[dict[str, Any]]: """Get all the decaying models @@ -2731,8 +2734,12 @@ class PyMISP: elif return_format not in ['json', 'yara-json']: return self._check_response(response) - # This one is truly fucked: event returns a list, attributes doesn't. - normalized_response = self._check_json_response(response) + normalized_response: list[dict[str, Any]] | dict[str, Any] + if controller in ['events', 'objects']: + # This one is truly fucked: event returns a list, attributes doesn't. + normalized_response = self._check_json_response_list(response) + elif controller == 'attributes': + normalized_response = self._check_json_response(response) if 'errors' in normalized_response: return normalized_response @@ -2749,7 +2756,7 @@ class PyMISP: to_return.append(me) elif controller == 'attributes': # FIXME: obvs, this is hurting my soul. We need something generic. - for a in normalized_response['Attribute']: + for a in normalized_response['Attribute']: # type: ignore[call-overload] ma = MISPAttribute() ma.from_dict(**a) if 'Event' in ma: @@ -3298,7 +3305,7 @@ class PyMISP: response = self._prepare_request('GET', f'tags/tagStatistics/{p}/{ns}') return self._check_json_response(response) - def users_statistics(self, context: str = 'data') -> dict[str, Any]: + def users_statistics(self, context: str = 'data') -> dict[str, Any] | list[dict[str, Any]]: """Get user statistics from the MISP instance :param context: one of 'data', 'orgs', 'users', 'tags', 'attributehistogram', 'sightings', 'galaxyMatrix' @@ -3307,7 +3314,10 @@ class PyMISP: if context not in availables_contexts: raise PyMISPError("context can only be {','.join(availables_contexts)}") response = self._prepare_request('GET', f'users/statistics/{context}') - return self._check_json_response(response) + try: + return self._check_json_response_list(response) + except PyMISPError: + return self._check_json_response(response) # ## END Statistics ### diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index a283201..9bab316 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -2007,7 +2007,7 @@ class MISPEvent(AbstractMISP): :param object_id: ID or UUID """ for o in self.objects: - if ((hasattr(o, 'id') and int(o.id) == int(object_id)) + if ((hasattr(o, 'id') and object_id.isdigit() and int(o.id) == int(object_id)) or (hasattr(o, 'uuid') and o.uuid == object_id)): o.delete() break diff --git a/pymisp/tools/peobject.py b/pymisp/tools/peobject.py index e83bbb3..08f35cf 100644 --- a/pymisp/tools/peobject.py +++ b/pymisp/tools/peobject.py @@ -16,6 +16,7 @@ from .abstractgenerator import AbstractMISPObjectGenerator from ..exceptions import InvalidMISPObject import lief +import lief.PE try: import pydeep # type: ignore @@ -201,7 +202,7 @@ class PESigners(AbstractMISPObjectGenerator): self.add_attribute('digest_algorithm', value=str(self.__signer.digest_algorithm)) self.add_attribute('encryption_algorithm', value=str(self.__signer.encryption_algorithm)) self.add_attribute('digest-base64', value=b64encode(self.__signer.encrypted_digest)) - info: lief.PE.SpcSpOpusInfo = self.__signer.get_attribute(lief.PE.SIG_ATTRIBUTE_TYPES.SPC_SP_OPUS_INFO) # type: ignore[attr-defined, assignment] + info: lief.PE.SpcSpOpusInfo = self.__signer.get_attribute(lief.PE.Attribute.TYPE.SPC_SP_OPUS_INFO) # type: ignore[assignment] if info: self.add_attribute('program-name', value=info.program_name) self.add_attribute('url', value=info.more_info) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index e2b7198..341784e 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -12,7 +12,7 @@ import unittest from datetime import datetime, timedelta, date, timezone from io import BytesIO from pathlib import Path -from typing import TypeVar, Type, Any +from typing import TypeVar, Any from uuid import uuid4 import urllib3