mirror of https://github.com/MISP/PyMISP
Add orgs managment
parent
aadae9b20e
commit
5cfbb679db
201
pymisp/api.py
201
pymisp/api.py
|
@ -127,7 +127,8 @@ class PyMISP(object):
|
||||||
if not self.describe_types.get('sane_defaults'):
|
if not self.describe_types.get('sane_defaults'):
|
||||||
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
|
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
|
||||||
except:
|
except:
|
||||||
describe_types = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r'))
|
with open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r') as f:
|
||||||
|
describe_types = json.load(f)
|
||||||
self.describe_types = describe_types['result']
|
self.describe_types = describe_types['result']
|
||||||
|
|
||||||
self.categories = self.describe_types['categories']
|
self.categories = self.describe_types['categories']
|
||||||
|
@ -400,6 +401,7 @@ class PyMISP(object):
|
||||||
regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I)
|
regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I)
|
||||||
match = regex.match(uuid)
|
match = regex.match(uuid)
|
||||||
return bool(match)
|
return bool(match)
|
||||||
|
|
||||||
# ##### File attributes #####
|
# ##### File attributes #####
|
||||||
|
|
||||||
def _send_attributes(self, event, attributes, proposal=False):
|
def _send_attributes(self, event, attributes, proposal=False):
|
||||||
|
@ -1110,7 +1112,8 @@ class PyMISP(object):
|
||||||
|
|
||||||
def sighting_per_json(self, json_file):
|
def sighting_per_json(self, json_file):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
jdata = json.load(open(json_file))
|
with open(json_file) as f:
|
||||||
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'sightings/add/')
|
url = urljoin(self.root_url, 'sightings/add/')
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -1125,45 +1128,42 @@ class PyMISP(object):
|
||||||
|
|
||||||
# ############## Users ##################
|
# ############## Users ##################
|
||||||
|
|
||||||
def _set_user_parameters(self, email, org_id, role_id, password, external_auth_required,
|
def _set_user_parameters(self, **kwargs):
|
||||||
external_auth_key, enable_password, nids_sid, server_id,
|
|
||||||
gpgkey, certif_public, autoalert, contactalert, disabled,
|
|
||||||
change_pw, termsaccepted, newsread):
|
|
||||||
user = {}
|
user = {}
|
||||||
if email is not None:
|
if kwargs.get('email'):
|
||||||
user['email'] = email
|
user['email'] = kwargs.get('email')
|
||||||
if org_id is not None:
|
if kwargs.get('org_id'):
|
||||||
user['org_id'] = org_id
|
user['org_id'] = kwargs.get('org_id')
|
||||||
if role_id is not None:
|
if kwargs.get('role_id'):
|
||||||
user['role_id'] = role_id
|
user['role_id'] = kwargs.get('role_id')
|
||||||
if password is not None:
|
if kwargs.get('password'):
|
||||||
user['password'] = password
|
user['password'] = kwargs.get('password')
|
||||||
if external_auth_required is not None:
|
if kwargs.get('external_auth_required'):
|
||||||
user['external_auth_required'] = external_auth_required
|
user['external_auth_required'] = kwargs.get('external_auth_required')
|
||||||
if external_auth_key is not None:
|
if kwargs.get('external_auth_key'):
|
||||||
user['external_auth_key'] = external_auth_key
|
user['external_auth_key'] = kwargs.get('external_auth_key')
|
||||||
if enable_password is not None:
|
if kwargs.get('enable_password'):
|
||||||
user['enable_password'] = enable_password
|
user['enable_password'] = kwargs.get('enable_password')
|
||||||
if nids_sid is not None:
|
if kwargs.get('nids_sid'):
|
||||||
user['nids_sid'] = nids_sid
|
user['nids_sid'] = kwargs.get('nids_sid')
|
||||||
if server_id is not None:
|
if kwargs.get('server_id'):
|
||||||
user['server_id'] = server_id
|
user['server_id'] = kwargs.get('server_id')
|
||||||
if gpgkey is not None:
|
if kwargs.get('gpgkey'):
|
||||||
user['gpgkey'] = gpgkey
|
user['gpgkey'] = kwargs.get('gpgkey')
|
||||||
if certif_public is not None:
|
if kwargs.get('certif_public'):
|
||||||
user['certif_public'] = certif_public
|
user['certif_public'] = kwargs.get('certif_public')
|
||||||
if autoalert is not None:
|
if kwargs.get('autoalert'):
|
||||||
user['autoalert'] = autoalert
|
user['autoalert'] = kwargs.get('autoalert')
|
||||||
if contactalert is not None:
|
if kwargs.get('contactalert'):
|
||||||
user['contactalert'] = contactalert
|
user['contactalert'] = kwargs.get('contactalert')
|
||||||
if disabled is not None:
|
if kwargs.get('disabled'):
|
||||||
user['disabled'] = disabled
|
user['disabled'] = kwargs.get('disabled')
|
||||||
if change_pw is not None:
|
if kwargs.get('change_pw'):
|
||||||
user['change_pw'] = change_pw
|
user['change_pw'] = kwargs.get('change_pw')
|
||||||
if termsaccepted is not None:
|
if kwargs.get('termsaccepted'):
|
||||||
user['termsaccepted'] = termsaccepted
|
user['termsaccepted'] = kwargs.get('termsaccepted')
|
||||||
if newsread is not None:
|
if kwargs.get('newsread'):
|
||||||
user['newsread'] = newsread
|
user['newsread'] = kwargs.get('newsread')
|
||||||
return user
|
return user
|
||||||
|
|
||||||
def get_users_list(self):
|
def get_users_list(self):
|
||||||
|
@ -1178,18 +1178,8 @@ class PyMISP(object):
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_user(self, email, org_id, role_id, password=None,
|
def add_user(self, email, org_id, role_id, **kwargs):
|
||||||
external_auth_required=None, external_auth_key=None,
|
new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs))
|
||||||
enable_password=None, nids_sid=None, server_id=None,
|
|
||||||
gpgkey=None, certif_public=None, autoalert=None,
|
|
||||||
contactalert=None, disabled=None, change_pw=None,
|
|
||||||
termsaccepted=None, newsread=None):
|
|
||||||
new_user = self._set_user_parameters(email, org_id, role_id, password,
|
|
||||||
external_auth_required, external_auth_key,
|
|
||||||
enable_password, nids_sid, server_id,
|
|
||||||
gpgkey, certif_public, autoalert,
|
|
||||||
contactalert, disabled, change_pw,
|
|
||||||
termsaccepted, newsread)
|
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'admin/users/add/')
|
url = urljoin(self.root_url, 'admin/users/add/')
|
||||||
response = session.post(url, data=json.dumps(new_user))
|
response = session.post(url, data=json.dumps(new_user))
|
||||||
|
@ -1197,30 +1187,20 @@ class PyMISP(object):
|
||||||
|
|
||||||
def add_user_json(self, json_file):
|
def add_user_json(self, json_file):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
jdata = json.load(open(json_file))
|
with open(json_file) as f:
|
||||||
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'admin/users/add/')
|
url = urljoin(self.root_url, 'admin/users/add/')
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_add_user_fields_list(self):
|
def get_user_fields_list(self):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'admin/users/add/')
|
url = urljoin(self.root_url, 'admin/users/add/')
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def edit_user(self, user_id, email=None, org_id=None, role_id=None,
|
def edit_user(self, user_id, **kwargs):
|
||||||
password=None, external_auth_required=None,
|
edit_user = self._set_user_parameters(**kwargs)
|
||||||
external_auth_key=None, enable_password=None, nids_sid=None,
|
|
||||||
server_id=None, gpgkey=None, certif_public=None,
|
|
||||||
autoalert=None, contactalert=None, disabled=None,
|
|
||||||
change_pw=None, termsaccepted=None, newsread=None):
|
|
||||||
edit_user = self._set_user_parameters(email, org_id, role_id, password,
|
|
||||||
external_auth_required, external_auth_key,
|
|
||||||
enable_password, nids_sid, server_id,
|
|
||||||
gpgkey, certif_public, autoalert,
|
|
||||||
contactalert, disabled, change_pw,
|
|
||||||
termsaccepted, newsread)
|
|
||||||
|
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||||
response = session.post(url, data=json.dumps(edit_user))
|
response = session.post(url, data=json.dumps(edit_user))
|
||||||
|
@ -1228,19 +1208,92 @@ class PyMISP(object):
|
||||||
|
|
||||||
def edit_user_json(self, json_file, user_id):
|
def edit_user_json(self, json_file, user_id):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
jdata = json.load(open(json_file))
|
with open(json_file) as f:
|
||||||
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_edit_user_fields_list(self, user_id):
|
|
||||||
session = self.__prepare_session()
|
|
||||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
|
||||||
response = session.get(url)
|
|
||||||
return self._check_response(response)
|
|
||||||
|
|
||||||
def delete_user(self, user_id):
|
def delete_user(self, user_id):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id))
|
url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id))
|
||||||
response = session.post(url)
|
response = session.post(url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
|
# ############## Organisations ##################
|
||||||
|
|
||||||
|
def _set_organisation_parameters(self, **kwargs):
|
||||||
|
organisation = {}
|
||||||
|
if kwargs.get('name'):
|
||||||
|
organisation['name'] = kwargs.get('name')
|
||||||
|
if kwargs.get('anonymise'):
|
||||||
|
organisation['anonymise'] = kwargs.get('anonymise')
|
||||||
|
if kwargs.get('description'):
|
||||||
|
organisation['description'] = kwargs.get('description')
|
||||||
|
if kwargs.get('type'):
|
||||||
|
organisation['type'] = kwargs.get('type')
|
||||||
|
if kwargs.get('nationality'):
|
||||||
|
organisation['nationality'] = kwargs.get('nationality')
|
||||||
|
if kwargs.get('sector'):
|
||||||
|
organisation['sector'] = kwargs.get('sector')
|
||||||
|
if kwargs.get('uuid'):
|
||||||
|
organisation['uuid'] = kwargs.get('uuid')
|
||||||
|
if kwargs.get('contacts'):
|
||||||
|
organisation['contacts'] = kwargs.get('contacts')
|
||||||
|
if kwargs.get('local'):
|
||||||
|
organisation['local'] = kwargs.get('local')
|
||||||
|
return organisation
|
||||||
|
|
||||||
|
def get_organisations_list(self):
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'organisations')
|
||||||
|
response = session.get(url)
|
||||||
|
return self._check_response(response)['response']
|
||||||
|
|
||||||
|
def get_organisation(self, organisation_id):
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id))
|
||||||
|
response = session.get(url)
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def add_organisation(self, name, **kwargs):
|
||||||
|
new_org = self._set_organisation_parameters(**dict(name=name, **kwargs))
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||||
|
response = session.post(url, data=json.dumps(new_org))
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def add_organisation_json(self, json_file):
|
||||||
|
session = self.__prepare_session()
|
||||||
|
with open(json_file) as f:
|
||||||
|
jdata = json.load(f)
|
||||||
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||||
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def get_organisation_fields_list(self):
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||||
|
response = session.get(url)
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def edit_organisation(self, org_id, **kwargs):
|
||||||
|
edit_org = self._set_organisation_parameters(**kwargs)
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||||
|
response = session.post(url, data=json.dumps(edit_org))
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def edit_organisation_json(self, json_file, org_id):
|
||||||
|
session = self.__prepare_session()
|
||||||
|
with open(json_file) as f:
|
||||||
|
jdata = json.load(f)
|
||||||
|
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||||
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def delete_organisation(self, org_id):
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id))
|
||||||
|
response = session.post(url)
|
||||||
|
return self._check_response(response)
|
||||||
|
|
Loading…
Reference in New Issue