chg: Improve error message, add comments, rename whitelist->allowedlist

pull/633/head
Raphaël Vinot 2020-09-15 12:31:22 +02:00
parent 73b56a61da
commit 50e5f156bd
3 changed files with 32 additions and 7 deletions

View File

@ -35,6 +35,7 @@ logger = logging.getLogger('pymisp')
def get_uuid_or_id_from_abstract_misp(obj: Union[AbstractMISP, int, str, UUID]) -> Union[str, int]: def get_uuid_or_id_from_abstract_misp(obj: Union[AbstractMISP, int, str, UUID]) -> Union[str, int]:
"""Extract the relevant ID accordingly to the given type passed as parameter"""
if isinstance(obj, UUID): if isinstance(obj, UUID):
return str(obj) return str(obj)
if isinstance(obj, (int, str)): if isinstance(obj, (int, str)):
@ -188,6 +189,7 @@ class PyMISP:
@property @property
def pymisp_version_master(self) -> Dict: def pymisp_version_master(self) -> Dict:
"""PyMISP version as defined in the main repository"""
return self.pymisp_version_main return self.pymisp_version_main
@property @property
@ -215,36 +217,44 @@ class PyMISP:
return {'error': 'Impossible to retrieve the version of the master branch.'} return {'error': 'Impossible to retrieve the version of the master branch.'}
def update_misp(self) -> Dict: def update_misp(self) -> Dict:
"""Trigger a server update"""
response = self._prepare_request('POST', 'servers/update') response = self._prepare_request('POST', 'servers/update')
return self._check_json_response(response) return self._check_json_response(response)
def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool = False) -> Dict: def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool = False) -> Dict:
"""Set a setting on the MISP instance"""
data = {'value': value, 'force': force} data = {'value': value, 'force': force}
response = self._prepare_request('POST', f'servers/serverSettingsEdit/{setting}', data=data) response = self._prepare_request('POST', f'servers/serverSettingsEdit/{setting}', data=data)
return self._check_json_response(response) return self._check_json_response(response)
def get_server_setting(self, setting: str) -> Dict: def get_server_setting(self, setting: str) -> Dict:
"""Get a setting from the MISP instance"""
response = self._prepare_request('GET', f'servers/getSetting/{setting}') response = self._prepare_request('GET', f'servers/getSetting/{setting}')
return self._check_json_response(response) return self._check_json_response(response)
def server_settings(self) -> Dict: def server_settings(self) -> Dict:
"""Get all the settings from the server"""
response = self._prepare_request('GET', 'servers/serverSettings') response = self._prepare_request('GET', 'servers/serverSettings')
return self._check_json_response(response) return self._check_json_response(response)
def restart_workers(self) -> Dict: def restart_workers(self) -> Dict:
"""Restart all the workers"""
response = self._prepare_request('POST', 'servers/restartWorkers') response = self._prepare_request('POST', 'servers/restartWorkers')
return self._check_json_response(response) return self._check_json_response(response)
def db_schema_diagnostic(self) -> Dict: def db_schema_diagnostic(self) -> Dict:
"""Get the schema diagnostic"""
response = self._prepare_request('GET', 'servers/dbSchemaDiagnostic') response = self._prepare_request('GET', 'servers/dbSchemaDiagnostic')
return self._check_json_response(response) return self._check_json_response(response)
def toggle_global_pythonify(self) -> None: def toggle_global_pythonify(self) -> None:
"""Toggle the pythonify variable for the class"""
self.global_pythonify = not self.global_pythonify self.global_pythonify = not self.global_pythonify
# ## BEGIN Event ## # ## BEGIN Event ##
def events(self, pythonify: bool = False) -> Union[Dict, List[MISPEvent]]: def events(self, pythonify: bool = False) -> Union[Dict, List[MISPEvent]]:
"""Get all the events from the MISP instance"""
r = self._prepare_request('GET', 'events/index') r = self._prepare_request('GET', 'events/index')
events_r = self._check_json_response(r) events_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in events_r: if not (self.global_pythonify or pythonify) or 'errors' in events_r:
@ -424,6 +434,7 @@ class PyMISP:
# ## BEGIN Attribute ### # ## BEGIN Attribute ###
def attributes(self, pythonify: bool = False) -> Union[Dict, List[MISPAttribute]]: def attributes(self, pythonify: bool = False) -> Union[Dict, List[MISPAttribute]]:
"""Get all the attributes from the MISP instance"""
r = self._prepare_request('GET', 'attributes/index') r = self._prepare_request('GET', 'attributes/index')
attributes_r = self._check_json_response(r) attributes_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in attributes_r: if not (self.global_pythonify or pythonify) or 'errors' in attributes_r:
@ -519,6 +530,7 @@ class PyMISP:
# ## BEGIN Attribute Proposal ### # ## BEGIN Attribute Proposal ###
def attribute_proposals(self, event: Optional[Union[MISPEvent, int, str, UUID]] = None, pythonify: bool = False) -> Union[Dict, List[MISPShadowAttribute]]: def attribute_proposals(self, event: Optional[Union[MISPEvent, int, str, UUID]] = None, pythonify: bool = False) -> Union[Dict, List[MISPShadowAttribute]]:
"""Get all the attribute proposals"""
if event: if event:
event_id = get_uuid_or_id_from_abstract_misp(event) event_id = get_uuid_or_id_from_abstract_misp(event)
r = self._prepare_request('GET', f'shadowAttributes/index/{event_id}') r = self._prepare_request('GET', f'shadowAttributes/index/{event_id}')
@ -535,6 +547,7 @@ class PyMISP:
return to_return return to_return
def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPShadowAttribute]: def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPShadowAttribute]:
"""Get an attribute proposal"""
proposal_id = get_uuid_or_id_from_abstract_misp(proposal) proposal_id = get_uuid_or_id_from_abstract_misp(proposal)
r = self._prepare_request('GET', f'shadowAttributes/view/{proposal_id}') r = self._prepare_request('GET', f'shadowAttributes/view/{proposal_id}')
attribute_proposal = self._check_json_response(r) attribute_proposal = self._check_json_response(r)
@ -1166,6 +1179,7 @@ class PyMISP:
return self._check_json_response(response) return self._check_json_response(response)
def test_server(self, server: Union[MISPServer, int, str, UUID]) -> Dict: def test_server(self, server: Union[MISPServer, int, str, UUID]) -> Dict:
"""Test if a sync link is working as expected"""
server_id = get_uuid_or_id_from_abstract_misp(server) server_id = get_uuid_or_id_from_abstract_misp(server)
response = self._prepare_request('POST', f'servers/testConnection/{server_id}') response = self._prepare_request('POST', f'servers/testConnection/{server_id}')
return self._check_json_response(response) return self._check_json_response(response)
@ -1409,6 +1423,7 @@ class PyMISP:
role: Optional[Union[MISPRole, int, str]] = None, role: Optional[Union[MISPRole, int, str]] = None,
perm_sync: bool = False, perm_publish: bool = False, perm_admin: bool = False, perm_sync: bool = False, perm_publish: bool = False, perm_admin: bool = False,
unsafe_fallback: bool = False): unsafe_fallback: bool = False):
"""Accept a user registration"""
registration_id = get_uuid_or_id_from_abstract_misp(registration) registration_id = get_uuid_or_id_from_abstract_misp(registration)
if role: if role:
role_id = role_id = get_uuid_or_id_from_abstract_misp(role) role_id = role_id = get_uuid_or_id_from_abstract_misp(role)
@ -1446,6 +1461,7 @@ class PyMISP:
return self._check_json_response(r) return self._check_json_response(r)
def discard_user_registration(self, registration: Union[MISPInbox, int, str, UUID]): def discard_user_registration(self, registration: Union[MISPInbox, int, str, UUID]):
"""Discard a user registration"""
registration_id = get_uuid_or_id_from_abstract_misp(registration) registration_id = get_uuid_or_id_from_abstract_misp(registration)
r = self._prepare_request('POST', f'users/discardRegistrations/{registration_id}') r = self._prepare_request('POST', f'users/discardRegistrations/{registration_id}')
return self._check_json_response(r) return self._check_json_response(r)
@ -1468,6 +1484,7 @@ class PyMISP:
return to_return return to_return
def set_default_role(self, role: Union[MISPRole, int, str, UUID]) -> Dict: def set_default_role(self, role: Union[MISPRole, int, str, UUID]) -> Dict:
"""Set a default role for the new user accounts"""
role_id = get_uuid_or_id_from_abstract_misp(role) role_id = get_uuid_or_id_from_abstract_misp(role)
url = urljoin(self.root_url, f'/admin/roles/set_default/{role_id}') url = urljoin(self.root_url, f'/admin/roles/set_default/{role_id}')
response = self._prepare_request('POST', url) response = self._prepare_request('POST', url)
@ -1964,6 +1981,7 @@ class PyMISP:
message: Optional[str] = None, sync: bool = False, message: Optional[str] = None, sync: bool = False,
anonymise_requestor_server: bool = False, anonymise_requestor_server: bool = False,
mock: bool = False) -> Dict: mock: bool = False) -> Dict:
"""Request the access to a community"""
community_id = get_uuid_or_id_from_abstract_misp(community) community_id = get_uuid_or_id_from_abstract_misp(community)
to_post = {'org_name': requestor_organisation_name, to_post = {'org_name': requestor_organisation_name,
'org_uuid': requestor_organisation_uuid, 'org_uuid': requestor_organisation_uuid,
@ -1992,11 +2010,13 @@ class PyMISP:
return to_return return to_return
def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool = False) -> Dict: def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool = False) -> Dict:
"""Accept the delegation of an event"""
delegation_id = get_uuid_or_id_from_abstract_misp(delegation) delegation_id = get_uuid_or_id_from_abstract_misp(delegation)
r = self._prepare_request('POST', f'eventDelegations/acceptDelegation/{delegation_id}') r = self._prepare_request('POST', f'eventDelegations/acceptDelegation/{delegation_id}')
return self._check_json_response(r) return self._check_json_response(r)
def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool = False) -> Dict: def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool = False) -> Dict:
"""Discard the delegation of an event"""
delegation_id = get_uuid_or_id_from_abstract_misp(delegation) delegation_id = get_uuid_or_id_from_abstract_misp(delegation)
r = self._prepare_request('POST', f'eventDelegations/deleteDelegation/{delegation_id}') r = self._prepare_request('POST', f'eventDelegations/deleteDelegation/{delegation_id}')
return self._check_json_response(r) return self._check_json_response(r)
@ -2005,7 +2025,8 @@ class PyMISP:
organisation: Optional[Union[MISPOrganisation, int, str, UUID]] = None, organisation: Optional[Union[MISPOrganisation, int, str, UUID]] = None,
event_delegation: Optional[MISPEventDelegation] = None, event_delegation: Optional[MISPEventDelegation] = None,
distribution: int = -1, message: str = '', pythonify: bool = False) -> Union[Dict, MISPEventDelegation]: distribution: int = -1, message: str = '', pythonify: bool = False) -> Union[Dict, MISPEventDelegation]:
'''Note: distribution == -1 means recipient decides''' '''Delegates an event.
Note: distribution == -1 means recipient decides'''
if event and organisation: if event and organisation:
event_id = get_uuid_or_id_from_abstract_misp(event) event_id = get_uuid_or_id_from_abstract_misp(event)
organisation_id = get_uuid_or_id_from_abstract_misp(organisation) organisation_id = get_uuid_or_id_from_abstract_misp(organisation)

View File

@ -1529,7 +1529,11 @@ class MISPFeed(AbstractMISP):
kwargs = kwargs['Feed'] kwargs = kwargs['Feed']
super().from_dict(**kwargs) super().from_dict(**kwargs)
if hasattr(self, 'settings'): if hasattr(self, 'settings'):
try:
self.settings = json.loads(self.settings) self.settings = json.loads(self.settings)
except json.decoder.JSONDecodeError as e:
logger.error("Failed to parse feed settings: {}".format(self.settings))
raise e
class MISPWarninglist(AbstractMISP): class MISPWarninglist(AbstractMISP):

View File

@ -2752,11 +2752,11 @@ class TestComprehensive(unittest.TestCase):
"warninglists/enableWarninglist", "warninglists/enableWarninglist",
"warninglists/getToggleField", "warninglists/getToggleField",
"warninglists/delete", "warninglists/delete",
"admin/whitelists/add", "admin/allowedlists/add",
"admin/whitelists/index", "admin/allowedlists/index",
"admin/whitelists/edit", "admin/allowedlists/edit",
"admin/whitelists/delete", "admin/allowedlists/delete",
"whitelists/index" "allowedlists/index"
] ]
missing = self.admin_misp_connector.get_all_functions(True) missing = self.admin_misp_connector.get_all_functions(True)
with open('all_missing.json', 'w') as f: with open('all_missing.json', 'w') as f: