mirror of https://github.com/MISP/PyMISP
parent
a68bd80ab9
commit
3fb54e62b2
|
@ -1602,12 +1602,16 @@ class PyMISP(object):
|
||||||
def get_users_list(self):
|
def get_users_list(self):
|
||||||
return self._rest_list('admin/users')
|
return self._rest_list('admin/users')
|
||||||
|
|
||||||
def get_user(self, user_id):
|
def get_user(self, user_id='me'):
|
||||||
return self._rest_view('admin/users', user_id)
|
return self._rest_view('users', user_id)
|
||||||
|
|
||||||
def add_user(self, email, org_id, role_id, **kwargs):
|
def add_user(self, email, org_id=None, role_id=None, **kwargs):
|
||||||
new_user = MISPUser()
|
if isinstance(email, MISPUser):
|
||||||
new_user.from_dict(email=email, org_id=org_id, role_id=role_id, **kwargs)
|
# Very dirty, allow to call that from ExpandedPyMISP
|
||||||
|
new_user = email
|
||||||
|
else:
|
||||||
|
new_user = MISPUser()
|
||||||
|
new_user.from_dict(email=email, org_id=org_id, role_id=role_id, **kwargs)
|
||||||
return self._rest_add('admin/users', new_user)
|
return self._rest_add('admin/users', new_user)
|
||||||
|
|
||||||
def add_user_json(self, json_file):
|
def add_user_json(self, json_file):
|
||||||
|
@ -1647,12 +1651,16 @@ class PyMISP(object):
|
||||||
return self._rest_view('organisations', organisation_id)
|
return self._rest_view('organisations', organisation_id)
|
||||||
|
|
||||||
def add_organisation(self, name, **kwargs):
|
def add_organisation(self, name, **kwargs):
|
||||||
new_org = MISPOrganisation()
|
if isinstance(name, MISPOrganisation):
|
||||||
new_org.from_dict(name=name, **kwargs)
|
# Very dirty, allow to call that from ExpandedPyMISP
|
||||||
if 'local' in new_org:
|
new_org = name
|
||||||
if new_org.get('local') is False:
|
else:
|
||||||
if 'uuid' not in new_org:
|
new_org = MISPOrganisation()
|
||||||
raise PyMISPError('A remote org MUST have a valid uuid')
|
new_org.from_dict(name=name, **kwargs)
|
||||||
|
if 'local' in new_org:
|
||||||
|
if new_org.get('local') is False:
|
||||||
|
if 'uuid' not in new_org:
|
||||||
|
raise PyMISPError('A remote org MUST have a valid uuid')
|
||||||
return self._rest_add('admin/organisations', new_org)
|
return self._rest_add('admin/organisations', new_org)
|
||||||
|
|
||||||
def add_organisation_json(self, json_file):
|
def add_organisation_json(self, json_file):
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet
|
from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet
|
||||||
from .api import PyMISP, everything_broken
|
from .api import PyMISP, everything_broken
|
||||||
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject
|
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, MISPUser, MISPOrganisation
|
||||||
from typing import TypeVar, Optional, Tuple, List, Dict, Union
|
from typing import TypeVar, Optional, Tuple, List, Dict, Union
|
||||||
from datetime import date, datetime
|
from datetime import date, datetime
|
||||||
import csv
|
import csv
|
||||||
|
@ -161,6 +161,36 @@ class ExpandedPyMISP(PyMISP):
|
||||||
a.from_dict(**updated_attribute)
|
a.from_dict(**updated_attribute)
|
||||||
return a
|
return a
|
||||||
|
|
||||||
|
def add_user(self, user: MISPUser):
|
||||||
|
user = super().add_user(user)
|
||||||
|
if isinstance(user, str):
|
||||||
|
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {user}')
|
||||||
|
elif 'errors' in user:
|
||||||
|
return user
|
||||||
|
u = MISPUser()
|
||||||
|
u.from_dict(**user)
|
||||||
|
return u
|
||||||
|
|
||||||
|
def get_user(self, userid='me'):
|
||||||
|
user = super().get_user(userid)
|
||||||
|
if isinstance(user, str):
|
||||||
|
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {user}')
|
||||||
|
elif 'errors' in user:
|
||||||
|
return user
|
||||||
|
u = MISPUser()
|
||||||
|
u.from_dict(**user)
|
||||||
|
return u
|
||||||
|
|
||||||
|
def add_organisation(self, organisation: MISPOrganisation):
|
||||||
|
organisation = super().add_organisation(organisation)
|
||||||
|
if isinstance(organisation, str):
|
||||||
|
raise PyMISPUnexpectedResponse(f'Unexpected response from server: {organisation}')
|
||||||
|
elif 'errors' in organisation:
|
||||||
|
return organisation
|
||||||
|
o = MISPOrganisation()
|
||||||
|
o.from_dict(**organisation)
|
||||||
|
return o
|
||||||
|
|
||||||
def search_sightings(self, context: Optional[str]=None,
|
def search_sightings(self, context: Optional[str]=None,
|
||||||
context_id: Optional[SearchType]=None,
|
context_id: Optional[SearchType]=None,
|
||||||
type_sighting: Optional[str]=None,
|
type_sighting: Optional[str]=None,
|
||||||
|
|
|
@ -49,18 +49,22 @@ class TestComprehensive(unittest.TestCase):
|
||||||
# Connect as admin
|
# Connect as admin
|
||||||
cls.admin_misp_connector = ExpandedPyMISP(url, key, verifycert, debug=False)
|
cls.admin_misp_connector = ExpandedPyMISP(url, key, verifycert, debug=False)
|
||||||
# Creates an org
|
# Creates an org
|
||||||
org = cls.admin_misp_connector.add_organisation(name='Test Org')
|
organisation = MISPOrganisation()
|
||||||
cls.test_org = MISPOrganisation()
|
organisation.name = 'Test Org'
|
||||||
cls.test_org.from_dict(**org)
|
cls.test_org = cls.admin_misp_connector.add_organisation(organisation)
|
||||||
# Creates a user
|
# Creates a user
|
||||||
usr = cls.admin_misp_connector.add_user(email='testusr@user.local', org_id=cls.test_org.id, role_id=3)
|
# TODO & FIXME: set the default role to User is not already set - MISP/MISP #4423
|
||||||
cls.test_usr = MISPUser()
|
user = MISPUser()
|
||||||
cls.test_usr.from_dict(**usr)
|
user.email = 'testusr@user.local'
|
||||||
|
user.org_id = cls.test_org.id
|
||||||
|
cls.test_usr = cls.admin_misp_connector.add_user(user)
|
||||||
cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey, verifycert, debug=False)
|
cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey, verifycert, debug=False)
|
||||||
# Creates a publisher
|
# Creates a publisher
|
||||||
pub = cls.admin_misp_connector.add_user(email='testpub@user.local', org_id=cls.test_org.id, role_id=4)
|
user = MISPUser()
|
||||||
cls.test_pub = MISPUser()
|
user.email = 'testpub@user.local'
|
||||||
cls.test_pub.from_dict(**pub)
|
user.org_id = cls.test_org.id
|
||||||
|
user.role_id = 4
|
||||||
|
cls.test_pub = cls.admin_misp_connector.add_user(user)
|
||||||
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert)
|
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert)
|
||||||
# Update all json stuff
|
# Update all json stuff
|
||||||
cls.admin_misp_connector.update_object_templates()
|
cls.admin_misp_connector.update_object_templates()
|
||||||
|
@ -429,7 +433,6 @@ class TestComprehensive(unittest.TestCase):
|
||||||
# Delete event
|
# Delete event
|
||||||
self.admin_misp_connector.delete_event(first.id)
|
self.admin_misp_connector.delete_event(first.id)
|
||||||
|
|
||||||
# @unittest.skip("Uncomment when adding new tests, it has a 10s sleep")
|
|
||||||
def test_search_publish_timestamp(self):
|
def test_search_publish_timestamp(self):
|
||||||
'''Search for a specific publication timestamp, an interval, and invalid values.'''
|
'''Search for a specific publication timestamp, an interval, and invalid values.'''
|
||||||
# Creating event 1
|
# Creating event 1
|
||||||
|
@ -1100,6 +1103,10 @@ class TestComprehensive(unittest.TestCase):
|
||||||
# Delete event
|
# Delete event
|
||||||
self.admin_misp_connector.delete_event(first.id)
|
self.admin_misp_connector.delete_event(first.id)
|
||||||
|
|
||||||
|
def test_user(self):
|
||||||
|
user = self.user_misp_connector.get_user()
|
||||||
|
self.assertEqual(user.authkey, self.test_usr.authkey)
|
||||||
|
|
||||||
@unittest.skip("Currently failing")
|
@unittest.skip("Currently failing")
|
||||||
def test_search_type_event_csv(self):
|
def test_search_type_event_csv(self):
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue