mirror of https://github.com/MISP/PyMISP
new: Add test for pushing an event to ZMQ
parent
47229077f0
commit
9a7caa71fb
|
@ -14,7 +14,7 @@ try:
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
print(e)
|
print(e)
|
||||||
url = 'http://localhost:8080'
|
url = 'http://localhost:8080'
|
||||||
key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'
|
key = 'y0rs3LNOP0Y3v6dfSMMdhxj5Oxx9MfaInpRP2pBC'
|
||||||
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
@ -916,6 +916,16 @@ class TestComprehensive(unittest.TestCase):
|
||||||
self.assertEqual(r['Galaxy']['name'], list_name_test)
|
self.assertEqual(r['Galaxy']['name'], list_name_test)
|
||||||
self.assertTrue('GalaxyCluster' in r)
|
self.assertTrue('GalaxyCluster' in r)
|
||||||
|
|
||||||
|
def test_zmq(self):
|
||||||
|
first = self.create_simple_event()
|
||||||
|
try:
|
||||||
|
first = self.user_misp_connector.add_event(first)
|
||||||
|
r = self.admin_misp_connector.pushEventToZMQ(first.id)
|
||||||
|
self.assertEqual(r['message'], 'Event published to ZMQ')
|
||||||
|
finally:
|
||||||
|
# Delete event
|
||||||
|
self.admin_misp_connector.delete_event(first.id)
|
||||||
|
|
||||||
@unittest.skip("Currently failing")
|
@unittest.skip("Currently failing")
|
||||||
def test_search_type_event_csv(self):
|
def test_search_type_event_csv(self):
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue