Merge pull request #53 from SekoiaLab/feature/serversRestApi

Adds new methods to add and edit servers in MISP
pull/55/head
Raphaël Vinot 2017-02-23 16:11:47 +01:00 committed by GitHub
commit ad02ebdef7
1 changed files with 85 additions and 0 deletions

View File

@ -1334,3 +1334,88 @@ class PyMISP(object):
url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id))
response = session.post(url)
return self._check_response(response)
# ############## Servers ##################
def _set_server_organisation(self, server, organisation):
if organisation is not None and 'type' in organisation:
organisation_type = organisation['type']
if organisation_type < 2:
if 'id' in organisation:
server['organisation_type'] = organisation_type
server['json'] = json.dump({'id': organisation['id']})
else:
if 'name' in organisation and 'uuid' in organisation:
server['organisation_type'] = organisation_type
server['json'] = json.dumps({'name': organisation['name'], 'uuid': organisation['uuid']})
return server
def _set_server_parameters(self, url, name, authkey, organisation, internal,
push, pull, self_signed, push_rules, pull_rules,
submitted_cert, submitted_client_cert, delete_cert,
delete_client_cert):
server = {}
self._set_server_organisation(server, organisation)
if url is not None:
server['url'] = url
if name is not None:
server['name'] = name
if authkey is not None:
server['authkey'] = authkey
if internal is not None:
server['internal'] = internal
if push is not None:
server['push'] = push
if pull is not None:
server['pull'] = pull
if self_signed is not None:
server['self_signed'] = self_signed
if push_rules is not None:
server['push_rules'] = push_rules
if pull_rules is not None:
server['pull_rules'] = pull_rules
if submitted_cert is not None:
server['submitted_cert'] = submitted_cert
if submitted_client_cert is not None:
server['submitted_client_cert'] = submitted_client_cert
if delete_cert is not None:
server['delete_cert'] = delete_cert
if delete_client_cert is not None:
server['delete_client_cert'] = delete_client_cert
return server
def add_server(self, url, name, authkey, organisation, internal=None, push=None,
pull=None, self_signed=None, push_rules=None, pull_rules=None,
submitted_cert=None, submitted_client_cert=None):
new_server = self._set_server_parameters(url, name, authkey, organisation, internal,
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
submitted_client_cert, None, None)
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/add')
response = session.post(url, data=json.dumps(new_server))
return self._check_response(response)
def add_server_json(self, json_file):
session = self.__prepare_session()
jdata = json.load(open(json_file))
url = urljoin(self.root_url, 'servers/add')
response = session.post(url, data=json.dumps(jdata))
return self._check_response(response)
def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=None,
pull=None, self_signed=None, push_rules=None, pull_rules=None,
submitted_cert=None, submitted_client_cert=None, delete_cert=None, delete_client_cert=None):
new_server = self._set_server_parameters(url, name, authkey, organisation, internal,
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
submitted_client_cert, delete_cert, delete_client_cert)
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
response = session.post(url, data=json.dumps(new_server))
return self._check_response(response)
def add_server_json(self, json_file, server_id):
session = self.__prepare_session()
jdata = json.load(open(json_file))
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
response = session.post(url, data=json.dumps(jdata))
return self._check_response(response)