mirror of https://github.com/MISP/PyMISP
fix: Properly load feeds, fix undefined variable
parent
01a6ad4598
commit
14d278fff2
|
@ -966,6 +966,8 @@ class PyMISP:
|
||||||
f = MISPFeed()
|
f = MISPFeed()
|
||||||
f.id = feed_id
|
f.id = feed_id
|
||||||
f.enabled = True
|
f.enabled = True
|
||||||
|
else:
|
||||||
|
f = feed
|
||||||
return self.update_feed(feed=f, pythonify=pythonify)
|
return self.update_feed(feed=f, pythonify=pythonify)
|
||||||
|
|
||||||
def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
||||||
|
@ -975,6 +977,8 @@ class PyMISP:
|
||||||
f = MISPFeed()
|
f = MISPFeed()
|
||||||
f.id = feed_id
|
f.id = feed_id
|
||||||
f.enabled = False
|
f.enabled = False
|
||||||
|
else:
|
||||||
|
f = feed
|
||||||
return self.update_feed(feed=f, pythonify=pythonify)
|
return self.update_feed(feed=f, pythonify=pythonify)
|
||||||
|
|
||||||
def enable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
def enable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
||||||
|
@ -984,6 +988,8 @@ class PyMISP:
|
||||||
f = MISPFeed()
|
f = MISPFeed()
|
||||||
f.id = feed_id
|
f.id = feed_id
|
||||||
f.caching_enabled = True
|
f.caching_enabled = True
|
||||||
|
else:
|
||||||
|
f = feed
|
||||||
return self.update_feed(feed=f, pythonify=pythonify)
|
return self.update_feed(feed=f, pythonify=pythonify)
|
||||||
|
|
||||||
def disable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
def disable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
||||||
|
@ -993,6 +999,8 @@ class PyMISP:
|
||||||
f = MISPFeed()
|
f = MISPFeed()
|
||||||
f.id = feed_id
|
f.id = feed_id
|
||||||
f.caching_enabled = False
|
f.caching_enabled = False
|
||||||
|
else:
|
||||||
|
f = feed
|
||||||
return self.update_feed(feed=f, pythonify=pythonify)
|
return self.update_feed(feed=f, pythonify=pythonify)
|
||||||
|
|
||||||
def update_feed(self, feed: MISPFeed, feed_id: Optional[int]=None, pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
def update_feed(self, feed: MISPFeed, feed_id: Optional[int]=None, pythonify: bool=False) -> Union[Dict, MISPFeed]:
|
||||||
|
@ -1048,6 +1056,11 @@ class PyMISP:
|
||||||
response = self._prepare_request('GET', 'feeds/compareFeeds')
|
response = self._prepare_request('GET', 'feeds/compareFeeds')
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
|
def load_default_feeds(self) -> Dict:
|
||||||
|
"""Load all the default feeds."""
|
||||||
|
response = self._prepare_request('POST', 'feeds/loadDefaultFeeds')
|
||||||
|
return self._check_json_response(response)
|
||||||
|
|
||||||
# ## END Feed ###
|
# ## END Feed ###
|
||||||
|
|
||||||
# ## BEGIN Server ###
|
# ## BEGIN Server ###
|
||||||
|
|
|
@ -1507,6 +1507,7 @@ class MISPFeed(AbstractMISP):
|
||||||
if 'Feed' in kwargs:
|
if 'Feed' in kwargs:
|
||||||
kwargs = kwargs['Feed']
|
kwargs = kwargs['Feed']
|
||||||
super().from_dict(**kwargs)
|
super().from_dict(**kwargs)
|
||||||
|
self.settings = json.loads(self.settings)
|
||||||
|
|
||||||
|
|
||||||
class MISPWarninglist(AbstractMISP):
|
class MISPWarninglist(AbstractMISP):
|
||||||
|
|
|
@ -101,6 +101,7 @@ class TestComprehensive(unittest.TestCase):
|
||||||
cls.admin_misp_connector.update_noticelists()
|
cls.admin_misp_connector.update_noticelists()
|
||||||
cls.admin_misp_connector.update_warninglists()
|
cls.admin_misp_connector.update_warninglists()
|
||||||
cls.admin_misp_connector.update_taxonomies()
|
cls.admin_misp_connector.update_taxonomies()
|
||||||
|
cls.admin_misp_connector.load_default_feeds()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
|
@ -1880,6 +1881,20 @@ class TestComprehensive(unittest.TestCase):
|
||||||
self.assertFalse(feed.enabled)
|
self.assertFalse(feed.enabled)
|
||||||
feed = self.admin_misp_connector.disable_feed_cache(botvrij.id, pythonify=True)
|
feed = self.admin_misp_connector.disable_feed_cache(botvrij.id, pythonify=True)
|
||||||
self.assertFalse(feed.enabled)
|
self.assertFalse(feed.enabled)
|
||||||
|
# Test enable csv feed - https://github.com/MISP/PyMISP/issues/574
|
||||||
|
feeds = self.admin_misp_connector.feeds(pythonify=True)
|
||||||
|
for feed in feeds:
|
||||||
|
if feed.name == 'blockrules of rules.emergingthreats.net':
|
||||||
|
e_thread_csv_feed = feed
|
||||||
|
break
|
||||||
|
updated_feed = self.admin_misp_connector.enable_feed(e_thread_csv_feed, pythonify=True)
|
||||||
|
self.assertEqual(updated_feed.settings, e_thread_csv_feed.settings)
|
||||||
|
updated_feed = self.admin_misp_connector.disable_feed(e_thread_csv_feed, pythonify=True)
|
||||||
|
self.assertEqual(updated_feed.settings, e_thread_csv_feed.settings)
|
||||||
|
# Fails, partial update of the feed deletes the settigns
|
||||||
|
# https://github.com/MISP/MISP/issues/5896
|
||||||
|
# updated_feed = self.admin_misp_connector.enable_feed(e_thread_csv_feed.id, pythonify=True)
|
||||||
|
# self.assertEqual(updated_feed.settings, e_thread_csv_feed.settings)
|
||||||
|
|
||||||
def test_servers(self):
|
def test_servers(self):
|
||||||
# add
|
# add
|
||||||
|
|
Loading…
Reference in New Issue