mirror of https://github.com/MISP/PyMISP
chg: normalize the RestResponse calls.
parent
411016b80b
commit
21f1a64293
132
pymisp/api.py
132
pymisp/api.py
|
@ -1415,52 +1415,40 @@ class PyMISP(object):
|
|||
# ############## Users ##################
|
||||
|
||||
def get_users_list(self):
|
||||
url = urljoin(self.root_url, 'admin/users')
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)['response']
|
||||
return self._rest_list('admin/users')
|
||||
|
||||
def get_user(self, user_id):
|
||||
url = urljoin(self.root_url, 'admin/users/view/{}'.format(user_id))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_view('admin/users', user_id)
|
||||
|
||||
def add_user(self, email, org_id, role_id, **kwargs):
|
||||
url = urljoin(self.root_url, 'admin/users/add/')
|
||||
new_user = MISPUser()
|
||||
new_user.from_dict(email=email, org_id=org_id, role_id=role_id, **kwargs)
|
||||
response = self.__prepare_request('POST', url, new_user.to_json())
|
||||
return self._check_response(response)
|
||||
return self._rest_add('admin/users', new_user)
|
||||
|
||||
def add_user_json(self, json_file):
|
||||
with open(json_file, 'r') as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/users/add/')
|
||||
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
new_user = MISPUser()
|
||||
new_user.from_dict(**jdata)
|
||||
return self._rest_add('admin/users', new_user)
|
||||
|
||||
def get_user_fields_list(self):
|
||||
url = urljoin(self.root_url, 'admin/users/add/')
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_get_parameters('admin/users')
|
||||
|
||||
def edit_user(self, user_id, **kwargs):
|
||||
edit_user = MISPUser()
|
||||
edit_user.from_dict(**kwargs)
|
||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||
response = self.__prepare_request('POST', url, edit_user.to_json())
|
||||
return self._check_response(response)
|
||||
return self._rest_edit('admin/users', edit_user, user_id)
|
||||
|
||||
def edit_user_json(self, json_file, user_id):
|
||||
with open(json_file, 'r') as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
new_user = MISPUser()
|
||||
new_user.from_dict(**jdata)
|
||||
return self._rest_edit('admin/users', new_user, user_id)
|
||||
|
||||
def delete_user(self, user_id):
|
||||
url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id))
|
||||
response = self.__prepare_request('POST', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_delete('admin/users', user_id)
|
||||
|
||||
# ############## Organisations ##################
|
||||
|
||||
|
@ -1468,14 +1456,10 @@ class PyMISP(object):
|
|||
scope = scope.lower()
|
||||
if scope not in ["local", "external", "all"]:
|
||||
raise ValueError("Authorized fields are 'local','external' or 'all'")
|
||||
url = urljoin(self.root_url, 'organisations/index/scope:{}'.format(scope))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)['response']
|
||||
return self._rest_list('organisations/index/scope:{}'.format(scope))
|
||||
|
||||
def get_organisation(self, organisation_id):
|
||||
url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_view('organisations', organisation_id)
|
||||
|
||||
def add_organisation(self, name, **kwargs):
|
||||
new_org = MISPOrganisation()
|
||||
|
@ -1484,40 +1468,32 @@ class PyMISP(object):
|
|||
if new_org.get('local') is False:
|
||||
if 'uuid' not in new_org:
|
||||
raise PyMISPError('A remote org MUST have a valid uuid')
|
||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||
response = self.__prepare_request('POST', url, new_org.to_json())
|
||||
return self._check_response(response)
|
||||
return self._rest_add('admin/organisations', new_org)
|
||||
|
||||
def add_organisation_json(self, json_file):
|
||||
with open(json_file, 'r') as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
new_org = MISPOrganisation()
|
||||
new_org.from_dict(**jdata)
|
||||
return self._rest_add('admin/organisations', new_org)
|
||||
|
||||
def get_organisation_fields_list(self):
|
||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_get_parameters('admin/organisations')
|
||||
|
||||
def edit_organisation(self, org_id, **kwargs):
|
||||
edit_org = MISPOrganisation()
|
||||
edit_org.from_dict(**kwargs)
|
||||
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||
response = self.__prepare_request('POST', url, edit_org.to_json())
|
||||
return self._check_response(response)
|
||||
return self._rest_edit('admin/organisations', edit_org, org_id)
|
||||
|
||||
def edit_organisation_json(self, json_file, org_id):
|
||||
with open(json_file, 'r') as f:
|
||||
jdata = json.load(f)
|
||||
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
||||
return self._check_response(response)
|
||||
edit_org = MISPOrganisation()
|
||||
edit_org.from_dict(**jdata)
|
||||
return self._rest_edit('admin/organisations', edit_org, org_id)
|
||||
|
||||
def delete_organisation(self, org_id):
|
||||
url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id))
|
||||
response = self.__prepare_request('POST', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_delete('admin/organisations', org_id)
|
||||
|
||||
# ############## Servers ##################
|
||||
|
||||
|
@ -1738,49 +1714,71 @@ class PyMISP(object):
|
|||
response = self.__prepare_request('POST', url, output_type='json')
|
||||
return response.text
|
||||
|
||||
# #######################################
|
||||
# ######## RestResponse generic #########
|
||||
# #######################################
|
||||
|
||||
def _rest_list(self, urlpath):
|
||||
url = urljoin(self.root_url, urlpath)
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
|
||||
def _rest_get_parameters(self, urlpath):
|
||||
url = urljoin(self.root_url, '{}/add'.format(urlpath))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
|
||||
def _rest_view(self, urlpath, rest_id):
|
||||
url = urljoin(self.root_url, '{}/view/{}'.format(urlpath, rest_id))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
|
||||
def _rest_add(self, urlpath, obj):
|
||||
url = urljoin(self.root_url, '{}/add'.format(urlpath))
|
||||
response = self.__prepare_request('POST', url, obj.to_json())
|
||||
return self._check_response(response)
|
||||
|
||||
def _rest_edit(self, urlpath, obj, rest_id):
|
||||
url = urljoin(self.root_url, '{}/edit/{}'.format(urlpath, rest_id))
|
||||
response = self.__prepare_request('POST', url, obj.to_json())
|
||||
return self._check_response(response)
|
||||
|
||||
def _rest_delete(self, urlpath, rest_id):
|
||||
url = urljoin(self.root_url, '{}/delete/{}'.format(urlpath, rest_id))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
|
||||
# ###########################
|
||||
# ######## Feed #########
|
||||
# ###########################
|
||||
|
||||
def get_feeds_list(self):
|
||||
"""Get the content of all the feeds"""
|
||||
url = urljoin(self.root_url, 'feeds')
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_list('feed')
|
||||
|
||||
def get_feed(self, feed_ids):
|
||||
def get_feed(self, feed_id):
|
||||
"""Get the content of a single feed"""
|
||||
url = urljoin(self.root_url, 'feeds/view/{}'.format(feed_ids))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_view('feed', feed_id)
|
||||
|
||||
def add_feed(self, source_format, url, name, input_source, provider, **kwargs):
|
||||
"""Delete a feed"""
|
||||
url = urljoin(self.root_url, 'feeds/add')
|
||||
new_feed = MISPFeed()
|
||||
new_feed.from_dict(source_format=source_format, url=url, name=name,
|
||||
input_source=input_source, provider=provider)
|
||||
response = self.__prepare_request('POST', url, new_feed.to_json())
|
||||
return self._check_response(response)
|
||||
return self._rest_add('feed', new_feed)
|
||||
|
||||
def get_feed_fields_list(self):
|
||||
url = urljoin(self.root_url, 'feeds/add/')
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_get_parameters('feed')
|
||||
|
||||
def edit_feed(self, feed_id, **kwargs):
|
||||
"""Delete a feed"""
|
||||
url = urljoin(self.root_url, 'feeds/edit/{}'.format(feed_id))
|
||||
edit_feed = MISPFeed()
|
||||
edit_feed.from_dict(**kwargs)
|
||||
response = self.__prepare_request('POST', url, edit_feed.to_json())
|
||||
return self._check_response(response)
|
||||
return self._rest_edit('feed', edit_feed)
|
||||
|
||||
def delete_feed(self, feed_id):
|
||||
"""Delete a feed"""
|
||||
url = urljoin(self.root_url, 'feeds/delete/{}'.format(feed_id))
|
||||
response = self.__prepare_request('GET', url)
|
||||
return self._check_response(response)
|
||||
return self._rest_delete('feed', feed_id)
|
||||
|
||||
def fetch_feed(self, feed_id):
|
||||
"""Fetch one single feed"""
|
||||
|
|
Loading…
Reference in New Issue