diff --git a/pymisp/aping.py b/pymisp/aping.py index 152612a..e102ddd 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -18,7 +18,10 @@ import sys from . import __version__ from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey from .api import everything_broken, PyMISP -from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity +from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, \ + MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \ + MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ + MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types SearchType = TypeVar('SearchType', str, int) @@ -83,6 +86,9 @@ class ExpandedPyMISP(PyMISP): misp_version = self.misp_instance_version if 'version' in misp_version: self._misp_version = tuple(int(v) for v in misp_version['version'].split('.')) + + # Get the user information + self._current_user, self._current_role, self._current_user_settings = self.get_user(pythonify=True, expanded=True) except Exception as e: raise PyMISPError(f'Unable to connect to MISP ({self.root_url}). Please make sure the API key and the URL are correct (http/https is required): {e}') @@ -1261,8 +1267,9 @@ class ExpandedPyMISP(PyMISP): to_return.append(u) return to_return - def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False): - '''Get a user. `me` means the owner of the API key doing the query.''' + def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False, expanded: bool=False): + '''Get a user. `me` means the owner of the API key doing the query. + expanded also returns a MISPRole and a MISPUserSetting''' user_id = self.__get_uuid_or_id_from_abstract_misp(user) user = self._prepare_request('GET', f'users/view/{user_id}') user = self._check_response(user, expect_json=True) @@ -1270,7 +1277,18 @@ class ExpandedPyMISP(PyMISP): return user u = MISPUser() u.from_dict(**user) - return u + if not expanded: + return u + else: + r = MISPRole() + r.from_dict(**user['Role']) + usersettings = [] + if user['UserSetting']: + for name, value in user['UserSetting'].items(): + us = MISPUserSetting() + us.from_dict(**{'name': name, 'value': value}) + usersettings.append(us) + return u, r, usersettings def add_user(self, user: MISPUser, pythonify: bool=False): '''Add a new user''' @@ -1288,7 +1306,10 @@ class ExpandedPyMISP(PyMISP): user_id = self.__get_uuid_or_id_from_abstract_misp(user) else: user_id = self.__get_uuid_or_id_from_abstract_misp(user_id) - updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user) + url = f'users/edit/{user_id}' + if self._current_role.perm_admin or self._current_role.perm_site_admin: + url = f'admin/{url}' + updated_user = self._prepare_request('POST', url, data=user) updated_user = self._check_response(updated_user, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in updated_user: return updated_user @@ -1303,6 +1324,10 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('POST', f'admin/users/delete/{user_id}') return self._check_response(response, expect_json=True) + def change_user_password(self, new_password: str, user: Union[MISPUser, int, str, UUID]=None): + response = self._prepare_request('POST', f'users/change_pw', data={'password': new_password}) + return self._check_response(response, expect_json=True) + # ## END User ### # ## BEGIN Role ### @@ -1944,6 +1969,61 @@ class ExpandedPyMISP(PyMISP): # ## END Statistics ### + # ## BEGIN User Settings ### + + def user_settings(self, pythonify: bool=False): + """Get all the user settings.""" + user_settings = self._prepare_request('GET', 'user_settings') + user_settings = self._check_response(user_settings, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in user_settings: + return user_settings + to_return = [] + for user_setting in user_settings: + u = MISPUserSetting() + u.from_dict(**user_setting) + to_return.append(u) + return to_return + + def get_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None, pythonify: bool=False): + '''Get an user setting''' + query = {'setting': user_setting} + if user: + query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user) + response = self._prepare_request('POST', f'user_settings/getSetting') + user_setting = self._check_response(response, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in user_setting: + return user_setting + u = MISPUserSetting() + u.from_dict(**user_setting) + return u + + def set_user_setting(self, user_setting: str, value: Union[str, dict], user: Union[MISPUser, int, str, UUID]=None, pythonify: bool=False): + '''Get an user setting''' + query = {'setting': user_setting} + if isinstance(value, dict): + value = json.dumps(value) + query['value'] = value + if user: + query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user) + response = self._prepare_request('POST', f'user_settings/setSetting', data=query) + user_setting = self._check_response(response, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in user_setting: + return user_setting + u = MISPUserSetting() + u.from_dict(**user_setting) + return u + + def delete_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None): + '''Delete a user setting''' + query = {'setting': user_setting} + if user: + query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user) + response = self._prepare_request('POST', f'user_settings/delete', data=query) + return self._check_response(response, expect_json=True) + + + # ## END User Settings ### + # ## BEGIN Global helpers ### def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False): @@ -2016,6 +2096,12 @@ class ExpandedPyMISP(PyMISP): return str(obj) if isinstance(obj, (int, str)): return obj + + if isinstance(obj, dict) and len(obj.keys()) == 1: + # We have an object in that format: {'Event': {'id': 2, ...}} + # We need to get the content of that dictionary + obj = obj[list(obj.keys())[0]] + if self._old_misp((2, 4, 113), '2020-01-01', sys._getframe().f_code.co_name, message='MISP now accepts UUIDs to access entiries, usinf it is a lot safer across instances. Just update your MISP instance, plz.'): if 'id' in obj: return obj['id'] diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 36f70cd..78582e1 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -1121,6 +1121,20 @@ class MISPCommunity(AbstractMISP): return '<{self.__class__.__name__}(name={self.name}, uuid={self.uuid})'.format(self=self) +class MISPUserSetting(AbstractMISP): + + def __init__(self): + super(MISPUserSetting, self).__init__() + + def from_dict(self, **kwargs): + if 'UserSetting' in kwargs: + kwargs = kwargs['UserSetting'] + super(MISPUserSetting, self).from_dict(**kwargs) + + def __repr__(self): + return '<{self.__class__.__name__}(name={self.setting}'.format(self=self) + + class MISPObject(AbstractMISP): def __init__(self, name, strict=False, standalone=False, default_attributes_parameters={}, **kwargs): diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 170c6eb..f5353dc 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -25,7 +25,7 @@ import logging logging.disable(logging.CRITICAL) try: - from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer + from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator from pymisp.exceptions import MISPServerError except ImportError: @@ -1945,6 +1945,60 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) + def test_user_settings(self): + first = self.create_simple_event() + first.distribution = 3 + first.add_tag('test_publish_filter') + first.add_tag('test_publish_filter_not') + second = self.create_simple_event() + second.distribution = 3 + try: + # Set + setting = self.admin_misp_connector.set_user_setting('dashboard_access', 1, pythonify=True) + setting_value = {'Tag.name': 'test_publish_filter'} + setting = self.admin_misp_connector.set_user_setting('publish_alert_filter', setting_value, pythonify=True) + self.assertTrue(isinstance(setting, MISPUserSetting)) + self.assertEqual(setting.value, setting_value) + + # Get + # FIXME: https://github.com/MISP/MISP/issues/5297 + # setting = self.admin_misp_connector.get_user_setting('dashboard_access', pythonify=True) + + # Get All + user_settings = self.admin_misp_connector.user_settings(pythonify=True) + # TODO: Make that one better + self.assertTrue(isinstance(user_settings, list)) + + # Test if publish_alert_filter works + first = self.admin_misp_connector.add_event(first, pythonify=True) + second = self.admin_misp_connector.add_event(second, pythonify=True) + r = self.user_misp_connector.change_user_password('Password1234') + self.assertEqual(r['message'], 'Password Changed.') + self.test_usr.autoalert = True + self.test_usr.termsaccepted = True + user = self.user_misp_connector.update_user(self.test_usr, pythonify=True) + self.assertTrue(user.autoalert) + self.admin_misp_connector.publish(first, alert=True) + self.admin_misp_connector.publish(second, alert=True) + time.sleep(10) + # FIXME https://github.com/MISP/MISP/issues/4872 + # mail_logs = self.admin_misp_connector.search_logs(model='User', action='email', limit=2, pythonify=True) + mail_logs = self.admin_misp_connector.search_logs(model='User', action='email', created=datetime.now() - timedelta(seconds=30), pythonify=True) + self.assertEqual(len(mail_logs), 3) + self.assertTrue(mail_logs[0].title.startswith(f'Email to {self.admin_misp_connector._current_user.email}'), mail_logs[0].title) + self.assertTrue(mail_logs[1].title.startswith(f'Email to {self.user_misp_connector._current_user.email}'), mail_logs[1].title) + self.assertTrue(mail_logs[2].title.startswith(f'Email to {self.user_misp_connector._current_user.email}'), mail_logs[2].title) + + # Delete + # FIXME: https://github.com/MISP/MISP/issues/5297 + # response = self.admin_misp_connector.delete_user_setting('publish_alert_filter') + finally: + self.test_usr.autoalert = False + self.user_misp_connector.update_user(self.test_usr) + # Delete event + self.admin_misp_connector.delete_event(first) + self.admin_misp_connector.delete_event(second) + @unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6') def test_communities(self): communities = self.admin_misp_connector.communities(pythonify=True) @@ -1977,7 +2031,7 @@ class TestComprehensive(unittest.TestCase): finally: # Delete event self.admin_misp_connector.delete_event(first) - self.admin_misp_connector.delete_event(second['Event']['id']) + self.admin_misp_connector.delete_event(second) if __name__ == '__main__':