From 5cfbb679db80dc7e01e5cc9695337ede7171ea60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 25 Jan 2017 15:09:12 +0100 Subject: [PATCH] Add orgs managment --- pymisp/api.py | 201 +++++++++++++++++++++++++++++++------------------- 1 file changed, 127 insertions(+), 74 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 9c714f0..71ec608 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -127,7 +127,8 @@ class PyMISP(object): if not self.describe_types.get('sane_defaults'): raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.') except: - describe_types = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r')) + with open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r') as f: + describe_types = json.load(f) self.describe_types = describe_types['result'] self.categories = self.describe_types['categories'] @@ -400,6 +401,7 @@ class PyMISP(object): regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I) match = regex.match(uuid) return bool(match) + # ##### File attributes ##### def _send_attributes(self, event, attributes, proposal=False): @@ -1110,7 +1112,8 @@ class PyMISP(object): def sighting_per_json(self, json_file): session = self.__prepare_session() - jdata = json.load(open(json_file)) + with open(json_file) as f: + jdata = json.load(f) url = urljoin(self.root_url, 'sightings/add/') response = session.post(url, data=json.dumps(jdata)) return self._check_response(response) @@ -1125,45 +1128,42 @@ class PyMISP(object): # ############## Users ################## - def _set_user_parameters(self, email, org_id, role_id, password, external_auth_required, - external_auth_key, enable_password, nids_sid, server_id, - gpgkey, certif_public, autoalert, contactalert, disabled, - change_pw, termsaccepted, newsread): + def _set_user_parameters(self, **kwargs): user = {} - if email is not None: - user['email'] = email - if org_id is not None: - user['org_id'] = org_id - if role_id is not None: - user['role_id'] = role_id - if password is not None: - user['password'] = password - if external_auth_required is not None: - user['external_auth_required'] = external_auth_required - if external_auth_key is not None: - user['external_auth_key'] = external_auth_key - if enable_password is not None: - user['enable_password'] = enable_password - if nids_sid is not None: - user['nids_sid'] = nids_sid - if server_id is not None: - user['server_id'] = server_id - if gpgkey is not None: - user['gpgkey'] = gpgkey - if certif_public is not None: - user['certif_public'] = certif_public - if autoalert is not None: - user['autoalert'] = autoalert - if contactalert is not None: - user['contactalert'] = contactalert - if disabled is not None: - user['disabled'] = disabled - if change_pw is not None: - user['change_pw'] = change_pw - if termsaccepted is not None: - user['termsaccepted'] = termsaccepted - if newsread is not None: - user['newsread'] = newsread + if kwargs.get('email'): + user['email'] = kwargs.get('email') + if kwargs.get('org_id'): + user['org_id'] = kwargs.get('org_id') + if kwargs.get('role_id'): + user['role_id'] = kwargs.get('role_id') + if kwargs.get('password'): + user['password'] = kwargs.get('password') + if kwargs.get('external_auth_required'): + user['external_auth_required'] = kwargs.get('external_auth_required') + if kwargs.get('external_auth_key'): + user['external_auth_key'] = kwargs.get('external_auth_key') + if kwargs.get('enable_password'): + user['enable_password'] = kwargs.get('enable_password') + if kwargs.get('nids_sid'): + user['nids_sid'] = kwargs.get('nids_sid') + if kwargs.get('server_id'): + user['server_id'] = kwargs.get('server_id') + if kwargs.get('gpgkey'): + user['gpgkey'] = kwargs.get('gpgkey') + if kwargs.get('certif_public'): + user['certif_public'] = kwargs.get('certif_public') + if kwargs.get('autoalert'): + user['autoalert'] = kwargs.get('autoalert') + if kwargs.get('contactalert'): + user['contactalert'] = kwargs.get('contactalert') + if kwargs.get('disabled'): + user['disabled'] = kwargs.get('disabled') + if kwargs.get('change_pw'): + user['change_pw'] = kwargs.get('change_pw') + if kwargs.get('termsaccepted'): + user['termsaccepted'] = kwargs.get('termsaccepted') + if kwargs.get('newsread'): + user['newsread'] = kwargs.get('newsread') return user def get_users_list(self): @@ -1178,18 +1178,8 @@ class PyMISP(object): response = session.get(url) return self._check_response(response) - def add_user(self, email, org_id, role_id, password=None, - external_auth_required=None, external_auth_key=None, - enable_password=None, nids_sid=None, server_id=None, - gpgkey=None, certif_public=None, autoalert=None, - contactalert=None, disabled=None, change_pw=None, - termsaccepted=None, newsread=None): - new_user = self._set_user_parameters(email, org_id, role_id, password, - external_auth_required, external_auth_key, - enable_password, nids_sid, server_id, - gpgkey, certif_public, autoalert, - contactalert, disabled, change_pw, - termsaccepted, newsread) + def add_user(self, email, org_id, role_id, **kwargs): + new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs)) session = self.__prepare_session() url = urljoin(self.root_url, 'admin/users/add/') response = session.post(url, data=json.dumps(new_user)) @@ -1197,30 +1187,20 @@ class PyMISP(object): def add_user_json(self, json_file): session = self.__prepare_session() - jdata = json.load(open(json_file)) + with open(json_file) as f: + jdata = json.load(f) url = urljoin(self.root_url, 'admin/users/add/') response = session.post(url, data=json.dumps(jdata)) return self._check_response(response) - def get_add_user_fields_list(self): + def get_user_fields_list(self): session = self.__prepare_session() url = urljoin(self.root_url, 'admin/users/add/') response = session.get(url) return self._check_response(response) - def edit_user(self, user_id, email=None, org_id=None, role_id=None, - password=None, external_auth_required=None, - external_auth_key=None, enable_password=None, nids_sid=None, - server_id=None, gpgkey=None, certif_public=None, - autoalert=None, contactalert=None, disabled=None, - change_pw=None, termsaccepted=None, newsread=None): - edit_user = self._set_user_parameters(email, org_id, role_id, password, - external_auth_required, external_auth_key, - enable_password, nids_sid, server_id, - gpgkey, certif_public, autoalert, - contactalert, disabled, change_pw, - termsaccepted, newsread) - + def edit_user(self, user_id, **kwargs): + edit_user = self._set_user_parameters(**kwargs) session = self.__prepare_session() url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) response = session.post(url, data=json.dumps(edit_user)) @@ -1228,19 +1208,92 @@ class PyMISP(object): def edit_user_json(self, json_file, user_id): session = self.__prepare_session() - jdata = json.load(open(json_file)) + with open(json_file) as f: + jdata = json.load(f) url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) response = session.post(url, data=json.dumps(jdata)) return self._check_response(response) - def get_edit_user_fields_list(self, user_id): - session = self.__prepare_session() - url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) - response = session.get(url) - return self._check_response(response) - def delete_user(self, user_id): session = self.__prepare_session() url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id)) response = session.post(url) return self._check_response(response) + + # ############## Organisations ################## + + def _set_organisation_parameters(self, **kwargs): + organisation = {} + if kwargs.get('name'): + organisation['name'] = kwargs.get('name') + if kwargs.get('anonymise'): + organisation['anonymise'] = kwargs.get('anonymise') + if kwargs.get('description'): + organisation['description'] = kwargs.get('description') + if kwargs.get('type'): + organisation['type'] = kwargs.get('type') + if kwargs.get('nationality'): + organisation['nationality'] = kwargs.get('nationality') + if kwargs.get('sector'): + organisation['sector'] = kwargs.get('sector') + if kwargs.get('uuid'): + organisation['uuid'] = kwargs.get('uuid') + if kwargs.get('contacts'): + organisation['contacts'] = kwargs.get('contacts') + if kwargs.get('local'): + organisation['local'] = kwargs.get('local') + return organisation + + def get_organisations_list(self): + session = self.__prepare_session() + url = urljoin(self.root_url, 'organisations') + response = session.get(url) + return self._check_response(response)['response'] + + def get_organisation(self, organisation_id): + session = self.__prepare_session() + url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id)) + response = session.get(url) + return self._check_response(response) + + def add_organisation(self, name, **kwargs): + new_org = self._set_organisation_parameters(**dict(name=name, **kwargs)) + session = self.__prepare_session() + url = urljoin(self.root_url, 'admin/organisations/add/') + response = session.post(url, data=json.dumps(new_org)) + return self._check_response(response) + + def add_organisation_json(self, json_file): + session = self.__prepare_session() + with open(json_file) as f: + jdata = json.load(f) + url = urljoin(self.root_url, 'admin/organisations/add/') + response = session.post(url, data=json.dumps(jdata)) + return self._check_response(response) + + def get_organisation_fields_list(self): + session = self.__prepare_session() + url = urljoin(self.root_url, 'admin/organisations/add/') + response = session.get(url) + return self._check_response(response) + + def edit_organisation(self, org_id, **kwargs): + edit_org = self._set_organisation_parameters(**kwargs) + session = self.__prepare_session() + url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) + response = session.post(url, data=json.dumps(edit_org)) + return self._check_response(response) + + def edit_organisation_json(self, json_file, org_id): + session = self.__prepare_session() + with open(json_file) as f: + jdata = json.load(f) + url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) + response = session.post(url, data=json.dumps(jdata)) + return self._check_response(response) + + def delete_organisation(self, org_id): + session = self.__prepare_session() + url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id)) + response = session.post(url) + return self._check_response(response)