fix: More fixes to support responses from MISP

pull/1144/head
Raphaël Vinot 2024-02-01 17:24:24 +01:00
parent c3a3868f42
commit b5b4a5ef52
4 changed files with 38 additions and 27 deletions

View File

@ -243,13 +243,13 @@ class PyMISP:
self.category_type_mapping = self.describe_types['category_type_mappings'] self.category_type_mapping = self.describe_types['category_type_mappings']
self.sane_default = self.describe_types['sane_defaults'] self.sane_default = self.describe_types['sane_defaults']
def remote_acl(self, debug_type: str = 'findMissingFunctionNames') -> dict[str, Any]: def remote_acl(self, debug_type: str = 'findMissingFunctionNames') -> dict[str, Any] | list[dict[str, Any]]:
"""This should return an empty list, unless the ACL is outdated. """This should return an empty list, unless the ACL is outdated.
:param debug_type: printAllFunctionNames, findMissingFunctionNames, or printRoleAccess :param debug_type: printAllFunctionNames, findMissingFunctionNames, or printRoleAccess
""" """
response = self._prepare_request('GET', f'events/queryACL/{debug_type}') response = self._prepare_request('GET', f'events/queryACL/{debug_type}')
return self._check_json_response(response) return self._check_json_response_list(response)
@property @property
def describe_types_local(self) -> dict[str, Any]: def describe_types_local(self) -> dict[str, Any]:
@ -1210,16 +1210,19 @@ class PyMISP:
taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy)
self.disable_taxonomy_tags(taxonomy_id) self.disable_taxonomy_tags(taxonomy_id)
response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}') response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}')
try:
return self._check_json_response(response) return self._check_json_response(response)
except PyMISPError:
return self._check_json_response_list(response)
def disable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]: def disable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Disable all the tags of a taxonomy """Disable all the tags of a taxonomy
:param taxonomy: taxonomy with tags to disable :param taxonomy: taxonomy with tags to disable
""" """
taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy)
response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}') response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}')
return self._check_json_response(response) return self._check_json_response_list(response)
def enable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]: def enable_taxonomy_tags(self, taxonomy: MISPTaxonomy | int | str | UUID) -> dict[str, Any]:
"""Enable all the tags of a taxonomy. NOTE: this is automatically done when you call enable_taxonomy """Enable all the tags of a taxonomy. NOTE: this is automatically done when you call enable_taxonomy
@ -1238,10 +1241,10 @@ class PyMISP:
response = self._prepare_request('POST', url) response = self._prepare_request('POST', url)
return self._check_json_response(response) return self._check_json_response(response)
def update_taxonomies(self) -> dict[str, Any] | list[dict[str, Any]]: def update_taxonomies(self) -> dict[str, Any]:
"""Update all the taxonomies: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/updateTaxonomies""" """Update all the taxonomies: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/updateTaxonomies"""
response = self._prepare_request('POST', 'taxonomies/update') response = self._prepare_request('POST', 'taxonomies/update')
return self._check_json_response_list(response) return self._check_json_response(response)
def set_taxonomy_required(self, taxonomy: MISPTaxonomy | int | str, required: bool = False) -> dict[str, Any]: def set_taxonomy_required(self, taxonomy: MISPTaxonomy | int | str, required: bool = False) -> dict[str, Any]:
taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy) taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy)
@ -1330,18 +1333,18 @@ class PyMISP:
warninglist_id = get_uuid_or_id_from_abstract_misp(warninglist) warninglist_id = get_uuid_or_id_from_abstract_misp(warninglist)
return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False)
def values_in_warninglist(self, value: Iterable[str]) -> dict[str, Any] | list[dict[str, Any]]: def values_in_warninglist(self, value: Iterable[str]) -> dict[str, Any]:
"""Check if IOC values are in warninglist """Check if IOC values are in warninglist
:param value: iterator with values to check :param value: iterator with values to check
""" """
response = self._prepare_request('POST', 'warninglists/checkValue', data=value) response = self._prepare_request('POST', 'warninglists/checkValue', data=value)
return self._check_json_response_list(response) return self._check_json_response(response)
def update_warninglists(self) -> dict[str, Any] | list[dict[str, Any]]: def update_warninglists(self) -> dict[str, Any]:
"""Update all the warninglists: https://www.misp-project.org/openapi/#tag/Warninglists/operation/updateWarninglists""" """Update all the warninglists: https://www.misp-project.org/openapi/#tag/Warninglists/operation/updateWarninglists"""
response = self._prepare_request('POST', 'warninglists/update') response = self._prepare_request('POST', 'warninglists/update')
return self._check_json_response_list(response) return self._check_json_response(response)
# ## END Warninglists ### # ## END Warninglists ###
@ -1400,10 +1403,10 @@ class PyMISP:
response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}') response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}')
return self._check_json_response(response) return self._check_json_response(response)
def update_noticelists(self) -> dict[str, Any] | list[dict[str, Any]]: def update_noticelists(self) -> dict[str, Any]:
"""Update all the noticelists: https://www.misp-project.org/openapi/#tag/Noticelists/operation/updateNoticelists""" """Update all the noticelists: https://www.misp-project.org/openapi/#tag/Noticelists/operation/updateNoticelists"""
response = self._prepare_request('POST', 'noticelists/update') response = self._prepare_request('POST', 'noticelists/update')
return self._check_json_response_list(response) return self._check_json_response(response)
# ## END Noticelist ### # ## END Noticelist ###
@ -1553,10 +1556,10 @@ class PyMISP:
response.append(c) response.append(c)
return response return response
def update_galaxies(self) -> dict[str, Any] | list[dict[str, Any]]: def update_galaxies(self) -> dict[str, Any]:
"""Update all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/updateGalaxies""" """Update all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/updateGalaxies"""
response = self._prepare_request('POST', 'galaxies/update') response = self._prepare_request('POST', 'galaxies/update')
return self._check_json_response_list(response) return self._check_json_response(response)
def get_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPGalaxyCluster: def get_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPGalaxyCluster:
"""Gets a specific galaxy cluster """Gets a specific galaxy cluster
@ -1867,10 +1870,10 @@ class PyMISP:
response = self._prepare_request('GET', 'feeds/cacheFeeds/misp') response = self._prepare_request('GET', 'feeds/cacheFeeds/misp')
return self._check_json_response(response) return self._check_json_response(response)
def compare_feeds(self) -> dict[str, Any]: def compare_feeds(self) -> dict[str, Any] | list[dict[str, Any]]:
"""Generate the comparison matrix for all the MISP feeds""" """Generate the comparison matrix for all the MISP feeds"""
response = self._prepare_request('GET', 'feeds/compareFeeds') response = self._prepare_request('GET', 'feeds/compareFeeds')
return self._check_json_response(response) return self._check_json_response_list(response)
def load_default_feeds(self) -> dict[str, Any]: def load_default_feeds(self) -> dict[str, Any]:
"""Load all the default feeds.""" """Load all the default feeds."""
@ -2475,10 +2478,10 @@ class PyMISP:
# ## BEGIN Decaying Models ### # ## BEGIN Decaying Models ###
def update_decaying_models(self) -> dict[str, Any] | list[dict[str, Any]]: def update_decaying_models(self) -> dict[str, Any]:
"""Update all the Decaying models""" """Update all the Decaying models"""
response = self._prepare_request('POST', 'decayingModel/update') response = self._prepare_request('POST', 'decayingModel/update')
return self._check_json_response_list(response) return self._check_json_response(response)
def decaying_models(self, pythonify: bool = False) -> dict[str, Any] | list[MISPDecayingModel] | list[dict[str, Any]]: def decaying_models(self, pythonify: bool = False) -> dict[str, Any] | list[MISPDecayingModel] | list[dict[str, Any]]:
"""Get all the decaying models """Get all the decaying models
@ -2731,7 +2734,11 @@ class PyMISP:
elif return_format not in ['json', 'yara-json']: elif return_format not in ['json', 'yara-json']:
return self._check_response(response) return self._check_response(response)
normalized_response: list[dict[str, Any]] | dict[str, Any]
if controller in ['events', 'objects']:
# This one is truly fucked: event returns a list, attributes doesn't. # This one is truly fucked: event returns a list, attributes doesn't.
normalized_response = self._check_json_response_list(response)
elif controller == 'attributes':
normalized_response = self._check_json_response(response) normalized_response = self._check_json_response(response)
if 'errors' in normalized_response: if 'errors' in normalized_response:
@ -2749,7 +2756,7 @@ class PyMISP:
to_return.append(me) to_return.append(me)
elif controller == 'attributes': elif controller == 'attributes':
# FIXME: obvs, this is hurting my soul. We need something generic. # FIXME: obvs, this is hurting my soul. We need something generic.
for a in normalized_response['Attribute']: for a in normalized_response['Attribute']: # type: ignore[call-overload]
ma = MISPAttribute() ma = MISPAttribute()
ma.from_dict(**a) ma.from_dict(**a)
if 'Event' in ma: if 'Event' in ma:
@ -3298,7 +3305,7 @@ class PyMISP:
response = self._prepare_request('GET', f'tags/tagStatistics/{p}/{ns}') response = self._prepare_request('GET', f'tags/tagStatistics/{p}/{ns}')
return self._check_json_response(response) return self._check_json_response(response)
def users_statistics(self, context: str = 'data') -> dict[str, Any]: def users_statistics(self, context: str = 'data') -> dict[str, Any] | list[dict[str, Any]]:
"""Get user statistics from the MISP instance """Get user statistics from the MISP instance
:param context: one of 'data', 'orgs', 'users', 'tags', 'attributehistogram', 'sightings', 'galaxyMatrix' :param context: one of 'data', 'orgs', 'users', 'tags', 'attributehistogram', 'sightings', 'galaxyMatrix'
@ -3307,6 +3314,9 @@ class PyMISP:
if context not in availables_contexts: if context not in availables_contexts:
raise PyMISPError("context can only be {','.join(availables_contexts)}") raise PyMISPError("context can only be {','.join(availables_contexts)}")
response = self._prepare_request('GET', f'users/statistics/{context}') response = self._prepare_request('GET', f'users/statistics/{context}')
try:
return self._check_json_response_list(response)
except PyMISPError:
return self._check_json_response(response) return self._check_json_response(response)
# ## END Statistics ### # ## END Statistics ###

View File

@ -2007,7 +2007,7 @@ class MISPEvent(AbstractMISP):
:param object_id: ID or UUID :param object_id: ID or UUID
""" """
for o in self.objects: for o in self.objects:
if ((hasattr(o, 'id') and int(o.id) == int(object_id)) if ((hasattr(o, 'id') and object_id.isdigit() and int(o.id) == int(object_id))
or (hasattr(o, 'uuid') and o.uuid == object_id)): or (hasattr(o, 'uuid') and o.uuid == object_id)):
o.delete() o.delete()
break break

View File

@ -16,6 +16,7 @@ from .abstractgenerator import AbstractMISPObjectGenerator
from ..exceptions import InvalidMISPObject from ..exceptions import InvalidMISPObject
import lief import lief
import lief.PE
try: try:
import pydeep # type: ignore import pydeep # type: ignore
@ -201,7 +202,7 @@ class PESigners(AbstractMISPObjectGenerator):
self.add_attribute('digest_algorithm', value=str(self.__signer.digest_algorithm)) self.add_attribute('digest_algorithm', value=str(self.__signer.digest_algorithm))
self.add_attribute('encryption_algorithm', value=str(self.__signer.encryption_algorithm)) self.add_attribute('encryption_algorithm', value=str(self.__signer.encryption_algorithm))
self.add_attribute('digest-base64', value=b64encode(self.__signer.encrypted_digest)) self.add_attribute('digest-base64', value=b64encode(self.__signer.encrypted_digest))
info: lief.PE.SpcSpOpusInfo = self.__signer.get_attribute(lief.PE.SIG_ATTRIBUTE_TYPES.SPC_SP_OPUS_INFO) # type: ignore[attr-defined, assignment] info: lief.PE.SpcSpOpusInfo = self.__signer.get_attribute(lief.PE.Attribute.TYPE.SPC_SP_OPUS_INFO) # type: ignore[assignment]
if info: if info:
self.add_attribute('program-name', value=info.program_name) self.add_attribute('program-name', value=info.program_name)
self.add_attribute('url', value=info.more_info) self.add_attribute('url', value=info.more_info)

View File

@ -12,7 +12,7 @@ import unittest
from datetime import datetime, timedelta, date, timezone from datetime import datetime, timedelta, date, timezone
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import TypeVar, Type, Any from typing import TypeVar, Any
from uuid import uuid4 from uuid import uuid4
import urllib3 import urllib3