From 685ef22d0a1123c518093fba14dd2c1a0d153ad0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 2 Feb 2024 15:05:15 +0100 Subject: [PATCH] fix: revert typing changes. --- mypy.ini | 3 +- pymisp/api.py | 267 ++++++++++++++++++++++++-------------------------- 2 files changed, 131 insertions(+), 139 deletions(-) diff --git a/mypy.ini b/mypy.ini index b4fb74d..3f7aec0 100644 --- a/mypy.ini +++ b/mypy.ini @@ -6,8 +6,9 @@ pretty = True exclude = tests/testlive_comprehensive.py|tests/testlive_sync.py|feed-generator|examples|pymisp/data|docs|pymisp/tools/openioc.py|pymisp/tools/reportlab_generator.py|tests/test_reportlab.py # Stuff to remove gradually -# disallow_untyped_defs = False +disallow_untyped_defs = False disallow_untyped_calls = False +disable_error_code = arg-type,return-value,assignment,call-overload,union-attr [mypy-docs.source.*] diff --git a/pymisp/api.py b/pymisp/api.py index 3537a7d..ffbb08b 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -100,7 +100,7 @@ def register_user(misp_url: str, email: str, org_id: str | None = None, org_name: str | None = None, message: str | None = None, custom_perms: str | None = None, perm_sync: bool = False, perm_publish: bool = False, perm_admin: bool = False, - verify: bool = True) -> dict[str, Any]: + verify: bool = True) -> dict[str, Any] | list[dict[str, Any]]: """Ask for the creation of an account for the user with the given email address""" data = copy.deepcopy(locals()) if organisation: @@ -249,22 +249,22 @@ class PyMISP: :param debug_type: printAllFunctionNames, findMissingFunctionNames, or printRoleAccess """ response = self._prepare_request('GET', f'events/queryACL/{debug_type}') - return self._check_json_response_list(response) + return self._check_json_response(response) @property - def describe_types_local(self) -> dict[str, Any]: + def describe_types_local(self) -> dict[str, Any] | list[dict[str, Any]]: '''Returns the content of describe types from the package''' return describe_types @property - def describe_types_remote(self) -> dict[str, Any]: + def describe_types_remote(self) -> dict[str, Any] | list[dict[str, Any]]: '''Returns the content of describe types from the remote instance''' response = self._prepare_request('GET', 'attributes/describeTypes.json') remote_describe_types = self._check_json_response(response) return remote_describe_types['result'] @property - def recommended_pymisp_version(self) -> dict[str, Any]: + def recommended_pymisp_version(self) -> dict[str, Any] | list[dict[str, Any]]: """Returns the recommended API version from the server""" # Sine MISP 2.4.146 is recommended PyMISP version included in getVersion call misp_version = self.misp_instance_version @@ -275,17 +275,17 @@ class PyMISP: return self._check_json_response(response) @property - def version(self) -> dict[str, Any]: + def version(self) -> dict[str, Any] | list[dict[str, Any]]: """Returns the version of PyMISP you're currently using""" return {'version': __version__} @property - def pymisp_version_master(self) -> dict[str, Any]: + def pymisp_version_master(self) -> dict[str, Any] | list[dict[str, Any]]: """PyMISP version as defined in the main repository""" return self.pymisp_version_main @property - def pymisp_version_main(self) -> dict[str, Any]: + def pymisp_version_main(self) -> dict[str, Any] | list[dict[str, Any]]: """Get the most recent version of PyMISP from github""" r = requests.get('https://raw.githubusercontent.com/MISP/PyMISP/main/pyproject.toml') if r.status_code == 200: @@ -294,13 +294,13 @@ class PyMISP: return {'error': 'Impossible to retrieve the version of the main branch.'} @cached_property - def misp_instance_version(self) -> dict[str, Any]: + def misp_instance_version(self) -> dict[str, Any] | list[dict[str, Any]]: """Returns the version of the instance.""" response = self._prepare_request('GET', 'servers/getVersion') return self._check_json_response(response) @property - def misp_instance_version_master(self) -> dict[str, Any]: + def misp_instance_version_master(self) -> dict[str, Any] | list[dict[str, Any]]: """Get the most recent version from github""" r = requests.get('https://raw.githubusercontent.com/MISP/MISP/2.4/VERSION.json') if r.status_code == 200: @@ -308,12 +308,12 @@ class PyMISP: return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])} return {'error': 'Impossible to retrieve the version of the master branch.'} - def update_misp(self) -> dict[str, Any]: + def update_misp(self) -> dict[str, Any] | list[dict[str, Any]]: """Trigger a server update""" response = self._prepare_request('POST', 'servers/update') return self._check_json_response(response) - def set_server_setting(self, setting: str, value: str | int | bool, force: bool = False) -> dict[str, Any]: + def set_server_setting(self, setting: str, value: str | int | bool, force: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Set a setting on the MISP instance :param setting: server setting name @@ -324,7 +324,7 @@ class PyMISP: response = self._prepare_request('POST', f'servers/serverSettingsEdit/{setting}', data=data) return self._check_json_response(response) - def get_server_setting(self, setting: str) -> dict[str, Any]: + def get_server_setting(self, setting: str) -> dict[str, Any] | list[dict[str, Any]]: """Get a setting from the MISP instance :param setting: server setting name @@ -332,17 +332,17 @@ class PyMISP: response = self._prepare_request('GET', f'servers/getSetting/{setting}') return self._check_json_response(response) - def server_settings(self) -> dict[str, Any]: + def server_settings(self) -> dict[str, Any] | list[dict[str, Any]]: """Get all the settings from the server""" response = self._prepare_request('GET', 'servers/serverSettings') return self._check_json_response(response) - def restart_workers(self) -> dict[str, Any]: + def restart_workers(self) -> dict[str, Any] | list[dict[str, Any]]: """Restart all the workers""" response = self._prepare_request('POST', 'servers/restartWorkers') return self._check_json_response(response) - def db_schema_diagnostic(self) -> dict[str, Any]: + def db_schema_diagnostic(self) -> dict[str, Any] | list[dict[str, Any]]: """Get the schema diagnostic""" response = self._prepare_request('GET', 'servers/dbSchemaDiagnostic') return self._check_json_response(response) @@ -359,7 +359,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'events/index') - events_r = self._check_json_response_list(r) + events_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(events_r, dict): return events_r to_return = [] @@ -444,7 +444,7 @@ class PyMISP: e.load(updated_event) return e - def delete_event(self, event: MISPEvent | int | str | UUID) -> dict[str, Any]: + def delete_event(self, event: MISPEvent | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete an event from a MISP instance: https://www.misp-project.org/openapi/#tag/Events/operation/deleteEvent :param event: event to delete @@ -453,7 +453,7 @@ class PyMISP: response = self._prepare_request('POST', f'events/delete/{event_id}') return self._check_json_response(response) - def publish(self, event: MISPEvent | int | str | UUID, alert: bool = False) -> dict[str, Any]: + def publish(self, event: MISPEvent | int | str | UUID, alert: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Publish the event with one single HTTP POST: https://www.misp-project.org/openapi/#tag/Events/operation/publishEvent :param event: event to publish @@ -466,7 +466,7 @@ class PyMISP: response = self._prepare_request('POST', f'events/publish/{event_id}') return self._check_json_response(response) - def unpublish(self, event: MISPEvent | int | str | UUID) -> dict[str, Any]: + def unpublish(self, event: MISPEvent | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Unpublish the event with one single HTTP POST: https://www.misp-project.org/openapi/#tag/Events/operation/unpublishEvent :param event: event to unpublish @@ -475,7 +475,7 @@ class PyMISP: response = self._prepare_request('POST', f'events/unpublish/{event_id}') return self._check_json_response(response) - def contact_event_reporter(self, event: MISPEvent | int | str | UUID, message: str) -> dict[str, Any]: + def contact_event_reporter(self, event: MISPEvent | int | str | UUID, message: str) -> dict[str, Any] | list[dict[str, Any]]: """Send a message to the reporter of an event :param event: event with reporter to contact @@ -514,7 +514,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. """ r = self._prepare_request('GET', f'eventReports/index/event_id:{event_id}') - event_reports = self._check_json_response_list(r) + event_reports = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(event_reports, dict): return event_reports to_return = [] @@ -559,7 +559,7 @@ class PyMISP: er.from_dict(**updated_event_report) return er - def delete_event_report(self, event_report: MISPEventReport | int | str | UUID, hard: bool = False) -> dict[str, Any]: + def delete_event_report(self, event_report: MISPEventReport | int | str | UUID, hard: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Delete an event report from a MISP instance :param event_report: event report to delete @@ -638,7 +638,7 @@ class PyMISP: o.from_dict(**updated_object) return o - def delete_object(self, misp_object: MISPObject | int | str | UUID, hard: bool = False) -> dict[str, Any]: + def delete_object(self, misp_object: MISPObject | int | str | UUID, hard: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Delete an object from a MISP instance: https://www.misp-project.org/openapi/#tag/Objects/operation/deleteObject :param misp_object: object to delete @@ -669,7 +669,7 @@ class PyMISP: self, object_reference: MISPObjectReference | int | str | UUID, hard: bool = False, - ) -> dict[str, Any]: + ) -> dict[str, Any] | list[dict[str, Any]]: """Delete a reference to an object.""" object_reference_id = get_uuid_or_id_from_abstract_misp(object_reference) query_url = f"objectReferences/delete/{object_reference_id}" @@ -686,7 +686,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'objectTemplates/index') - templates = self._check_json_response_list(r) + templates = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(templates, dict): return templates to_return = [] @@ -711,7 +711,7 @@ class PyMISP: t.from_dict(**object_template_r) return t - def get_raw_object_template(self, uuid_or_name: str) -> dict[str, Any]: + def get_raw_object_template(self, uuid_or_name: str) -> dict[str, Any] | list[dict[str, Any]]: """Get a row template. It needs to be present on disk on the MISP instance you're connected to. The response of this method can be passed to MISPObject(, misp_objects_template_custom=) """ @@ -721,7 +721,7 @@ class PyMISP: def update_object_templates(self) -> dict[str, Any] | list[dict[str, Any]]: """Trigger an update of the object templates""" response = self._prepare_request('POST', 'objectTemplates/update') - return self._check_json_response_list(response) + return self._check_json_response(response) # ## END Object ### @@ -733,7 +733,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'attributes/index') - attributes_r = self._check_json_response_list(r) + attributes_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(attributes_r, dict): return attributes_r to_return = [] @@ -839,7 +839,7 @@ class PyMISP: a.from_dict(**updated_attribute) return a - def delete_attribute(self, attribute: MISPAttribute | int | str | UUID, hard: bool = False) -> dict[str, Any]: + def delete_attribute(self, attribute: MISPAttribute | int | str | UUID, hard: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Delete an attribute from a MISP instance: https://www.misp-project.org/openapi/#tag/Attributes/operation/deleteAttribute :param attribute: attribute to delete @@ -888,7 +888,7 @@ class PyMISP: r = self._prepare_request('GET', f'shadowAttributes/index/{event_id}') else: r = self._prepare_request('GET', 'shadowAttributes/index') - attribute_proposals = self._check_json_response_list(r) + attribute_proposals = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(attribute_proposals, dict): return attribute_proposals to_return = [] @@ -947,7 +947,7 @@ class PyMISP: a.from_dict(**update_attribute_proposal) return a - def delete_attribute_proposal(self, attribute: MISPAttribute | int | str | UUID) -> dict[str, Any]: + def delete_attribute_proposal(self, attribute: MISPAttribute | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Propose the deletion of an attribute :param attribute: attribute to delete @@ -956,7 +956,7 @@ class PyMISP: response = self._prepare_request('POST', f'shadowAttributes/delete/{attribute_id}') return self._check_json_response(response) - def accept_attribute_proposal(self, proposal: MISPShadowAttribute | int | str | UUID) -> dict[str, Any]: + def accept_attribute_proposal(self, proposal: MISPShadowAttribute | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Accept a proposal. You cannot modify an existing proposal, only accept/discard :param proposal: attribute proposal to accept @@ -965,7 +965,7 @@ class PyMISP: response = self._prepare_request('POST', f'shadowAttributes/accept/{proposal_id}') return self._check_json_response(response) - def discard_attribute_proposal(self, proposal: MISPShadowAttribute | int | str | UUID) -> dict[str, Any]: + def discard_attribute_proposal(self, proposal: MISPShadowAttribute | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Discard a proposal. You cannot modify an existing proposal, only accept/discard :param proposal: attribute proposal to discard @@ -1002,7 +1002,7 @@ class PyMISP: to_post['org_id'] = org_id r = self._prepare_request('POST', url, data=to_post) - sightings = self._check_json_response_list(r) + sightings = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(sightings, dict): return sightings to_return = [] @@ -1034,7 +1034,7 @@ class PyMISP: s.from_dict(**new_sighting) return s - def delete_sighting(self, sighting: MISPSighting | int | str | UUID) -> dict[str, Any]: + def delete_sighting(self, sighting: MISPSighting | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a sighting from a MISP instance: https://www.misp-project.org/openapi/#tag/Sightings/operation/deleteSighting :param sighting: sighting to delete @@ -1131,7 +1131,7 @@ class PyMISP: t.from_dict(**updated_tag) return t - def delete_tag(self, tag: MISPTag | int | str | UUID) -> dict[str, Any]: + def delete_tag(self, tag: MISPTag | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a tag from a MISP instance: https://www.misp-project.org/openapi/#tag/Tags/operation/deleteTag :param tag: tag to delete @@ -1148,7 +1148,7 @@ class PyMISP: """ query = {'tagname': tagname, 'strict_tagname': strict_tagname} response = self._prepare_request('POST', 'tags/search', data=query) - normalized_response = self._check_json_response_list(response) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or isinstance(normalized_response, dict): return normalized_response to_return: list[MISPTag] = [] @@ -1168,7 +1168,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'taxonomies/index') - taxonomies = self._check_json_response_list(r) + taxonomies = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(taxonomies, dict): return taxonomies to_return = [] @@ -1193,7 +1193,7 @@ class PyMISP: t.from_dict(**taxonomy_r) return t - def enable_taxonomy(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]: + def enable_taxonomy(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Enable a taxonomy: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/enableTaxonomy :param taxonomy: taxonomy to enable @@ -1213,7 +1213,7 @@ class PyMISP: try: return self._check_json_response(response) except PyMISPError: - return self._check_json_response_list(response) + return self._check_json_response(response) def disable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Disable all the tags of a taxonomy @@ -1222,9 +1222,9 @@ class PyMISP: """ taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}') - return self._check_json_response_list(response) + return self._check_json_response(response) - def enable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]: + def enable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Enable all the tags of a taxonomy. NOTE: this is automatically done when you call enable_taxonomy :param taxonomy: taxonomy with tags to enable @@ -1241,12 +1241,12 @@ class PyMISP: response = self._prepare_request('POST', url) return self._check_json_response(response) - def update_taxonomies(self) -> dict[str, Any]: + def update_taxonomies(self) -> dict[str, Any] | list[dict[str, Any]]: """Update all the taxonomies: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/updateTaxonomies""" response = self._prepare_request('POST', 'taxonomies/update') return self._check_json_response(response) - def set_taxonomy_required(self, taxonomy: MISPTaxonomy | int | str, required: bool = False) -> dict[str, Any]: + def set_taxonomy_required(self, taxonomy: MISPTaxonomy | int | str, required: bool = False) -> dict[str, Any] | list[dict[str, Any]]: taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) url = urljoin(self.root_url, f'taxonomies/toggleRequired/{taxonomy_id}') payload = { @@ -1292,7 +1292,7 @@ class PyMISP: w.from_dict(**wl) return w - def toggle_warninglist(self, warninglist_id: str | int | list[int] | None = None, warninglist_name: str | list[str] | None = None, force_enable: bool = False) -> dict[str, Any]: + def toggle_warninglist(self, warninglist_id: str | int | list[int] | None = None, warninglist_name: str | list[str] | None = None, force_enable: bool = False) -> dict[str, Any] | list[dict[str, Any]]: '''Toggle (enable/disable) the status of a warninglist by id: https://www.misp-project.org/openapi/#tag/Warninglists/operation/toggleEnableWarninglist :param warninglist_id: ID of the WarningList @@ -1317,7 +1317,7 @@ class PyMISP: response = self._prepare_request('POST', 'warninglists/toggleEnable', data=query) return self._check_json_response(response) - def enable_warninglist(self, warninglist: MISPWarninglist | int | str | UUID) -> dict[str, Any]: + def enable_warninglist(self, warninglist: MISPWarninglist | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Enable a warninglist :param warninglist: warninglist to enable @@ -1325,7 +1325,7 @@ class PyMISP: warninglist_id = get_uuid_or_id_from_abstract_misp(warninglist) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=True) - def disable_warninglist(self, warninglist: MISPWarninglist | int | str | UUID) -> dict[str, Any]: + def disable_warninglist(self, warninglist: MISPWarninglist | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Disable a warninglist :param warninglist: warninglist to disable @@ -1340,11 +1340,11 @@ class PyMISP: """ response = self._prepare_request('POST', 'warninglists/checkValue', data=value) try: - return self._check_json_response_list(response) + return self._check_json_response(response) except PyMISPError: return self._check_json_response(response) - def update_warninglists(self) -> dict[str, Any]: + def update_warninglists(self) -> dict[str, Any] | list[dict[str, Any]]: """Update all the warninglists: https://www.misp-project.org/openapi/#tag/Warninglists/operation/updateWarninglists""" response = self._prepare_request('POST', 'warninglists/update') return self._check_json_response(response) @@ -1359,7 +1359,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'noticelists/index') - noticelists = self._check_json_response_list(r) + noticelists = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(noticelists, dict): return noticelists to_return = [] @@ -1384,7 +1384,7 @@ class PyMISP: n.from_dict(**noticelist_j) return n - def enable_noticelist(self, noticelist: MISPNoticelist | int | str | UUID) -> dict[str, Any]: + def enable_noticelist(self, noticelist: MISPNoticelist | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Enable a noticelist by id: https://www.misp-project.org/openapi/#tag/Noticelists/operation/toggleEnableNoticelist :param noticelist: Noticelist to enable @@ -1395,7 +1395,7 @@ class PyMISP: response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}/true') return self._check_json_response(response) - def disable_noticelist(self, noticelist: MISPNoticelist | int | str | UUID) -> dict[str, Any]: + def disable_noticelist(self, noticelist: MISPNoticelist | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Disable a noticelist by id :param noticelist: Noticelist to disable @@ -1406,7 +1406,7 @@ class PyMISP: response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}') return self._check_json_response(response) - def update_noticelists(self) -> dict[str, Any]: + def update_noticelists(self) -> dict[str, Any] | list[dict[str, Any]]: """Update all the noticelists: https://www.misp-project.org/openapi/#tag/Noticelists/operation/updateNoticelists""" response = self._prepare_request('POST', 'noticelists/update') return self._check_json_response(response) @@ -1421,7 +1421,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'correlation_exclusions') - correlation_exclusions = self._check_json_response_list(r) + correlation_exclusions = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(correlation_exclusions, dict): return correlation_exclusions to_return = [] @@ -1460,7 +1460,7 @@ class PyMISP: c.from_dict(**new_correlation_exclusion) return c - def delete_correlation_exclusion(self, correlation_exclusion: MISPCorrelationExclusion | int | str | UUID) -> dict[str, Any]: + def delete_correlation_exclusion(self, correlation_exclusion: MISPCorrelationExclusion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a correlation exclusion :param correlation_exclusion: The MISPCorrelationExclusion you wish to delete from MISP @@ -1469,7 +1469,7 @@ class PyMISP: r = self._prepare_request('POST', f'correlation_exclusions/delete/{exclusion_id}') return self._check_json_response(r) - def clean_correlation_exclusions(self) -> dict[str, Any]: + def clean_correlation_exclusions(self) -> dict[str, Any] | list[dict[str, Any]]: """Initiate correlation exclusions cleanup""" r = self._prepare_request('POST', 'correlation_exclusions/clean') return self._check_json_response(r) @@ -1488,7 +1488,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'galaxies/index') - galaxies = self._check_json_response_list(r) + galaxies = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(galaxies, dict): return galaxies to_return = [] @@ -1506,7 +1506,7 @@ class PyMISP: ) -> dict[str, Any] | list[MISPGalaxy] | list[dict[str, Any]]: """Text search to find a matching galaxy name, namespace, description, or uuid.""" r = self._prepare_request("POST", "galaxies", data={"value": value}) - galaxies = self._check_json_response_list(r) + galaxies = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(galaxies, dict): return galaxies to_return = [] @@ -1549,7 +1549,7 @@ class PyMISP: if searchall: kw_params["searchall"] = searchall r = self._prepare_request('POST', f"galaxy_clusters/index/{galaxy_id}", data=kw_params) - clusters_j = self._check_json_response_list(r) + clusters_j = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(clusters_j, dict): return clusters_j response = [] @@ -1559,7 +1559,7 @@ class PyMISP: response.append(c) return response - def update_galaxies(self) -> dict[str, Any]: + def update_galaxies(self) -> dict[str, Any] | list[dict[str, Any]]: """Update all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/updateGalaxies""" response = self._prepare_request('POST', 'galaxies/update') return self._check_json_response(response) @@ -1619,7 +1619,7 @@ class PyMISP: gc.from_dict(**cluster_j) return gc - def publish_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID) -> dict[str, Any]: + def publish_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Publishes a galaxy cluster: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/publishGalaxyCluster :param galaxy_cluster: The galaxy cluster you wish to publish @@ -1655,7 +1655,7 @@ class PyMISP: gc.from_dict(**cluster_j) return gc - def delete_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID, hard: bool=False) -> dict[str, Any]: + def delete_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID, hard: bool=False) -> dict[str, Any] | list[dict[str, Any]]: """Deletes a galaxy cluster from MISP: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/deleteGalaxyCluster :param galaxy_cluster: The MISPGalaxyCluster you wish to delete from MISP @@ -1671,7 +1671,7 @@ class PyMISP: r = self._prepare_request('POST', f'galaxy_clusters/delete/{cluster_id}', data=data) return self._check_json_response(r) - def add_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> dict[str, Any]: + def add_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> dict[str, Any] | list[dict[str, Any]]: """Add a galaxy cluster relation, cluster relation must include cluster UUIDs in both directions @@ -1681,7 +1681,7 @@ class PyMISP: cluster_rel_j = self._check_json_response(r) return cluster_rel_j - def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> dict[str, Any]: + def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> dict[str, Any] | list[dict[str, Any]]: """Update a galaxy cluster relation :param galaxy_cluster_relation: The MISPGalaxyClusterRelation to update @@ -1691,7 +1691,7 @@ class PyMISP: cluster_rel_j = self._check_json_response(r) return cluster_rel_j - def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation | int | str | UUID) -> dict[str, Any]: + def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a galaxy cluster relation :param galaxy_cluster_relation: The MISPGalaxyClusterRelation to delete @@ -1711,7 +1711,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'feeds/index') - feeds = self._check_json_response_list(r) + feeds = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(feeds, dict): return feeds to_return = [] @@ -1831,7 +1831,7 @@ class PyMISP: f.from_dict(**updated_feed) return f - def delete_feed(self, feed: MISPFeed | int | str | UUID) -> dict[str, Any]: + def delete_feed(self, feed: MISPFeed | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a feed from a MISP instance :param feed: feed to delete @@ -1840,7 +1840,7 @@ class PyMISP: response = self._prepare_request('POST', f'feeds/delete/{feed_id}') return self._check_json_response(response) - def fetch_feed(self, feed: MISPFeed | int | str | UUID) -> dict[str, Any]: + def fetch_feed(self, feed: MISPFeed | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Fetch one single feed by id: https://www.misp-project.org/openapi/#tag/Feeds/operation/fetchFromFeed :param feed: feed to fetch @@ -1849,12 +1849,12 @@ class PyMISP: response = self._prepare_request('GET', f'feeds/fetchFromFeed/{feed_id}') return self._check_json_response(response) - def cache_all_feeds(self) -> dict[str, Any]: + def cache_all_feeds(self) -> dict[str, Any] | list[dict[str, Any]]: """ Cache all the feeds: https://www.misp-project.org/openapi/#tag/Feeds/operation/cacheFeeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/all') return self._check_json_response(response) - def cache_feed(self, feed: MISPFeed | int | str | UUID) -> dict[str, Any]: + def cache_feed(self, feed: MISPFeed | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Cache a specific feed by id: https://www.misp-project.org/openapi/#tag/Feeds/operation/cacheFeeds :param feed: feed to cache @@ -1863,12 +1863,12 @@ class PyMISP: response = self._prepare_request('GET', f'feeds/cacheFeeds/{feed_id}') return self._check_json_response(response) - def cache_freetext_feeds(self) -> dict[str, Any]: + def cache_freetext_feeds(self) -> dict[str, Any] | list[dict[str, Any]]: """Cache all the freetext feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/freetext') return self._check_json_response(response) - def cache_misp_feeds(self) -> dict[str, Any]: + def cache_misp_feeds(self) -> dict[str, Any] | list[dict[str, Any]]: """Cache all the MISP feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/misp') return self._check_json_response(response) @@ -1876,9 +1876,9 @@ class PyMISP: def compare_feeds(self) -> dict[str, Any] | list[dict[str, Any]]: """Generate the comparison matrix for all the MISP feeds""" response = self._prepare_request('GET', 'feeds/compareFeeds') - return self._check_json_response_list(response) + return self._check_json_response(response) - def load_default_feeds(self) -> dict[str, Any]: + def load_default_feeds(self) -> dict[str, Any] | list[dict[str, Any]]: """Load all the default feeds.""" response = self._prepare_request('POST', 'feeds/loadDefaultFeeds') return self._check_json_response(response) @@ -1893,7 +1893,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'servers/index') - servers = self._check_json_response_list(r) + servers = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(servers, dict): return servers to_return = [] @@ -1964,7 +1964,7 @@ class PyMISP: s.from_dict(**updated_server) return s - def delete_server(self, server: MISPServer | int | str | UUID) -> dict[str, Any]: + def delete_server(self, server: MISPServer | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a sync server: https://www.misp-project.org/openapi/#tag/Servers/operation/getServers :param server: sync server config @@ -1973,7 +1973,7 @@ class PyMISP: response = self._prepare_request('POST', f'servers/delete/{server_id}') return self._check_json_response(response) - def server_pull(self, server: MISPServer | int | str | UUID, event: MISPEvent | int | str | UUID | None = None) -> dict[str, Any]: + def server_pull(self, server: MISPServer | int | str | UUID, event: MISPEvent | int | str | UUID | None = None) -> dict[str, Any] | list[dict[str, Any]]: """Initialize a pull from a sync server, optionally limited to one event: https://www.misp-project.org/openapi/#tag/Servers/operation/pullServer :param server: sync server config @@ -1989,7 +1989,7 @@ class PyMISP: # FIXME: can we pythonify? return self._check_json_response(response) - def server_push(self, server: MISPServer | int | str | UUID, event: MISPEvent | int | str | UUID | None = None) -> dict[str, Any]: + def server_push(self, server: MISPServer | int | str | UUID, event: MISPEvent | int | str | UUID | None = None) -> dict[str, Any] | list[dict[str, Any]]: """Initialize a push to a sync server, optionally limited to one event: https://www.misp-project.org/openapi/#tag/Servers/operation/pushServer :param server: sync server config @@ -2005,7 +2005,7 @@ class PyMISP: # FIXME: can we pythonify? return self._check_json_response(response) - def test_server(self, server: MISPServer | int | str | UUID) -> dict[str, Any]: + def test_server(self, server: MISPServer | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Test if a sync link is working as expected :param server: sync server config @@ -2024,7 +2024,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'sharingGroups/index') - sharing_groups = self._check_json_response_list(r) + sharing_groups = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(sharing_groups, dict): return sharing_groups to_return = [] @@ -2092,7 +2092,7 @@ class PyMISP: r = self._prepare_request('HEAD', f'sharing_groups/view/{sharing_group_id}') return self._check_head_response(r) - def delete_sharing_group(self, sharing_group: MISPSharingGroup | int | str | UUID) -> dict[str, Any]: + def delete_sharing_group(self, sharing_group: MISPSharingGroup | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/deleteSharingGroup :param sharing_group: sharing group to delete @@ -2102,7 +2102,7 @@ class PyMISP: return self._check_json_response(response) def add_org_to_sharing_group(self, sharing_group: MISPSharingGroup | int | str | UUID, - organisation: MISPOrganisation | int | str | UUID, extend: bool = False) -> dict[str, Any]: + organisation: MISPOrganisation | int | str | UUID, extend: bool = False) -> dict[str, Any] | list[dict[str, Any]]: '''Add an organisation to a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/addOrganisationToSharingGroup :param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID @@ -2116,7 +2116,7 @@ class PyMISP: return self._check_json_response(response) def remove_org_from_sharing_group(self, sharing_group: MISPSharingGroup | int | str | UUID, - organisation: MISPOrganisation | int | str | UUID) -> dict[str, Any]: + organisation: MISPOrganisation | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: '''Remove an organisation from a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/removeOrganisationFromSharingGroup :param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID @@ -2129,7 +2129,7 @@ class PyMISP: return self._check_json_response(response) def add_server_to_sharing_group(self, sharing_group: MISPSharingGroup | int | str | UUID, - server: MISPServer | int | str | UUID, all_orgs: bool = False) -> dict[str, Any]: + server: MISPServer | int | str | UUID, all_orgs: bool = False) -> dict[str, Any] | list[dict[str, Any]]: '''Add a server to a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/addServerToSharingGroup :param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID @@ -2143,7 +2143,7 @@ class PyMISP: return self._check_json_response(response) def remove_server_from_sharing_group(self, sharing_group: MISPSharingGroup | int | str | UUID, - server: MISPServer | int | str | UUID) -> dict[str, Any]: + server: MISPServer | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: '''Remove a server from a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/removeServerFromSharingGroup :param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID @@ -2171,7 +2171,7 @@ class PyMISP: url_path += f"/searchall:{search}" r = self._prepare_request('GET', url_path) - organisations = self._check_json_response_list(r) + organisations = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(organisations, dict): return organisations to_return = [] @@ -2238,7 +2238,7 @@ class PyMISP: o.from_dict(**organisation) return o - def delete_organisation(self, organisation: MISPOrganisation | int | str | UUID) -> dict[str, Any]: + def delete_organisation(self, organisation: MISPOrganisation | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete an organisation by id: https://www.misp-project.org/openapi/#tag/Organisations/operation/deleteOrganisation :param organisation: organization to delete @@ -2266,7 +2266,7 @@ class PyMISP: organisation_id = get_uuid_or_id_from_abstract_misp(organisation) urlpath += f"/searchorg:{organisation_id}" r = self._prepare_request('GET', urlpath) - users = self._check_json_response_list(r) + users = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(users, dict): return users to_return = [] @@ -2352,7 +2352,7 @@ class PyMISP: e.from_dict(**updated_user) return e - def delete_user(self, user: MISPUser | int | str | UUID) -> dict[str, Any]: + def delete_user(self, user: MISPUser | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a user by id: https://www.misp-project.org/openapi/#tag/Users/operation/deleteUser :param user: user to delete @@ -2362,7 +2362,7 @@ class PyMISP: response = self._prepare_request('POST', f'admin/users/delete/{user_id}') return self._check_json_response(response) - def change_user_password(self, new_password: str) -> dict[str, Any]: + def change_user_password(self, new_password: str) -> dict[str, Any] | list[dict[str, Any]]: """Change the password of the curent user: :param new_password: password to set @@ -2376,7 +2376,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'users/registrations/index') - registrations = self._check_json_response_list(r) + registrations = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(registrations, dict): return registrations to_return = [] @@ -2391,7 +2391,7 @@ class PyMISP: role: MISPRole | int | str | None = None, perm_sync: bool = False, perm_publish: bool = False, perm_admin: bool = False, - unsafe_fallback: bool = False) -> dict[str, Any]: + unsafe_fallback: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Accept a user registration :param registration: the registration to accept @@ -2438,7 +2438,7 @@ class PyMISP: r = self._prepare_request('POST', f'users/acceptRegistrations/{registration_id}', data=to_post) return self._check_json_response(r) - def discard_user_registration(self, registration: MISPInbox | int | str | UUID) -> dict[str, Any]: + def discard_user_registration(self, registration: MISPInbox | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Discard a user registration :param registration: the registration to discard @@ -2457,7 +2457,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'roles/index') - roles = self._check_json_response_list(r) + roles = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(roles, dict): return roles to_return = [] @@ -2467,7 +2467,7 @@ class PyMISP: to_return.append(nr) return to_return - def set_default_role(self, role: MISPRole | int | str | UUID) -> dict[str, Any]: + def set_default_role(self, role: MISPRole | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Set a default role for the new user accounts :param role: the default role to set @@ -2481,7 +2481,7 @@ class PyMISP: # ## BEGIN Decaying Models ### - def update_decaying_models(self) -> dict[str, Any]: + def update_decaying_models(self) -> dict[str, Any] | list[dict[str, Any]]: """Update all the Decaying models""" response = self._prepare_request('POST', 'decayingModel/update') return self._check_json_response(response) @@ -2492,7 +2492,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output """ r = self._prepare_request('GET', 'decayingModel/index') - models = self._check_json_response_list(r) + models = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(models, dict): return models to_return = [] @@ -2502,7 +2502,7 @@ class PyMISP: to_return.append(n) return to_return - def enable_decaying_model(self, decaying_model: MISPDecayingModel | int | str) -> dict[str, Any]: + def enable_decaying_model(self, decaying_model: MISPDecayingModel | int | str) -> dict[str, Any] | list[dict[str, Any]]: """Enable a decaying Model""" if isinstance(decaying_model, MISPDecayingModel): decaying_model_id = decaying_model.id @@ -2511,7 +2511,7 @@ class PyMISP: response = self._prepare_request('POST', f'decayingModel/enable/{decaying_model_id}') return self._check_json_response(response) - def disable_decaying_model(self, decaying_model: MISPDecayingModel | int | str) -> dict[str, Any]: + def disable_decaying_model(self, decaying_model: MISPDecayingModel | int | str) -> dict[str, Any] | list[dict[str, Any]]: """Disable a decaying Model""" if isinstance(decaying_model, MISPDecayingModel): decaying_model_id = decaying_model.id @@ -2740,7 +2740,7 @@ class PyMISP: normalized_response: list[dict[str, Any]] | dict[str, Any] if controller in ['events', 'objects']: # This one is truly fucked: event returns a list, attributes doesn't. - normalized_response = self._check_json_response_list(response) + normalized_response = self._check_json_response(response) elif controller == 'attributes': normalized_response = self._check_json_response(response) @@ -2882,7 +2882,7 @@ class PyMISP: query["direction"] = "desc" if desc else "asc" url = urljoin(self.root_url, 'events/index') response = self._prepare_request('POST', url, data=query) - normalized_response = self._check_json_response_list(response) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or isinstance(normalized_response, dict): return normalized_response @@ -2958,7 +2958,7 @@ class PyMISP: url = urljoin(self.root_url, url_path) response = self._prepare_request('POST', url, data=query) - normalized_response = self._check_json_response_list(response) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or isinstance(normalized_response, dict): return normalized_response @@ -3019,7 +3019,7 @@ class PyMISP: query['created'] = query.pop('created').timestamp() response = self._prepare_request('POST', 'admin/logs/index', data=query) - normalized_response = self._check_json_response_list(response) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or isinstance(normalized_response, dict): return normalized_response @@ -3033,7 +3033,7 @@ class PyMISP: def search_feeds(self, value: SearchParameterTypes | None = None, pythonify: bool | None = False) -> dict[str, Any] | list[MISPFeed] | list[dict[str, Any]]: '''Search in the feeds cached on the servers''' response = self._prepare_request('POST', 'feeds/searchCaches', data={'value': value}) - normalized_response = self._check_json_response_list(response) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or isinstance(normalized_response, dict): return normalized_response to_return = [] @@ -3053,7 +3053,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'communities/index') - communities = self._check_json_response_list(r) + communities = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(communities, dict): return communities to_return = [] @@ -3086,7 +3086,7 @@ class PyMISP: requestor_organisation_description: str | None = None, message: str | None = None, sync: bool = False, anonymise_requestor_server: bool = False, - mock: bool = False) -> dict[str, Any]: + mock: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Request the access to a community :param community: community to request access @@ -3120,7 +3120,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'eventDelegations') - delegations = self._check_json_response_list(r) + delegations = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(delegations, dict): return delegations to_return = [] @@ -3130,7 +3130,7 @@ class PyMISP: to_return.append(d) return to_return - def accept_event_delegation(self, delegation: MISPEventDelegation | int | str, pythonify: bool = False) -> dict[str, Any]: + def accept_event_delegation(self, delegation: MISPEventDelegation | int | str, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Accept the delegation of an event :param delegation: event delegation to accept @@ -3140,7 +3140,7 @@ class PyMISP: r = self._prepare_request('POST', f'eventDelegations/acceptDelegation/{delegation_id}') return self._check_json_response(r) - def discard_event_delegation(self, delegation: MISPEventDelegation | int | str, pythonify: bool = False) -> dict[str, Any]: + def discard_event_delegation(self, delegation: MISPEventDelegation | int | str, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Discard the delegation of an event :param delegation: event delegation to discard @@ -3183,7 +3183,7 @@ class PyMISP: # ## BEGIN Others ### - def push_event_to_ZMQ(self, event: MISPEvent | int | str | UUID) -> dict[str, Any]: + def push_event_to_ZMQ(self, event: MISPEvent | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Force push an event by id on ZMQ :param event: the event to push @@ -3231,7 +3231,7 @@ class PyMISP: if returnMetaAttributes: query['returnMetaAttributes'] = returnMetaAttributes r = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query, **kwargs) - attributes = self._check_json_response_list(r) + attributes = self._check_json_response(r) if returnMetaAttributes or not (self.global_pythonify or pythonify) or isinstance(attributes, dict): return attributes to_return = [] @@ -3273,7 +3273,7 @@ class PyMISP: # ## BEGIN Statistics ### - def attributes_statistics(self, context: str = 'type', percentage: bool = False) -> dict[str, Any]: + def attributes_statistics(self, context: str = 'type', percentage: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Get attribute statistics from the MISP instance :param context: "type" or "category" @@ -3289,7 +3289,7 @@ class PyMISP: response = self._prepare_request('GET', path) return self._check_json_response(response) - def tags_statistics(self, percentage: bool = False, name_sort: bool = False) -> dict[str, Any]: + def tags_statistics(self, percentage: bool = False, name_sort: bool = False) -> dict[str, Any] | list[dict[str, Any]]: """Get tag statistics from the MISP instance :param percentage: get percentages @@ -3318,7 +3318,7 @@ class PyMISP: raise PyMISPError("context can only be {','.join(availables_contexts)}") response = self._prepare_request('GET', f'users/statistics/{context}') try: - return self._check_json_response_list(response) + return self._check_json_response(response) except PyMISPError: return self._check_json_response(response) @@ -3332,7 +3332,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'userSettings/index') - user_settings = self._check_json_response_list(r) + user_settings = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(user_settings, dict): return user_settings to_return = [] @@ -3384,7 +3384,7 @@ class PyMISP: u.from_dict(**user_setting_j) return u - def delete_user_setting(self, user_setting: str, user: MISPUser | int | str | UUID | None = None) -> dict[str, Any]: + def delete_user_setting(self, user_setting: str, user: MISPUser | int | str | UUID | None = None) -> dict[str, Any] | list[dict[str, Any]]: """Delete a user setting: https://www.misp-project.org/openapi/#tag/UserSettings/operation/deleteUserSettingById :param user_setting: name of user setting @@ -3406,7 +3406,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'eventBlocklists/index') - event_blocklists = self._check_json_response_list(r) + event_blocklists = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(event_blocklists, dict): return event_blocklists to_return = [] @@ -3422,7 +3422,7 @@ class PyMISP: :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM """ r = self._prepare_request('GET', 'orgBlocklists/index') - organisation_blocklists = self._check_json_response_list(r) + organisation_blocklists = self._check_json_response(r) if not (self.global_pythonify or pythonify) or isinstance(organisation_blocklists, dict): return organisation_blocklists to_return = [] @@ -3432,7 +3432,7 @@ class PyMISP: to_return.append(obl) return to_return - def _add_entries_to_blocklist(self, blocklist_type: str, uuids: str | list[str], **kwargs) -> dict[str, Any]: # type: ignore[no-untyped-def] + def _add_entries_to_blocklist(self, blocklist_type: str, uuids: str | list[str], **kwargs) -> dict[str, Any] | list[dict[str, Any]]: # type: ignore[no-untyped-def] if blocklist_type == 'event': url = 'eventBlocklists/add' elif blocklist_type == 'organisation': @@ -3448,7 +3448,7 @@ class PyMISP: return self._check_json_response(r) def add_event_blocklist(self, uuids: str | list[str], comment: str | None = None, - event_info: str | None = None, event_orgc: str | None = None) -> dict[str, Any]: + event_info: str | None = None, event_orgc: str | None = None) -> dict[str, Any] | list[dict[str, Any]]: """Add a new event in the blocklist :param uuids: UUIDs @@ -3459,7 +3459,7 @@ class PyMISP: return self._add_entries_to_blocklist('event', uuids=uuids, comment=comment, event_info=event_info, event_orgc=event_orgc) def add_organisation_blocklist(self, uuids: str | list[str], comment: str | None = None, - org_name: str | None = None) -> dict[str, Any]: + org_name: str | None = None) -> dict[str, Any] | list[dict[str, Any]]: """Add a new organisation in the blocklist :param uuids: UUIDs @@ -3468,7 +3468,7 @@ class PyMISP: """ return self._add_entries_to_blocklist('organisation', uuids=uuids, comment=comment, org_name=org_name) - def _update_entries_in_blocklist(self, blocklist_type: str, uuid, **kwargs) -> dict[str, Any]: # type: ignore[no-untyped-def] + def _update_entries_in_blocklist(self, blocklist_type: str, uuid, **kwargs) -> dict[str, Any] | list[dict[str, Any]]: # type: ignore[no-untyped-def] if blocklist_type == 'event': url = f'eventBlocklists/edit/{uuid}' elif blocklist_type == 'organisation': @@ -3515,7 +3515,7 @@ class PyMISP: o.from_dict(**updated_organisation_blocklist) return o - def delete_event_blocklist(self, event_blocklist: MISPEventBlocklist | str | UUID) -> dict[str, Any]: + def delete_event_blocklist(self, event_blocklist: MISPEventBlocklist | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a blocklisted event by id :param event_blocklist: event block list to delete @@ -3524,7 +3524,7 @@ class PyMISP: response = self._prepare_request('POST', f'eventBlocklists/delete/{event_blocklist_id}') return self._check_json_response(response) - def delete_organisation_blocklist(self, organisation_blocklist: MISPOrganisationBlocklist | str | UUID) -> dict[str, Any]: + def delete_organisation_blocklist(self, organisation_blocklist: MISPOrganisationBlocklist | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: """Delete a blocklisted organisation by id :param organisation_blocklist: organization block list to delete @@ -3561,7 +3561,7 @@ class PyMISP: raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') def tag(self, misp_entity: AbstractMISP | str | dict[str, Any], tag: MISPTag | str, - local: bool = False, relationship_type: str | None = None) -> dict[str, Any]: + local: bool = False, relationship_type: str | None = None) -> dict[str, Any] | list[dict[str, Any]]: """Tag an event or an attribute. :param misp_entity: a MISPEvent, a MISP Attribute, or a UUID @@ -3580,7 +3580,7 @@ class PyMISP: response = self._prepare_request('POST', 'tags/attachTagToObject', data=to_post) return self._check_json_response(response) - def untag(self, misp_entity: AbstractMISP | str | dict[str, Any], tag: MISPTag | str) -> dict[str, Any]: + def untag(self, misp_entity: AbstractMISP | str | dict[str, Any], tag: MISPTag | str) -> dict[str, Any] | list[dict[str, Any]]: """Untag an event or an attribute :param misp_entity: misp_entity can be a UUID @@ -3690,18 +3690,9 @@ class PyMISP: return value return value - def _check_json_response_list(self, response: requests.Response) -> dict[str, Any] | list[dict[str, Any]]: + def _check_json_response(self, response: requests.Response) -> dict[str, Any] | list[dict[str, Any]]: r = self._check_response(response, expect_json=True) - if isinstance(r, dict) and 'errors' in r: - return r - if isinstance(r, list): - return r - # Else: an exception was raised anyway - raise PyMISPUnexpectedResponse(f'A list was expected, got a {type(r)}: {r}') - - def _check_json_response(self, response: requests.Response) -> dict[str, Any]: - r = self._check_response(response, expect_json=True) - if isinstance(r, dict): + if isinstance(r, (dict, list)): return r # Else: an exception was raised anyway raise PyMISPUnexpectedResponse(f'A dict was expected, got a string: {r}')