From 21f1a642931f10b77882fe03b2de8257f0a85c46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 25 Apr 2018 16:44:00 +0200 Subject: [PATCH] chg: normalize the RestResponse calls. --- pymisp/api.py | 132 +++++++++++++++++++++++++------------------------- 1 file changed, 65 insertions(+), 67 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index d413c40..f2aa86b 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1415,52 +1415,40 @@ class PyMISP(object): # ############## Users ################## def get_users_list(self): - url = urljoin(self.root_url, 'admin/users') - response = self.__prepare_request('GET', url) - return self._check_response(response)['response'] + return self._rest_list('admin/users') def get_user(self, user_id): - url = urljoin(self.root_url, 'admin/users/view/{}'.format(user_id)) - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_view('admin/users', user_id) def add_user(self, email, org_id, role_id, **kwargs): - url = urljoin(self.root_url, 'admin/users/add/') new_user = MISPUser() new_user.from_dict(email=email, org_id=org_id, role_id=role_id, **kwargs) - response = self.__prepare_request('POST', url, new_user.to_json()) - return self._check_response(response) + return self._rest_add('admin/users', new_user) def add_user_json(self, json_file): with open(json_file, 'r') as f: jdata = json.load(f) - url = urljoin(self.root_url, 'admin/users/add/') - response = self.__prepare_request('POST', url, json.dumps(jdata)) - return self._check_response(response) + new_user = MISPUser() + new_user.from_dict(**jdata) + return self._rest_add('admin/users', new_user) def get_user_fields_list(self): - url = urljoin(self.root_url, 'admin/users/add/') - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_get_parameters('admin/users') def edit_user(self, user_id, **kwargs): edit_user = MISPUser() edit_user.from_dict(**kwargs) - url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) - response = self.__prepare_request('POST', url, edit_user.to_json()) - return self._check_response(response) + return self._rest_edit('admin/users', edit_user, user_id) def edit_user_json(self, json_file, user_id): with open(json_file, 'r') as f: jdata = json.load(f) - url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) - response = self.__prepare_request('POST', url, json.dumps(jdata)) - return self._check_response(response) + new_user = MISPUser() + new_user.from_dict(**jdata) + return self._rest_edit('admin/users', new_user, user_id) def delete_user(self, user_id): - url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id)) - response = self.__prepare_request('POST', url) - return self._check_response(response) + return self._rest_delete('admin/users', user_id) # ############## Organisations ################## @@ -1468,14 +1456,10 @@ class PyMISP(object): scope = scope.lower() if scope not in ["local", "external", "all"]: raise ValueError("Authorized fields are 'local','external' or 'all'") - url = urljoin(self.root_url, 'organisations/index/scope:{}'.format(scope)) - response = self.__prepare_request('GET', url) - return self._check_response(response)['response'] + return self._rest_list('organisations/index/scope:{}'.format(scope)) def get_organisation(self, organisation_id): - url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id)) - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_view('organisations', organisation_id) def add_organisation(self, name, **kwargs): new_org = MISPOrganisation() @@ -1484,40 +1468,32 @@ class PyMISP(object): if new_org.get('local') is False: if 'uuid' not in new_org: raise PyMISPError('A remote org MUST have a valid uuid') - url = urljoin(self.root_url, 'admin/organisations/add/') - response = self.__prepare_request('POST', url, new_org.to_json()) - return self._check_response(response) + return self._rest_add('admin/organisations', new_org) def add_organisation_json(self, json_file): with open(json_file, 'r') as f: jdata = json.load(f) - url = urljoin(self.root_url, 'admin/organisations/add/') - response = self.__prepare_request('POST', url, json.dumps(jdata)) - return self._check_response(response) + new_org = MISPOrganisation() + new_org.from_dict(**jdata) + return self._rest_add('admin/organisations', new_org) def get_organisation_fields_list(self): - url = urljoin(self.root_url, 'admin/organisations/add/') - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_get_parameters('admin/organisations') def edit_organisation(self, org_id, **kwargs): edit_org = MISPOrganisation() edit_org.from_dict(**kwargs) - url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) - response = self.__prepare_request('POST', url, edit_org.to_json()) - return self._check_response(response) + return self._rest_edit('admin/organisations', edit_org, org_id) def edit_organisation_json(self, json_file, org_id): with open(json_file, 'r') as f: jdata = json.load(f) - url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) - response = self.__prepare_request('POST', url, json.dumps(jdata)) - return self._check_response(response) + edit_org = MISPOrganisation() + edit_org.from_dict(**jdata) + return self._rest_edit('admin/organisations', edit_org, org_id) def delete_organisation(self, org_id): - url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id)) - response = self.__prepare_request('POST', url) - return self._check_response(response) + return self._rest_delete('admin/organisations', org_id) # ############## Servers ################## @@ -1738,49 +1714,71 @@ class PyMISP(object): response = self.__prepare_request('POST', url, output_type='json') return response.text + # ####################################### + # ######## RestResponse generic ######### + # ####################################### + + def _rest_list(self, urlpath): + url = urljoin(self.root_url, urlpath) + response = self.__prepare_request('GET', url) + return self._check_response(response) + + def _rest_get_parameters(self, urlpath): + url = urljoin(self.root_url, '{}/add'.format(urlpath)) + response = self.__prepare_request('GET', url) + return self._check_response(response) + + def _rest_view(self, urlpath, rest_id): + url = urljoin(self.root_url, '{}/view/{}'.format(urlpath, rest_id)) + response = self.__prepare_request('GET', url) + return self._check_response(response) + + def _rest_add(self, urlpath, obj): + url = urljoin(self.root_url, '{}/add'.format(urlpath)) + response = self.__prepare_request('POST', url, obj.to_json()) + return self._check_response(response) + + def _rest_edit(self, urlpath, obj, rest_id): + url = urljoin(self.root_url, '{}/edit/{}'.format(urlpath, rest_id)) + response = self.__prepare_request('POST', url, obj.to_json()) + return self._check_response(response) + + def _rest_delete(self, urlpath, rest_id): + url = urljoin(self.root_url, '{}/delete/{}'.format(urlpath, rest_id)) + response = self.__prepare_request('GET', url) + return self._check_response(response) + # ########################### # ######## Feed ######### # ########################### def get_feeds_list(self): """Get the content of all the feeds""" - url = urljoin(self.root_url, 'feeds') - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_list('feed') - def get_feed(self, feed_ids): + def get_feed(self, feed_id): """Get the content of a single feed""" - url = urljoin(self.root_url, 'feeds/view/{}'.format(feed_ids)) - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_view('feed', feed_id) def add_feed(self, source_format, url, name, input_source, provider, **kwargs): """Delete a feed""" - url = urljoin(self.root_url, 'feeds/add') new_feed = MISPFeed() new_feed.from_dict(source_format=source_format, url=url, name=name, input_source=input_source, provider=provider) - response = self.__prepare_request('POST', url, new_feed.to_json()) - return self._check_response(response) + return self._rest_add('feed', new_feed) def get_feed_fields_list(self): - url = urljoin(self.root_url, 'feeds/add/') - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_get_parameters('feed') def edit_feed(self, feed_id, **kwargs): """Delete a feed""" - url = urljoin(self.root_url, 'feeds/edit/{}'.format(feed_id)) edit_feed = MISPFeed() edit_feed.from_dict(**kwargs) - response = self.__prepare_request('POST', url, edit_feed.to_json()) - return self._check_response(response) + return self._rest_edit('feed', edit_feed) def delete_feed(self, feed_id): """Delete a feed""" - url = urljoin(self.root_url, 'feeds/delete/{}'.format(feed_id)) - response = self.__prepare_request('GET', url) - return self._check_response(response) + return self._rest_delete('feed', feed_id) def fetch_feed(self, feed_id): """Fetch one single feed"""