new: Add support for UserSettings

pull/485/head
Raphaël Vinot 2019-10-16 17:22:19 +02:00
parent 82abf4c801
commit c509b22beb
3 changed files with 161 additions and 7 deletions

View File

@ -18,7 +18,10 @@ import sys
from . import __version__
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey
from .api import everything_broken, PyMISP
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, \
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
SearchType = TypeVar('SearchType', str, int)
@ -83,6 +86,9 @@ class ExpandedPyMISP(PyMISP):
misp_version = self.misp_instance_version
if 'version' in misp_version:
self._misp_version = tuple(int(v) for v in misp_version['version'].split('.'))
# Get the user information
self._current_user, self._current_role, self._current_user_settings = self.get_user(pythonify=True, expanded=True)
except Exception as e:
raise PyMISPError(f'Unable to connect to MISP ({self.root_url}). Please make sure the API key and the URL are correct (http/https is required): {e}')
@ -1261,8 +1267,9 @@ class ExpandedPyMISP(PyMISP):
to_return.append(u)
return to_return
def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False):
'''Get a user. `me` means the owner of the API key doing the query.'''
def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False, expanded: bool=False):
'''Get a user. `me` means the owner of the API key doing the query.
expanded also returns a MISPRole and a MISPUserSetting'''
user_id = self.__get_uuid_or_id_from_abstract_misp(user)
user = self._prepare_request('GET', f'users/view/{user_id}')
user = self._check_response(user, expect_json=True)
@ -1270,7 +1277,18 @@ class ExpandedPyMISP(PyMISP):
return user
u = MISPUser()
u.from_dict(**user)
return u
if not expanded:
return u
else:
r = MISPRole()
r.from_dict(**user['Role'])
usersettings = []
if user['UserSetting']:
for name, value in user['UserSetting'].items():
us = MISPUserSetting()
us.from_dict(**{'name': name, 'value': value})
usersettings.append(us)
return u, r, usersettings
def add_user(self, user: MISPUser, pythonify: bool=False):
'''Add a new user'''
@ -1288,7 +1306,10 @@ class ExpandedPyMISP(PyMISP):
user_id = self.__get_uuid_or_id_from_abstract_misp(user)
else:
user_id = self.__get_uuid_or_id_from_abstract_misp(user_id)
updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user)
url = f'users/edit/{user_id}'
if self._current_role.perm_admin or self._current_role.perm_site_admin:
url = f'admin/{url}'
updated_user = self._prepare_request('POST', url, data=user)
updated_user = self._check_response(updated_user, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in updated_user:
return updated_user
@ -1303,6 +1324,10 @@ class ExpandedPyMISP(PyMISP):
response = self._prepare_request('POST', f'admin/users/delete/{user_id}')
return self._check_response(response, expect_json=True)
def change_user_password(self, new_password: str, user: Union[MISPUser, int, str, UUID]=None):
response = self._prepare_request('POST', f'users/change_pw', data={'password': new_password})
return self._check_response(response, expect_json=True)
# ## END User ###
# ## BEGIN Role ###
@ -1944,6 +1969,61 @@ class ExpandedPyMISP(PyMISP):
# ## END Statistics ###
# ## BEGIN User Settings ###
def user_settings(self, pythonify: bool=False):
"""Get all the user settings."""
user_settings = self._prepare_request('GET', 'user_settings')
user_settings = self._check_response(user_settings, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in user_settings:
return user_settings
to_return = []
for user_setting in user_settings:
u = MISPUserSetting()
u.from_dict(**user_setting)
to_return.append(u)
return to_return
def get_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None, pythonify: bool=False):
'''Get an user setting'''
query = {'setting': user_setting}
if user:
query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user)
response = self._prepare_request('POST', f'user_settings/getSetting')
user_setting = self._check_response(response, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in user_setting:
return user_setting
u = MISPUserSetting()
u.from_dict(**user_setting)
return u
def set_user_setting(self, user_setting: str, value: Union[str, dict], user: Union[MISPUser, int, str, UUID]=None, pythonify: bool=False):
'''Get an user setting'''
query = {'setting': user_setting}
if isinstance(value, dict):
value = json.dumps(value)
query['value'] = value
if user:
query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user)
response = self._prepare_request('POST', f'user_settings/setSetting', data=query)
user_setting = self._check_response(response, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in user_setting:
return user_setting
u = MISPUserSetting()
u.from_dict(**user_setting)
return u
def delete_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None):
'''Delete a user setting'''
query = {'setting': user_setting}
if user:
query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user)
response = self._prepare_request('POST', f'user_settings/delete', data=query)
return self._check_response(response, expect_json=True)
# ## END User Settings ###
# ## BEGIN Global helpers ###
def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False):
@ -2016,6 +2096,12 @@ class ExpandedPyMISP(PyMISP):
return str(obj)
if isinstance(obj, (int, str)):
return obj
if isinstance(obj, dict) and len(obj.keys()) == 1:
# We have an object in that format: {'Event': {'id': 2, ...}}
# We need to get the content of that dictionary
obj = obj[list(obj.keys())[0]]
if self._old_misp((2, 4, 113), '2020-01-01', sys._getframe().f_code.co_name, message='MISP now accepts UUIDs to access entiries, usinf it is a lot safer across instances. Just update your MISP instance, plz.'):
if 'id' in obj:
return obj['id']

View File

@ -1121,6 +1121,20 @@ class MISPCommunity(AbstractMISP):
return '<{self.__class__.__name__}(name={self.name}, uuid={self.uuid})'.format(self=self)
class MISPUserSetting(AbstractMISP):
def __init__(self):
super(MISPUserSetting, self).__init__()
def from_dict(self, **kwargs):
if 'UserSetting' in kwargs:
kwargs = kwargs['UserSetting']
super(MISPUserSetting, self).from_dict(**kwargs)
def __repr__(self):
return '<{self.__class__.__name__}(name={self.setting}'.format(self=self)
class MISPObject(AbstractMISP):
def __init__(self, name, strict=False, standalone=False, default_attributes_parameters={}, **kwargs):

View File

@ -25,7 +25,7 @@ import logging
logging.disable(logging.CRITICAL)
try:
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
from pymisp.exceptions import MISPServerError
except ImportError:
@ -1945,6 +1945,60 @@ class TestComprehensive(unittest.TestCase):
# Delete event
self.admin_misp_connector.delete_event(first)
def test_user_settings(self):
first = self.create_simple_event()
first.distribution = 3
first.add_tag('test_publish_filter')
first.add_tag('test_publish_filter_not')
second = self.create_simple_event()
second.distribution = 3
try:
# Set
setting = self.admin_misp_connector.set_user_setting('dashboard_access', 1, pythonify=True)
setting_value = {'Tag.name': 'test_publish_filter'}
setting = self.admin_misp_connector.set_user_setting('publish_alert_filter', setting_value, pythonify=True)
self.assertTrue(isinstance(setting, MISPUserSetting))
self.assertEqual(setting.value, setting_value)
# Get
# FIXME: https://github.com/MISP/MISP/issues/5297
# setting = self.admin_misp_connector.get_user_setting('dashboard_access', pythonify=True)
# Get All
user_settings = self.admin_misp_connector.user_settings(pythonify=True)
# TODO: Make that one better
self.assertTrue(isinstance(user_settings, list))
# Test if publish_alert_filter works
first = self.admin_misp_connector.add_event(first, pythonify=True)
second = self.admin_misp_connector.add_event(second, pythonify=True)
r = self.user_misp_connector.change_user_password('Password1234')
self.assertEqual(r['message'], 'Password Changed.')
self.test_usr.autoalert = True
self.test_usr.termsaccepted = True
user = self.user_misp_connector.update_user(self.test_usr, pythonify=True)
self.assertTrue(user.autoalert)
self.admin_misp_connector.publish(first, alert=True)
self.admin_misp_connector.publish(second, alert=True)
time.sleep(10)
# FIXME https://github.com/MISP/MISP/issues/4872
# mail_logs = self.admin_misp_connector.search_logs(model='User', action='email', limit=2, pythonify=True)
mail_logs = self.admin_misp_connector.search_logs(model='User', action='email', created=datetime.now() - timedelta(seconds=30), pythonify=True)
self.assertEqual(len(mail_logs), 3)
self.assertTrue(mail_logs[0].title.startswith(f'Email to {self.admin_misp_connector._current_user.email}'), mail_logs[0].title)
self.assertTrue(mail_logs[1].title.startswith(f'Email to {self.user_misp_connector._current_user.email}'), mail_logs[1].title)
self.assertTrue(mail_logs[2].title.startswith(f'Email to {self.user_misp_connector._current_user.email}'), mail_logs[2].title)
# Delete
# FIXME: https://github.com/MISP/MISP/issues/5297
# response = self.admin_misp_connector.delete_user_setting('publish_alert_filter')
finally:
self.test_usr.autoalert = False
self.user_misp_connector.update_user(self.test_usr)
# Delete event
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
@unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6')
def test_communities(self):
communities = self.admin_misp_connector.communities(pythonify=True)
@ -1977,7 +2031,7 @@ class TestComprehensive(unittest.TestCase):
finally:
# Delete event
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second['Event']['id'])
self.admin_misp_connector.delete_event(second)
if __name__ == '__main__':