mirror of https://github.com/MISP/PyMISP
Fix travis
parent
351ad53c97
commit
619538ced7
|
@ -46,6 +46,7 @@ class distributions(object):
|
||||||
all_communities = 3
|
all_communities = 3
|
||||||
sharing_group = 4
|
sharing_group = 4
|
||||||
|
|
||||||
|
|
||||||
class threat_level(object):
|
class threat_level(object):
|
||||||
"""Enumeration of the available threat levels."""
|
"""Enumeration of the available threat levels."""
|
||||||
high = 1
|
high = 1
|
||||||
|
@ -320,6 +321,14 @@ class PyMISP(object):
|
||||||
|
|
||||||
# ########## Helpers ##########
|
# ########## Helpers ##########
|
||||||
|
|
||||||
|
def _make_mispevent(self, event):
|
||||||
|
if not isinstance(event, MISPEvent):
|
||||||
|
e = MISPEvent(self.describe_types)
|
||||||
|
e.load(event)
|
||||||
|
else:
|
||||||
|
e = event
|
||||||
|
return e
|
||||||
|
|
||||||
def get(self, eid):
|
def get(self, eid):
|
||||||
return self.get_event(eid)
|
return self.get_event(eid)
|
||||||
|
|
||||||
|
@ -327,33 +336,30 @@ class PyMISP(object):
|
||||||
return self.get_stix_event(**kwargs)
|
return self.get_stix_event(**kwargs)
|
||||||
|
|
||||||
def update(self, event):
|
def update(self, event):
|
||||||
if event['Event'].get('uuid'):
|
e = self._make_mispevent(event)
|
||||||
eid = event['Event']['uuid']
|
if e.uuid:
|
||||||
|
eid = e.uuid
|
||||||
else:
|
else:
|
||||||
eid = event['Event']['id']
|
eid = e.id
|
||||||
return self.update_event(eid, event)
|
return self.update_event(eid, json.dumps(e, cls=EncodeUpdate))
|
||||||
|
|
||||||
def publish(self, event):
|
def publish(self, event):
|
||||||
if event['Event']['published']:
|
e = self._make_mispevent(event)
|
||||||
|
if e.published:
|
||||||
return {'error': 'Already published'}
|
return {'error': 'Already published'}
|
||||||
e = MISPEvent(self.describe_types)
|
|
||||||
e.load(event)
|
|
||||||
e.publish()
|
e.publish()
|
||||||
return self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
|
return self.update(event)
|
||||||
|
|
||||||
def change_threat_level(self, event, threat_level_id):
|
def change_threat_level(self, event, threat_level_id):
|
||||||
e = MISPEvent(self.describe_types)
|
e = self._make_mispevent(event)
|
||||||
e.load(event)
|
|
||||||
e.threat_level_id = threat_level_id
|
e.threat_level_id = threat_level_id
|
||||||
return self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
|
return self.update(event)
|
||||||
|
|
||||||
def change_sharing_group(self, event, sharing_group_id):
|
def change_sharing_group(self, event, sharing_group_id):
|
||||||
e = MISPEvent(self.describe_types)
|
e = self._make_mispevent(event)
|
||||||
e.load(event)
|
|
||||||
e.distribution = 4 # Needs to be 'Sharing group'
|
e.distribution = 4 # Needs to be 'Sharing group'
|
||||||
e.sharing_group_id = sharing_group_id
|
e.sharing_group_id = sharing_group_id
|
||||||
return self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
|
return self.update(event)
|
||||||
|
|
||||||
|
|
||||||
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
|
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
|
||||||
misp_event = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published, orgc_id, org_id, sharing_group_id)
|
misp_event = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published, orgc_id, org_id, sharing_group_id)
|
||||||
|
@ -391,7 +397,7 @@ class PyMISP(object):
|
||||||
e = MISPEvent(self.describe_types)
|
e = MISPEvent(self.describe_types)
|
||||||
e.load(event)
|
e.load(event)
|
||||||
e.attributes += attributes
|
e.attributes += attributes
|
||||||
response = self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
|
response = self.update(event)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
||||||
|
@ -1054,11 +1060,10 @@ class PyMISP(object):
|
||||||
|
|
||||||
def get_sharing_groups(self):
|
def get_sharing_groups(self):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'sharing_groups')
|
url = urljoin(self.root_url, 'sharing_groups.json')
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
return self._check_response(response)['response']
|
return self._check_response(response)['response']
|
||||||
|
|
||||||
|
|
||||||
# ############## Users ##################
|
# ############## Users ##################
|
||||||
|
|
||||||
def _set_user_parameters(self, email, org_id, role_id, password, external_auth_required,
|
def _set_user_parameters(self, email, org_id, role_id, password, external_auth_required,
|
||||||
|
|
|
@ -38,11 +38,11 @@ class TestOffline(unittest.TestCase):
|
||||||
|
|
||||||
def initURI(self, m):
|
def initURI(self, m):
|
||||||
m.register_uri('GET', self.domain + 'events/1', json=self.auth_error_msg, status_code=403)
|
m.register_uri('GET', self.domain + 'events/1', json=self.auth_error_msg, status_code=403)
|
||||||
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.50"})
|
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.56"})
|
||||||
m.register_uri('GET', self.domain + 'sharing_groups/index.json', json=self.sharing_groups)
|
m.register_uri('GET', self.domain + 'sharing_groups.json', json=self.sharing_groups)
|
||||||
m.register_uri('GET', self.domain + 'attributes/describeTypes.json', json=self.types)
|
m.register_uri('GET', self.domain + 'attributes/describeTypes.json', json=self.types)
|
||||||
m.register_uri('GET', self.domain + 'events/2', json=self.event)
|
m.register_uri('GET', self.domain + 'events/2', json=self.event)
|
||||||
m.register_uri('POST', self.domain + 'events/2', json=self.event)
|
m.register_uri('POST', self.domain + 'events/5758ebf5-c898-48e6-9fe9-5665c0a83866', json=self.event)
|
||||||
m.register_uri('DELETE', self.domain + 'events/2', json={'message': 'Event deleted.'})
|
m.register_uri('DELETE', self.domain + 'events/2', json={'message': 'Event deleted.'})
|
||||||
m.register_uri('DELETE', self.domain + 'events/3', json={'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})
|
m.register_uri('DELETE', self.domain + 'events/3', json={'errors': ['Invalid event'], 'message': 'Invalid event', 'name': 'Invalid event', 'url': '/events/3'})
|
||||||
m.register_uri('DELETE', self.domain + 'attributes/2', json={'message': 'Attribute deleted.'})
|
m.register_uri('DELETE', self.domain + 'attributes/2', json={'message': 'Attribute deleted.'})
|
||||||
|
@ -60,8 +60,8 @@ class TestOffline(unittest.TestCase):
|
||||||
def test_updateEvent(self, m):
|
def test_updateEvent(self, m):
|
||||||
self.initURI(m)
|
self.initURI(m)
|
||||||
pymisp = PyMISP(self.domain, self.key)
|
pymisp = PyMISP(self.domain, self.key)
|
||||||
e0 = pymisp.update_event(2, json.dumps(self.event))
|
e0 = pymisp.update_event('5758ebf5-c898-48e6-9fe9-5665c0a83866', json.dumps(self.event))
|
||||||
e1 = pymisp.update_event(2, self.event)
|
e1 = pymisp.update_event('5758ebf5-c898-48e6-9fe9-5665c0a83866', self.event)
|
||||||
self.assertEqual(e0, e1)
|
self.assertEqual(e0, e1)
|
||||||
e2 = pymisp.update(e0)
|
e2 = pymisp.update(e0)
|
||||||
self.assertEqual(e1, e2)
|
self.assertEqual(e1, e2)
|
||||||
|
@ -97,7 +97,7 @@ class TestOffline(unittest.TestCase):
|
||||||
api_version = pymisp.get_api_version()
|
api_version = pymisp.get_api_version()
|
||||||
self.assertEqual(api_version, {'version': pm.__version__})
|
self.assertEqual(api_version, {'version': pm.__version__})
|
||||||
server_version = pymisp.get_version()
|
server_version = pymisp.get_version()
|
||||||
self.assertEqual(server_version, {"version": "2.4.50"})
|
self.assertEqual(server_version, {"version": "2.4.56"})
|
||||||
|
|
||||||
def test_getSharingGroups(self, m):
|
def test_getSharingGroups(self, m):
|
||||||
self.initURI(m)
|
self.initURI(m)
|
||||||
|
|
Loading…
Reference in New Issue